Java中的Rocketmq事務(wù)消息詳解
什么是RocketMQ半消息(事務(wù)消息)
當(dāng)我們?cè)跇I(yè)務(wù)邏輯中發(fā)送消息時(shí),消息與業(yè)務(wù)的事務(wù)之間難以保證一致性,如果業(yè)務(wù)代碼出現(xiàn)異常,如果已發(fā)送的消息無法回滾,則很會(huì)出現(xiàn)數(shù)據(jù)不一致的情況,RocketMQ的事務(wù)消息支持在業(yè)務(wù)邏輯與發(fā)送消息之間提供事務(wù)保證,RocketMQ通過兩階段的方式提供事務(wù)消息的支持。
首需要注意的是 事務(wù)消息(半消息) 僅僅只是保證本地事務(wù)和MQ消息發(fā)送形成整體的 原子性 ,而投遞到MQ服務(wù)器后,并無法保證消費(fèi)者一定能消費(fèi)成功!
- 事務(wù)消息 :MQ 提供類似 X/Open XA 的分布事務(wù)功能,通過 MQ 事務(wù)消息能達(dá)到分布式事務(wù)的最終一致。
- 半消息 :暫不能投遞的消息,發(fā)送方已經(jīng)將消息成功發(fā)送到了 MQ 服務(wù)端,但是服務(wù)端未收到生產(chǎn)者對(duì)該消息的二次確認(rèn),此時(shí)該消息被標(biāo)記成“暫不能投遞”狀態(tài),處于該種狀態(tài)下的消息即半消息。
- 半消息回查 :由于網(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致某條事務(wù)消息的二次確認(rèn)丟失,MQ 服務(wù)端通過掃描發(fā)現(xiàn)某條消息長(zhǎng)期處于“半消息”時(shí),需要主動(dòng)向消息生產(chǎn)者詢問該消息的最終狀態(tài)(Commit 或是 Rollback),該過程即消息回查。
- 極端情況:是否任何情況下MQ的事務(wù)性消息都可以保證雙方的最終一致性?
答案是否定的。 考慮上面提到的異常情況“情況2:MQ發(fā)送方在步驟3執(zhí)行完本地事務(wù)之后commit之前異常退出”。在這種情況下如果如果MQ發(fā)送方由于運(yùn)維上的失誤長(zhǎng)時(shí)間不重啟MQ發(fā)送方,那么MQ在多次回查不成功之后將會(huì)丟棄該消息。最終分布式事務(wù)的雙方是不能達(dá)到最終一致性了。
當(dāng)然這個(gè)回查的最大值可以通過修改broker的參數(shù)transactionCheckMax來調(diào)整。但是過大的transactionCheckMax參數(shù)將會(huì)導(dǎo)致MQ堆積過多的半包消息,從而危害MQ的穩(wěn)定性,是個(gè)需要權(quán)衡的參數(shù)。
半消息事務(wù)實(shí)現(xiàn)流程
流程:
1.發(fā)送方向 MQ 服務(wù)端發(fā)送事務(wù)消息;
2.MQ Server 將消息持久化成功之后,向發(fā)送方 ACK 確認(rèn)消息已經(jīng)發(fā)送成功,此時(shí)消息為半消息。
3.發(fā)送方開始執(zhí)行本地事務(wù)邏輯。
4.發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向 MQ Server 提交二次確認(rèn)(Commit 或是 Rollback),MQ Server 收到 Commit 狀態(tài)則將半消息標(biāo)記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態(tài)則刪除半消息,訂閱方將不會(huì)接受該消息。
5.在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟4提交的二次確認(rèn)最終未到達(dá) MQ Server,經(jīng)過固定時(shí)間后 MQ Server 將對(duì)該消息發(fā)起消息回查。
6.發(fā)送方收到消息回查后,需要檢查對(duì)應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
7.發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),MQ Server 仍按照步驟4對(duì)半消息進(jìn)行操作。
半消息的應(yīng)用場(chǎng)景是什么?
注冊(cè)系統(tǒng)注冊(cè)的流程中,用戶入口在網(wǎng)頁(yè)注冊(cè)系統(tǒng),通知系統(tǒng)在郵件系統(tǒng),兩個(gè)系統(tǒng)之間的數(shù)據(jù)需要保持最終一致。
普通消息處理
如上所述,注冊(cè)系統(tǒng)和郵件通知系統(tǒng)之間通過消息隊(duì)列進(jìn)行異步處理。注冊(cè)系統(tǒng)將注冊(cè)信息寫入注冊(cè)系統(tǒng)之后,發(fā)送一條注冊(cè)成功的消息到消息隊(duì)列RocketMQ版,郵件通知系統(tǒng)訂閱消息隊(duì)列RocketMQ版的注冊(cè)消息,做相應(yīng)的業(yè)務(wù)處理,發(fā)送注冊(cè)成功或者失敗的郵件。
流程說明如下:
1.注冊(cè)系統(tǒng)發(fā)起注冊(cè)。
2.注冊(cè)系統(tǒng)向消息隊(duì)列RocketMQ版發(fā)送注冊(cè)消息成功與否的消息。
2.1 消息發(fā)送成功,進(jìn)入3。
2.2 消息發(fā)送失敗,導(dǎo)致郵件通知系統(tǒng)未收到消息隊(duì)列RocketMQ版發(fā)送的注冊(cè)成功與否的消息,而無法發(fā)送郵件,最終郵件通知系統(tǒng)和注冊(cè)系統(tǒng)之間的狀態(tài)數(shù)據(jù)不一致,流程結(jié)束。
3.郵件通知系統(tǒng)收到消息隊(duì)列RocketMQ版的注冊(cè)成功消息。
4.郵件通知系統(tǒng)發(fā)送注冊(cè)成功郵件給用戶。
在這樣的情況下,雖然實(shí)現(xiàn)了系統(tǒng)間的解耦,上游系統(tǒng)不需要關(guān)心下游系統(tǒng)的業(yè)務(wù)處理結(jié)果;
但是數(shù)據(jù)一致性不好處理,如何保證郵件通知系統(tǒng)狀態(tài)與注冊(cè)系統(tǒng)狀態(tài)的最終一致。
事務(wù)消息處理
此時(shí),需要利用消息隊(duì)列RocketMQ版所提供的事務(wù)消息來實(shí)現(xiàn)系統(tǒng)間的狀態(tài)數(shù)據(jù)一致性。
事務(wù)消息 流程說明如下:
1.注冊(cè)系統(tǒng)向消息隊(duì)列RocketMQ版發(fā)送半事務(wù)消息。
1.1 半事務(wù)消息發(fā)送成功,進(jìn)入2。
1.2 半事務(wù)消息發(fā)送失敗,注冊(cè)系統(tǒng)不進(jìn)行注冊(cè),流程結(jié)束。
- 說明 最終注冊(cè)系統(tǒng)與郵件通知系統(tǒng)數(shù)據(jù)一致。
2.注冊(cè)系統(tǒng)開始注冊(cè)。
2.1 注冊(cè)成功,進(jìn)入3.1。
2.2 注冊(cè)失敗,進(jìn)入3.2。
3.注冊(cè)系統(tǒng)向消息隊(duì)列RocketMQ版發(fā)送半消息狀態(tài)。
3.1 提交半事務(wù)消息,產(chǎn)生注冊(cè)成功消息,進(jìn)入4。
3.2 回滾半事務(wù)消息,未產(chǎn)生注冊(cè)成功消息,流程結(jié)束。
- 說明 最終注冊(cè)系統(tǒng)與郵件通知系統(tǒng)數(shù)據(jù)一致。
4.郵件通知系統(tǒng)接收消息隊(duì)列RocketMQ版的注冊(cè)成功消息。
5.郵件通知系統(tǒng)發(fā)送注冊(cè)成功郵件。
- 說明 最終注冊(cè)系統(tǒng)與郵件通知系統(tǒng)數(shù)據(jù)一致。
關(guān)于分布式事務(wù)消息的更多詳細(xì)內(nèi)容,請(qǐng)參見事務(wù)消息。
這一段是摘抄子阿里云的rocketmq文檔介紹,大概的意思就是我發(fā)消息的動(dòng)作要和一個(gè)本地事務(wù)進(jìn)行綁定,我如果發(fā)消息失敗那么你本地事務(wù)也不應(yīng)該執(zhí)行,我本地事務(wù)執(zhí)行失敗,那么消息也不應(yīng)該發(fā)。要保證上下游系統(tǒng)的數(shù)據(jù)是最終一致的,保證消息和本地事務(wù)一定是原子性的。
實(shí)踐
模擬一個(gè)接口發(fā)送事務(wù)消息
@GetMapping("/sendMessage") public String sendMessage(String cron) { //TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction // (String.format("%s:%s", "FINORDER_MQ_TOPIC_KEY", "FINORDER_PAY_TAG_KEY"), MessageBuilder.withPayload("{\"name\": \"ok\"}").build(), ""); Message<String> mqMessage = MessageBuilder .withPayload("OK 啦!") .setHeader("key", "ALLENS") // ① .build(); TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction("FINORDER_MQ_TOPIC_KEY:FINORDER_PAY_TAG_KEY" /* ② */, mqMessage , ""); return "ok"; }
① 設(shè)置消息頭,這事務(wù)監(jiān)聽里邊可以通過這個(gè)header來區(qū)分是哪一個(gè)事務(wù),假如單個(gè)微服務(wù)有多個(gè)事務(wù)消息就可以用這個(gè)來區(qū)分。
② TOPIC + groupid 用":"來分割
創(chuàng)建事務(wù)監(jiān)聽器
@Service @RocketMQTransactionListener @Slf4j public class TestTransactionListenerImpl implements RocketMQLocalTransactionListener { /** * 每次推送消息會(huì)執(zhí)行executeLocalTransaction方法,首先會(huì)發(fā)送半消息,到這里的時(shí)候是執(zhí)行具體本地業(yè)務(wù), * 執(zhí)行成功后手動(dòng)返回RocketMQLocalTransactionState.COMMIT狀態(tài), * 這里是保證本地事務(wù)執(zhí)行成功,如果本地事務(wù)執(zhí)行失敗則可以返回ROLLBACK進(jìn)行消息回滾。 此時(shí)消息只是被保存到broker,并沒有發(fā)送到topic中,broker會(huì)根據(jù)本地返回的狀態(tài)來決定消息的處理方式。 */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { System.out.println("接收到消息:" + msg); System.out.println("Header:" + msg.getHeaders().get("key")); return RocketMQLocalTransactionState.COMMIT; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { return RocketMQLocalTransactionState.COMMIT; } }
該監(jiān)聽器的實(shí)現(xiàn)有兩個(gè)方法一個(gè)是本地事務(wù)的執(zhí)行方法executeLocalTransaction,一個(gè)是本地事務(wù)回查方法checkLocalTransaction。 兩個(gè)方法的返回值類型為RocketMQLocalTransactionState,該枚舉有三種:
// COMMIT:即生產(chǎn)者通知Rocket該消息可以消費(fèi) RocketMQLocalTransactionState.COMMIT; // ROLLBACK:即生產(chǎn)者通知Rocket將該消息刪除 RocketMQLocalTransactionState.ROLLBACK; // UNKNOWN:即生產(chǎn)者通知Rocket繼續(xù)查詢?cè)撓⒌臓顟B(tài) RocketMQLocalTransactionState.UNKNOWN;
對(duì)于長(zhǎng)時(shí)間沒有 Commit/Rollback 的事務(wù)消息( pending 狀態(tài)的消息),從服務(wù)端發(fā)起一次 回查Producer 收到回查消息,檢查回查消息對(duì)應(yīng)的 本地事務(wù)狀態(tài)根據(jù)本地事務(wù)狀態(tài),重新 Commit 或者 Rollback。
以上代碼中,如果sex是偶數(shù),executeLocalTransaction會(huì)拋出異常,本地事務(wù)會(huì)回滾,半消息狀態(tài)是UNKNOWN,此時(shí)就會(huì)啟動(dòng)消息的回查機(jī)制,mq會(huì)在一定的時(shí)間調(diào)用checkLocalTransaction方法查詢執(zhí)行狀態(tài),根據(jù)執(zhí)行狀態(tài)來決定是繼續(xù)回查、刪除消息、發(fā)送消息。
executeLocalTransaction也可以自己捕獲異常,手動(dòng)回滾事務(wù),返回RocketMQLocalTransactionState.ROLLBACK,這樣能減少消息回查。
等消息正常提交,半消息消息會(huì)移動(dòng)到發(fā)送指定的TOPIC隊(duì)里中,這個(gè)時(shí)候訂閱者就可以正常獲取消息了。
@Service @RocketMQMessageListener(consumerGroup = "FINORDER_PAY_TAG_KEY",topic = "FINORDER_MQ_TOPIC_KEY") @Slf4j public class MQConsumerService implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println("consumer:" + message); } }
總結(jié)
1、首需要注意的是 事務(wù)消息(半消息) 僅僅只是保證本地事務(wù)和MQ消息發(fā)送形成整體的 原子性 ,而投遞到MQ服務(wù)器后,并無法保證消費(fèi)者一定能消費(fèi)成功!
2、發(fā)消息的動(dòng)作要和一個(gè)本地事務(wù)進(jìn)行綁定,我如果發(fā)消息失敗那么你本地事務(wù)也不應(yīng)該執(zhí)行,我本地事務(wù)執(zhí)行失敗,那么消息也不應(yīng)該發(fā)。要保證上下游系統(tǒng)的數(shù)據(jù)是最終一致的,保證消息和本地事務(wù)一定是原子性的。
3、 半消息如果提交成功最終是要入隊(duì)列的,可以正常的收到消息,這個(gè)時(shí)候可以認(rèn)為上游系統(tǒng)的依賴條件肯定是已經(jīng)執(zhí)行成功了的。
到此這篇關(guān)于Java中的Rocketmq事務(wù)消息詳解的文章就介紹到這了,更多相關(guān)Rocketmq事務(wù)消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
javaweb圖書商城設(shè)計(jì)之訂單模塊(5)
這篇文章主要為大家詳細(xì)介紹了javaweb圖書商城設(shè)計(jì)之訂單模塊,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2016-11-11spring?bean標(biāo)簽中的init-method和destroy-method詳解
這篇文章主要介紹了spring?bean標(biāo)簽中的init-method和destroy-method,在很多項(xiàng)目中,經(jīng)常在xml配置文件中看到init-method 或者 destroy-method ,因此整理收集下,方便以后參考和學(xué)習(xí),需要的朋友可以參考下2023-04-04Java爬蟲實(shí)戰(zhàn)抓取一個(gè)網(wǎng)站上的全部鏈接
這篇文章主要介紹了JAVA使用爬蟲抓取網(wǎng)站網(wǎng)頁(yè)內(nèi)容的方法,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧。2016-10-10深入詳解java高并發(fā)熱點(diǎn)數(shù)據(jù)更新
這篇文章主要為大家深入介紹了java高并發(fā)熱點(diǎn)數(shù)據(jù)更新詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-04-04java客戶端Jedis操作Redis Sentinel 連接池的實(shí)現(xiàn)方法
下面小編就為大家?guī)硪黄猨ava客戶端Jedis操作Redis Sentinel 連接池的實(shí)現(xiàn)方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-03-03K均值聚類算法的Java版實(shí)現(xiàn)代碼示例
這篇文章主要介紹了K均值聚類算法的Java版實(shí)現(xiàn)代碼示例,具有一定借鑒價(jià)值,需要的朋友可以參考下。2017-12-12java使用BeanUtils.copyProperties方法對(duì)象復(fù)制同名字段類型不同賦值為空問題解決方案
這篇文章主要給大家介紹了關(guān)于java使用BeanUtils.copyProperties方法對(duì)象復(fù)制同名字段類型不同賦值為空問題的解決方案,文中通過代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-11-11