SpringBoot整合RocketMq實(shí)現(xiàn)分布式事務(wù)
大家好,今天我們繼續(xù)分布式事務(wù)的學(xué)習(xí),之前我們已經(jīng)實(shí)戰(zhàn)了
來實(shí)現(xiàn)分布式事務(wù),今天我們繼續(xù)學(xué)習(xí)springboot整合RocketMq來實(shí)現(xiàn)分布式事務(wù)
MQ實(shí)現(xiàn)分布式事務(wù)原理
RocketMq提供了事務(wù)消息,要實(shí)現(xiàn)分布式事務(wù)主要還是利用它的事務(wù)消息
- 1:服務(wù)A首先會(huì)發(fā)送一條半事務(wù)的消息到MQ,此時(shí)服務(wù)接收方還是無法消費(fèi)這條消息的
- 2:半事務(wù)消息發(fā)送成功之后,服務(wù)A開始執(zhí)行本地業(yè)務(wù)邏輯
- 3:服務(wù)A執(zhí)行完本地業(yè)務(wù)之后,提交事務(wù),事務(wù)提交成功之后,MQ這條半事務(wù)消息就會(huì)變成原始可消費(fèi)的消息
- 4:服務(wù)接收方這時(shí)候就可以消費(fèi)到這條消息,繼續(xù)執(zhí)行后續(xù)業(yè)務(wù)了
那么在這整個(gè)過程中可能會(huì)出現(xiàn)的異常有哪些呢?
1:半事務(wù)消息發(fā)送失敗
如果半事務(wù)消息發(fā)送失敗,那么服務(wù)A就不會(huì)繼續(xù)執(zhí)行接下來的業(yè)務(wù)了,整個(gè)流程會(huì)直接退出
2:本地事務(wù)提交成功,發(fā)送COMMIT消息失敗
服務(wù)A事務(wù)提交之后,需要發(fā)送一條消息告訴MQ,這條半事務(wù)消息可以消費(fèi)了,但是這時(shí)候,COMMIT消息發(fā)送失敗了,那么這條消息就還是處于半事務(wù)狀態(tài),所以MQ會(huì)進(jìn)行回查
回查服務(wù)A這個(gè)事務(wù)是否成功了,如果成功了,就會(huì)發(fā)送回查結(jié)果,如果本地事務(wù)成功了,那么回查就會(huì)發(fā)送COMMIT消息,這條消息重新設(shè)置為可消費(fèi)狀態(tài)
實(shí)戰(zhàn)
服務(wù)-A
@Transactional(rollbackFor = Exception.class) @Override public String mqInsert(Test test) { //本地服務(wù)調(diào)用 testDao.insert(test); //發(fā)送半事務(wù)消息 //Test是我們本地需要保存的一個(gè)對(duì)象 Message<String> message = MessageBuilder.withPayload(JSONObject.toJSONString(test)).build(); rocketMQTemplate.sendMessageInTransaction("test-topic", message, null); return "success"; }
@RocketMQTransactionListener public class TransactionMqListener implements RocketMQLocalTransactionListener { @Resource private TestDao testDao; //執(zhí)行本地事務(wù) @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { //獲取半事務(wù)消息 Test test = JSONObject.parseObject(new String((byte[]) message.getPayload()), Test.class); System.out.println("test | executeLocalTransaction | 消息是:" + JSONObject.toJSONString(test)); //根據(jù)id查詢?cè)撚涗浭欠癖4娉晒α? Test testExist = testDao.queryById(test.getId()); if(testExist == null) { //說明本地事務(wù)提交失敗了,需要回滾 return RocketMQLocalTransactionState.ROLLBACK; } //本地事務(wù)提交成功 return RocketMQLocalTransactionState.COMMIT; } //事務(wù)回查 @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { Test test = JSONObject.parseObject(new String((byte[]) message.getPayload()), Test.class); System.out.println("test | checkLocalTransaction | 消息是:" + JSONObject.toJSONString(test)); //還是根據(jù)id去查詢記錄 Test testExist = testDao.queryById(test.getId()); if(testExist == null) { //不存在,說明本地事務(wù)提交失敗,回滾 return RocketMQLocalTransactionState.ROLLBACK; } //本地事務(wù)提交成功了 return RocketMQLocalTransactionState.COMMIT; } }
服務(wù)B
@Component @RocketMQMessageListener(topic = "test-topic",consumerGroup = "cpy-consumer-group") public class CpyListener implements RocketMQListener<String> { //消息監(jiān)聽 @Override public void onMessage(String s) { System.out.println("cpy服務(wù)收到消息:" + JSONObject.toJSONString(s)); } }
測(cè)試
我們先把這里的狀態(tài)改成 UNKNOWN,來模擬本地事務(wù)提交失敗的場(chǎng)景,來驗(yàn)證事務(wù)回查的效果
發(fā)送事務(wù)消息之后,我們這里是UNKNOWN狀態(tài),所以沒有提交成功
此時(shí)服務(wù)-B也沒有消費(fèi)到消息
過了一會(huì),MQ事務(wù)消息進(jìn)行回查,此時(shí)因?yàn)閿?shù)據(jù)庫(kù)已經(jīng)存在這條記錄了,所以直接COMMIT
這時(shí)候服務(wù)消費(fèi)方也成功消費(fèi)到消息了
到此這篇關(guān)于SpringBoot整合RocketMq實(shí)現(xiàn)分布式事務(wù)的文章就介紹到這了,更多相關(guān)SpringBoot RocketMq分布式事務(wù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中List集合去除重復(fù)數(shù)據(jù)的方法匯總
這篇文章主要給大家介紹了關(guān)于Java中List集合去除重復(fù)數(shù)據(jù)的方法,文中通過圖文介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-02-02java 單例的五種實(shí)現(xiàn)方式及其性能分析
這篇文章主要介紹了java 單例的五種實(shí)現(xiàn)方式及其性能分析。的相關(guān)資料,需要的朋友可以參考下2017-07-07MyBatis直接執(zhí)行SQL的工具SqlMapper
今天小編就為大家分享一篇關(guān)于MyBatis直接執(zhí)行SQL的工具SqlMapper,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2018-12-12SpringBoot2.0集成WebSocket實(shí)現(xiàn)后臺(tái)向前端推送信息
這篇文章主要介紹了SpringBoot2.0集成WebSocket實(shí)現(xiàn)后臺(tái)向前端推送信息,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-01-01Java?Chassis3熔斷機(jī)制的改進(jìn)路程技術(shù)解密
這篇文章主要介紹了Java?Chassis?3技術(shù)解密之熔斷機(jī)制的改進(jìn)路程實(shí)例分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01Collections.shuffle()方法實(shí)例解析
這篇文章主要介紹了Collections.shuffle()方法實(shí)例解析,分享了相關(guān)代碼示例,小編覺得還是挺不錯(cuò)的,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-01-01java中用數(shù)組實(shí)現(xiàn)環(huán)形隊(duì)列的示例代碼
這篇文章主要介紹了java中用數(shù)組實(shí)現(xiàn)環(huán)形隊(duì)列的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04