Rocketmq事務消息之半消息詳解
什么是半消息(事務消息)
當我們在業(yè)務邏輯中發(fā)送消息時,消息與業(yè)務的事務之間難以保證一致性,如果業(yè)務代碼出現(xiàn)異常,如果已發(fā)送的消息無法回滾,則很會出現(xiàn)數(shù)據(jù)不一致的情況,RocketMQ的事務消息支持在業(yè)務邏輯與發(fā)送消息之間提供事務保證,RocketMQ通過兩階段的方式提供事務消息的支持。
首需要注意的是 事務消息(半消息) 僅僅只是保證本地事務和MQ消息發(fā)送形成整體的 原子性 ,而投遞到MQ服務器后,并無法保證消費者一定能消費成功!
- 事務消息 :MQ 提供類似 X/Open XA 的分布事務功能,通過 MQ 事務消息能達到分布式事務的最終一致。
- 半消息 :暫不能投遞的消息,發(fā)送方已經(jīng)將消息成功發(fā)送到了 MQ 服務端,但是服務端未收到生產(chǎn)者對該消息的二次確認,此時該消息被標記成“暫不能投遞”狀態(tài),處于該種狀態(tài)下的消息即半消息。
- 半消息回查 :由于網(wǎng)絡閃斷、生產(chǎn)者應用重啟等原因,導致某條事務消息的二次確認丟失,MQ 服務端通過掃描發(fā)現(xiàn)某條消息長期處于“半消息”時,需要主動向消息生產(chǎn)者詢問該消息的最終狀態(tài)(Commit 或是 Rollback),該過程即消息回查。
極端情況:是否任何情況下MQ的事務性消息都可以保證雙方的最終一致性?答案是否定的。
考慮上面提到的異常情況“情況2:MQ發(fā)送方在執(zhí)行完本地事務之后commit之前異常退出”。
在這種情況下如果如果MQ發(fā)送方由于運維上的失誤長時間不重啟MQ發(fā)送方,那么MQ在多次回查不成功之后將會丟棄該消息。
最終分布式事務的雙方是不能達到最終一致性了。當然這個回查的最大值可以通過修改broker的參數(shù)transactionCheckMax來調(diào)整。但是過大的transactionCheckMax參數(shù)將會導致MQ堆積過多的半包消息,從而危害MQ的穩(wěn)定性,是個需要權衡的參數(shù)。
半消息事務實現(xiàn)流程
流程:
1.發(fā)送方向 MQ 服務端發(fā)送事務消息;
2.MQ Server 將消息持久化成功之后,向發(fā)送方 ACK 確認消息已經(jīng)發(fā)送成功,此時消息為半消息。
3.發(fā)送方開始執(zhí)行本地事務邏輯。
4.發(fā)送方根據(jù)本地事務執(zhí)行結(jié)果向 MQ Server 提交二次確認(Commit 或是 Rollback),MQ Server 收到 Commit 狀態(tài)則將半消息標記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態(tài)則刪除
半消息,訂閱方將不會接受該消息。
5.在斷網(wǎng)或者是應用重啟的特殊情況下,上述步驟4提交的二次確認最終未到達 MQ Server,經(jīng)過固定時間后 MQ Server 將對該消息發(fā)起消息回查。
6.發(fā)送方收到消息回查后,需要檢查對應消息的本地事務執(zhí)行的最終結(jié)果。
7.發(fā)送方根據(jù)檢查得到的本地事務的最終狀態(tài)再次提交二次確認,MQ Server 仍按照步驟4對半消息進行操作。
半消息的應用場景
注冊系統(tǒng)注冊的流程中,用戶入口在網(wǎng)頁注冊系統(tǒng),通知系統(tǒng)在郵件系統(tǒng),兩個系統(tǒng)之間的數(shù)據(jù)需要保持最終一致。
普通消息處理
如上所述,注冊系統(tǒng)和郵件通知系統(tǒng)之間通過消息隊列進行異步處理。注冊系統(tǒng)將注冊信息寫入注冊系統(tǒng)之后,發(fā)送一條注冊成功的消息到消息隊列RocketMQ版,郵件通知系統(tǒng)訂閱消息隊列RocketMQ版的注冊消息,做相應的業(yè)務處理,發(fā)送注冊成功或者失敗的郵件。
普通消息處理
流程說明如下:
1.注冊系統(tǒng)發(fā)起注冊。
2.注冊系統(tǒng)向消息隊列RocketMQ版發(fā)送注冊消息成功與否的消息。
2.1 消息發(fā)送成功,進入3。
2.2 消息發(fā)送失敗,導致郵件通知系統(tǒng)未收到消息隊列RocketMQ版發(fā)送的注冊成功與否的消息,而無法發(fā)送郵件,最終郵件通知系統(tǒng)和注冊系統(tǒng)之間的狀態(tài)數(shù)據(jù)不一致,流程結(jié)束。
3.郵件通知系統(tǒng)收到消息隊列RocketMQ版的注冊成功消息。
4.郵件通知系統(tǒng)發(fā)送注冊成功郵件給用戶。
在這樣的情況下,雖然實現(xiàn)了系統(tǒng)間的解耦,上游系統(tǒng)不需要關心下游系統(tǒng)的業(yè)務處理結(jié)果;但是數(shù)據(jù)一致性不好處理,如何保證郵件通知系統(tǒng)狀態(tài)與注冊系統(tǒng)狀態(tài)的最終一致。
事務消息處理
此時,需要利用消息隊列RocketMQ版所提供的事務消息來實現(xiàn)系統(tǒng)間的狀態(tài)數(shù)據(jù)一致性。
事務消息 流程說明如下:
1.注冊系統(tǒng)向消息隊列RocketMQ版發(fā)送半事務消息。
1.1 半事務消息發(fā)送成功,進入2。
1.2 半事務消息發(fā)送失敗,注冊系統(tǒng)不進行注冊,流程結(jié)束。
說明 最終注冊系統(tǒng)與郵件通知系統(tǒng)數(shù)據(jù)一致。
2.注冊系統(tǒng)開始注冊。
2.1 注冊成功,進入3.1。
2.2 注冊失敗,進入3.2。
3.注冊系統(tǒng)向消息隊列RocketMQ版發(fā)送半消息狀態(tài)。
3.1 提交半事務消息,產(chǎn)生注冊成功消息,進入4。
3.2 回滾半事務消息,未產(chǎn)生注冊成功消息,流程結(jié)束。
說明 最終注冊系統(tǒng)與郵件通知系統(tǒng)數(shù)據(jù)一致。
4.郵件通知系統(tǒng)接收消息隊列RocketMQ版的注冊成功消息。
5.郵件通知系統(tǒng)發(fā)送注冊成功郵件。
說明 最終注冊系統(tǒng)與郵件通知系統(tǒng)數(shù)據(jù)一致。
關于分布式事務消息的更多詳細內(nèi)容,請參見事務消息。
這一段是摘抄子阿里云的rocketmq文檔介紹,大概的意思就是我發(fā)消息的動作要和一個本地事務進行綁定,我如果發(fā)消息失敗那么你本地事務也不應該執(zhí)行,我本地事務執(zhí)行失敗,那么消息也不應該發(fā)。要保證上下游系統(tǒng)的數(shù)據(jù)是最終一致的,保證消息和本地事務一定是原子性的。
實踐
1.模擬一個接口發(fā)送事務消息
@GetMapping("/sendMessage") public String sendMessage(String cron) { //TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction // (String.format("%s:%s", "FINORDER_MQ_TOPIC_KEY", "FINORDER_PAY_TAG_KEY"), MessageBuilder.withPayload("{\"name\": \"ok\"}").build(), ""); Message<String> mqMessage = MessageBuilder .withPayload("OK 啦!") .setHeader("key", "ALLENS") // ① .build(); TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction("FINORDER_MQ_TOPIC_KEY:FINORDER_PAY_TAG_KEY" /* ② */, mqMessage , ""); return "ok"; }
① 設置消息頭,這事務監(jiān)聽里邊可以通過這個header來區(qū)分是哪一個事務,假如單個微服務有多個事務消息就可以用這個來區(qū)分。
② TOPIC + groupid 用":"來分割
創(chuàng)建事務監(jiān)聽器
@Service @RocketMQTransactionListener @Slf4j public class TestTransactionListenerImpl implements RocketMQLocalTransactionListener { /** * 每次推送消息會執(zhí)行executeLocalTransaction方法,首先會發(fā)送半消息,到這里的時候是執(zhí)行具體本地業(yè)務, * 執(zhí)行成功后手動返回RocketMQLocalTransactionState.COMMIT狀態(tài), * 這里是保證本地事務執(zhí)行成功,如果本地事務執(zhí)行失敗則可以返回ROLLBACK進行消息回滾。 此時消息只是被保存到broker,并沒有發(fā)送到topic中,broker會根據(jù)本地返回的狀態(tài)來決定消息的處理方式。 */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { System.out.println("接收到消息:" + msg); System.out.println("Header:" + msg.getHeaders().get("key")); return RocketMQLocalTransactionState.COMMIT; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { return RocketMQLocalTransactionState.COMMIT; } }
該監(jiān)聽器的實現(xiàn)有兩個方法一個是本地事務的執(zhí)行方法executeLocalTransaction,一個是本地事務回查方法checkLocalTransaction。 兩個方法的返回值類型為RocketMQLocalTransactionState,該枚舉有三種:
// COMMIT:即生產(chǎn)者通知Rocket該消息可以消費 RocketMQLocalTransactionState.COMMIT; // ROLLBACK:即生產(chǎn)者通知Rocket將該消息刪除 RocketMQLocalTransactionState.ROLLBACK; // UNKNOWN:即生產(chǎn)者通知Rocket繼續(xù)查詢該消息的狀態(tài) RocketMQLocalTransactionState.UNKNOWN;
對于長時間沒有 Commit/Rollback 的事務消息( pending 狀態(tài)的消息),從服務端發(fā)起一次 回查Producer 收到回查消息,檢查回查消息對應的 本地事務狀態(tài)根據(jù)本地事務狀態(tài),重新 Commit 或者 Rollback。
以上代碼中,如果sex是偶數(shù),executeLocalTransaction會拋出異常,本地事務會回滾,半消息狀態(tài)是UNKNOWN,此時就會啟動消息的回查機制,mq會在一定的時間調(diào)用checkLocalTransaction方法查詢執(zhí)行狀態(tài),根據(jù)執(zhí)行狀態(tài)來決定是繼續(xù)回查、刪除消息、發(fā)送消息。
executeLocalTransaction也可以自己捕獲異常,手動回滾事務,返回RocketMQLocalTransactionState.ROLLBACK,這樣能減少消息回查。
等消息正常提交,半消息消息會移動到發(fā)送指定的TOPIC隊里中,這個時候訂閱者就可以正常獲取消息了。
@Service @RocketMQMessageListener(consumerGroup = "FINORDER_PAY_TAG_KEY",topic = "FINORDER_MQ_TOPIC_KEY") @Slf4j public class MQConsumerService implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println("consumer:" + message); } }
總結(jié)
1、首需要注意的是 事務消息(半消息) 僅僅只是保證本地事務和MQ消息發(fā)送形成整體的 原子性 ,而投遞到MQ服務器后,并無法保證消費者一定能消費成功!
2、發(fā)消息的動作要和一個本地事務進行綁定,我如果發(fā)消息失敗那么你本地事務也不應該執(zhí)行,我本地事務執(zhí)行失敗,那么消息也不應該發(fā)。要保證上下游系統(tǒng)的數(shù)據(jù)是最終一致的,保證消息和本地事務一定是原子性的。
3、 半消息如果提交成功最終是要入隊列的,可以正常的收到消息,這個時候可以認為上游系統(tǒng)的依賴條件肯定是已經(jīng)執(zhí)行成功了的。
到此這篇關于Rocketmq事務消息之半消息詳解的文章就介紹到這了,更多相關Rocketmq半消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
RocketMQ特性Broker存儲事務消息實現(xiàn)
這篇文章主要為大家介紹了RocketMQ特性Broker存儲事務消息實現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-08-08深入理解springboot中配置文件application.properties
本文主要介紹了springboot中配置文件application.properties,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-10-10SpringBoot使用PageHelper插件實現(xiàn)Mybatis分頁效果
這篇文章主要介紹了SpringBoot使用PageHelper插件實現(xiàn)Mybatis分頁效果,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作有一定的參考借鑒價值,需要的朋友可以參考下2024-02-02