SpringBoot集成RocketMQ發(fā)送事務(wù)消息的原理解析
簡介
RocketMQ 事務(wù)消息(Transactional Message)是指應(yīng)用本地事務(wù)和發(fā)送消息操作可以被定義到全局事務(wù)中,要么同時成功,要么同時失敗。RocketMQ 的事務(wù)消息提供類似 X/Open XA 的分布事務(wù)功能,通過事務(wù)消息能達(dá)到分布式事務(wù)的最終一致。
原理
RocketMQ事務(wù)消息通過異步確保方式,保證事務(wù)的最終一致性。設(shè)計的思想可以借鑒兩個階段提交事務(wù)。其執(zhí)行流程圖如下:
- 發(fā)送方向MQ服務(wù)端發(fā)送消息。
- MQ Server將消息持久化成功之后,向發(fā)送方 ACK 確認(rèn)消息已經(jīng)發(fā)送成功,此時消息為半消息。
- 發(fā)送方開始執(zhí)行本地事務(wù)邏輯。
- 發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向 MQ Server 提交二次確認(rèn)(Commit 或是 Rollback),MQ Server 收到 Commit 狀態(tài)則將半消息標(biāo)記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態(tài)則刪除半消息,訂閱方將不會接受該消息。
- 在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟4提交的二次確認(rèn)最終未到達(dá) MQ Server,經(jīng)過固定時間后 MQ Server 將對該消息發(fā)起消息回查。
- 發(fā)送方收到消息回查后,需要檢查對應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
- 發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),MQ Server 仍按照步驟4對半消息進(jìn)行操作。
具體實現(xiàn)
消費者
@Component public class TransactionProduce { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private RocketMQTemplate rocketMQTemplate; public void sendTransactionMessage(String msg) { logger.info("start sendTransMessage hashKey:{}",msg); Message message =new Message(); message.setBody("this is tx message".getBytes()); TransactionSendResult result=rocketMQTemplate.sendMessageInTransaction("test-tx-rocketmq", MessageBuilder.withPayload(message).build(), msg); //發(fā)送狀態(tài) String sendStatus = result.getSendStatus().name(); // 本地事務(wù)執(zhí)行狀態(tài) String localTxState = result.getLocalTransactionState().name(); logger.info("send tx message sendStatus:{},localTXState:{}",sendStatus,localTxState); } }
說明:發(fā)送事務(wù)消息采用的是sendMessageInTransaction方法,返回結(jié)果為TransactionSendResult對象,該對象中包含了事務(wù)發(fā)送的狀態(tài)、本地事務(wù)執(zhí)行的狀態(tài)等。
消費者
@Component @RocketMQMessageListener(consumerGroup="test-txRocketmq-group",topic="test-tx-rocketmq", messageModel = MessageModel.CLUSTERING) public class TransactionConsumer implements RocketMQListener<String> { private Logger logger =LoggerFactory.getLogger(getClass()); @Override public void onMessage(String message) { logger.info("send transaction mssage parma is:{}", message); } }
說明:發(fā)送事務(wù)消息的消費者與普通的消費者一樣沒有太大的區(qū)別。
生產(chǎn)者消息監(jiān)聽器
發(fā)送事務(wù)消息除了生產(chǎn)者和消費者以外,我們還需要創(chuàng)建生產(chǎn)者的消息監(jiān)聽器,來監(jiān)聽本地事務(wù)執(zhí)行的狀態(tài)和檢查本地事務(wù)狀態(tài)。
@RocketMQTransactionListener public class TransactionMsgListener implements RocketMQLocalTransactionListener { private Logger logger = LoggerFactory.getLogger(getClass()); /** * 執(zhí)行本地事務(wù) */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object obj) { logger.info("start invoke local rocketMQ transaction"); RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT; try { //處理業(yè)務(wù) String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8); logger.info("invoke msg content:{}",jsonStr); } catch (Exception e) { logger.error("invoke local mq trans error",e); resultState = RocketMQLocalTransactionState.UNKNOWN; } return resultState; } /** * 檢查本地事務(wù)的狀態(tài) */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { logger.info("start check Local rocketMQ transaction"); RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT; try { String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8); logger.info("check trans msg content:{}",jsonStr); } catch (Exception e) { resultState = RocketMQLocalTransactionState.ROLLBACK; } return resultState; } }
說明:RocketMQ本地事務(wù)狀態(tài)由如下幾種:
- RocketMQLocalTransactionState.COMMIT:提交事務(wù),允許消費者消費此消息。
- RocketMQLocalTransactionState.ROLLBACK: 回滾事務(wù),消息將被刪除,不允許被消費。
- RocketMQLocalTransactionState.UNKNOWN:中間狀態(tài),代表需要進(jìn)行檢查來確定狀態(tài)。
注意:Spring Boot2.0的版本之后,@RocketMQTransactionListener 已經(jīng)沒有了txProducerGroup屬性,且sendMessageInTransaction方法也將其移除。所以在同一項目中只能有一個@RocketMQTransactionListener,不能出現(xiàn)多個,否則會報如下錯誤:
java.lang.IllegalStateException: rocketMQTemplate already exists RocketMQLocalTransactionListener
消息事務(wù)測試
正常測試
c.s.fw.mq.produce.TransactionProduce - product start sendTransMessage msg:{"userId":"zhangsann"} c.s.f.m.p.TransactionMsgListener - start invoke local rocketMQ transaction c.s.f.m.p.TransactionMsgListener - invoke local transaction msg content:{"topic":null,"flag":0,"properties":null,"body":"dGhpcyBpcyB0eCBtZXNzYWdl","transactionId":null,"keys":null,"tags":null,"delayTimeLevel":0,"waitStoreMsgOK":true,"buyerId":null} c.s.fw.mq.produce.TransactionProduce - send tx message sendStatus:SEND_OK,localTXState:COMMIT_MESSAGE c.s.f.m.consumer.TransactionConsumer - send transaction mssage parma is:{"topic":null,"flag":0,"properties":null,"body":"dGhpcyBpcyB0eCBtZXNzYWdl","transactionId":null,"keys":null,"tags":null,"delayTimeLevel":0,"waitStoreMsgOK":true,"buyerId":null}
說明:通過日志我們可以看出,執(zhí)行的流程與上述的一致,執(zhí)行成功后,消息執(zhí)行成功返回的結(jié)果為SEND_OK,本地事務(wù)執(zhí)行的狀態(tài)為COMMIT_MESSAGE。
異常測試
如果在執(zhí)行本地消息時出現(xiàn)異常,那么執(zhí)行結(jié)果會是怎樣?修改下本地事務(wù)執(zhí)行的方法,讓其出現(xiàn)異常。
代碼調(diào)整
@Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object obj) { logger.info("start invoke local rocketMQ transaction"); RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT; try { //處理業(yè)務(wù) String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8); logger.info("invoke local transaction msg content:{}",jsonStr); int c=1/0; } catch (Exception e) { logger.error("invoke local mq trans error",e); resultState = RocketMQLocalTransactionState.UNKNOWN; } return resultState; }
執(zhí)行結(jié)果
c.s.fw.mq.produce.TransactionProduce - send tx message sendStatus:SEND_OK,localTXState:UNKNOW
從執(zhí)行的結(jié)果可以看出,消息執(zhí)行成功返回的結(jié)果為SEND_OK,本地事務(wù)執(zhí)行的狀態(tài)為:UNKNOW.所以消費端無法消費此消息。
總結(jié)
到此這篇關(guān)于SpringBoot集成RocketMQ發(fā)送事務(wù)消息的文章就介紹到這了,更多相關(guān)SpringBoot集成RocketMQ事務(wù)消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java使用Rxtx實現(xiàn)串口通信調(diào)試工具
這篇文章主要為大家詳細(xì)介紹了java使用Rxtx實現(xiàn)簡單串口通信調(diào)試工具,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-12-12SpringBoot業(yè)務(wù)邏輯異常的處理方法介紹
本篇文章為大家展示了如何在SpringBoot中統(tǒng)一處理邏輯異常,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲2022-09-09IDEA 2020版本最新破解教程可激活至2089年(推薦)
這篇文章主要介紹了IDEA 2020版本最新破解教程可激活至2089年,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-09-09springboot實現(xiàn)rabbitmq的隊列初始化和綁定
這篇文章主要介紹了springboot實現(xiàn)rabbitmq的隊列初始化和綁定,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2018-10-10使用gRPC微服務(wù)的內(nèi)部通信優(yōu)化
這篇文章主要為大家介紹了微服務(wù)優(yōu)化之使用gRPC做微服務(wù)的內(nèi)部通信,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-03-03SpringBoot集成Tomcat服務(wù)架構(gòu)配置
這篇文章主要為大家介紹了SpringBoot集成Tomcat服務(wù)架構(gòu)配置,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02解決IDEA中多模塊下Mybatis逆向工程不生成相應(yīng)文件的情況
這篇文章主要介紹了解決IDEA中多模塊下Mybatis逆向工程不生成相應(yīng)文件的情況,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-01-01