RocketMQ事務消息機制詳解
RocketMQ事務消息
RocketMQ提供了事務消息,通過事務消息就能達到分布式事務的最終一致,從而實現了可靠消息服務。
一、事務消息的實現步驟
事務消息發(fā)送步驟:
1. 發(fā)送方將半事務消息發(fā)送至RocketMQ服務端。
2. RocketMQ服務端將消息持久化之后,向發(fā)送方返回Ack確認消息已經發(fā)送成功。由于消息為半事務消息,在未收到生產者對該消息的二次確認前,此消息被標記成“暫不能投遞”狀態(tài)。
3. 發(fā)送方開始執(zhí)行本地事務邏輯。
4. 發(fā)送方根據本地事務執(zhí)行結果向服務端提交二次確認(Commit 或是 Rollback),服務端收到Commit 狀態(tài)則將半事務消息標記為可投遞,訂閱方最終將收到該消息;服務端收到 Rollback 狀態(tài)則刪除半事務消息,訂閱方將不會接受該消息。
事務消息回查步驟:
1. 在斷網或者是應用重啟的特殊情況下,上述步驟4提交的二次確認最終未到達服務端,經過固定時間后服務端將對該消息發(fā)起消息回查。
2. 發(fā)送方收到消息回查后,需要檢查對應消息的本地事務執(zhí)行的最終結果。 3. 發(fā)送方根據檢查得到的本地事務的最終狀態(tài)再次提交二次確認,服務端仍按照步驟4對半事務消息進行操作。
二、程序實現
事務消息處理類需要繼承RocketMQLocalTransactionListener類。該類的executeLocalTransaction方法負責在接到RocketMQ服務端的Ack確認消息后執(zhí)行本地方法,也就是事務消息發(fā)送步驟中的步驟3。該類的checkLocalTransaction方法負責,在斷網或者是應用重啟的特殊情況下,執(zhí)行RocketMQ服務端的消息回查,也就是事務消息回查步驟中的步驟2。
此外,要使該類生效,還需要加@RocketMQTransactionListener注解。這里有個要特別注意的地方。在2.1.0版本前,這個注解有一個屬性txProducerGroup,可以用多個@RocketMQTransactionListener來監(jiān)聽不同的txProducerGroup來發(fā)送不同類型的事務消息到topic。但是現在在一個項目中,如果你在一個project中寫了多個@RocketMQTransactionListener,項目將不能啟動,啟動會報錯。產生這個問題的原因據說是,當使用RocketMQTemplate并發(fā)的執(zhí)行事務時,非常容易出現"illegal state"的異常,原因是一個TransactionProducer在執(zhí)行事務時不能被共享。所以,必須使用同一個TransactionMQProducer來發(fā)送所有類型的事務消息。當然同理也就必須使用一個偵聽器處理所有的消息了。
既然必須使用同一個TransactionMQProducer,對于比較大的應用,業(yè)務場景很多,就會造成混亂。這里我給出一個方案拋磚引玉。TransactionMQProducer在發(fā)送消息時,是可以傳遞參數對象和指定消息頭的。可以把要執(zhí)行的本地方法的bean名和方法名放進去。
//發(fā)送半事務消息 TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction( topicAndTag, MessageBuilder.withPayload(msg) .setHeader(Constants.TX_ID_HEADER_NAME, msg.getTxId()) .setHeader(Constants.CHECK_BEAN_ID_HEADER_NAME, def.getCheckBeanId()) .setHeader(Constants.BIZ_ID_HEADER_NAME, msg.getBizId()) .build(), def );
其中def就是參數對象,可以自定義對象,這里是我自定義的TransactionMsgDefinationDto類,可以把想傳遞的信息放進去,最重要的是要執(zhí)行的本地方法的bean名和方法名和方法執(zhí)行參數:executeBeanId(bean名)、executeBeanMethod(方法名)、executeBeanParams(方法執(zhí)行參數)。該對象可以傳給RocketMQLocalTransactionListener的executeLocalTransaction方法,然后通過反射執(zhí)行。
@Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { //保存消息記錄 String body = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8); JSONObject jsonBody = JSONObject.parseObject(body); BaseMsgDto dto = JSONObject.toJavaObject(jsonBody, BaseMsgDto.class);//(BaseMsgDto)msg.getPayload(); TransactionMsgDefinationDto def = (TransactionMsgDefinationDto)arg; ProducerLog producerLog = BeanCopyUtils.copyProperties(def, ProducerLog::new); String[] tags = def.getMsgTags(); if(tags !=null && tags.length > 0) { StringBuilder tag = new StringBuilder(); for(int i = 0; i<tags.length; i++) { tag.append(tags[0]); if(i != tags.length-1) { tag.append("||"); } } producerLog.setMsgTag(tag.toString()); } producerLog.setBizId(dto.getBizId()); producerLog.setTxId(dto.getTxId()); producerLog.setBizType(dto.getBizType()); producerLog.setGroupName(dto.getProducerGroup()); producerLog.setMsgBody(body); producerLogService.save(producerLog); //執(zhí)行事務方法 SpringUtil.invokeBeanMethod(def.getExecuteBeanId(), def.getExecuteBeanMethod(), def.getExecuteBeanParams()); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { logger.error("發(fā)生錯誤:", e); return RocketMQLocalTransactionState.UNKNOWN; } }
放在消息頭header中的數據可以傳遞給RocketMQLocalTransactionListener的checkLocalTransaction方法,然后同樣通過反射執(zhí)行。
@Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { try { String txId = (String)msg.getHeaders().get(Constants.TX_ID_HEADER_NAME); String checkBeanId = (String)msg.getHeaders().get(Constants.CHECK_BEAN_ID_HEADER_NAME); Long bizId = Long.parseLong((String)msg.getHeaders().get(Constants.BIZ_ID_HEADER_NAME)); //執(zhí)行檢查方法 Boolean ret = (Boolean)SpringUtil.invokeBeanMethod(checkBeanId, "check", new Object[]{bizId, txId}); if(ret.booleanValue()) return RocketMQLocalTransactionState.COMMIT; else return RocketMQLocalTransactionState.ROLLBACK; } catch (Exception e) { logger.error("發(fā)生錯誤:", e); return RocketMQLocalTransactionState.UNKNOWN; } }
到此這篇關于RocketMQ事務消息機制詳解的文章就介紹到這了,更多相關RocketMQ事務消息內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Java httpcomponents發(fā)送get post請求代碼實例
這篇文章主要介紹了Java httpcomponents發(fā)送get post請求代碼實例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-09-09IDEA工程運行時總是報xx程序包不存在實際上包已導入(問題分析及解決方案)
這篇文章主要介紹了IDEA工程運行時,總是報xx程序包不存在,實際上包已導入,本文給大家分享問題分析及解決方案,通過實例代碼給大家介紹的非常詳細,需要的朋友可以參考下2020-08-08Java?SE使用for?each循環(huán)遍歷數組的方法代碼
在Java?SE開發(fā)中,數組是最常見的數據結構之一,Java提供了多種遍歷數組的方式,其中for循環(huán)是最常用的方式之一,本文將介紹如何使用for?each循環(huán)遍歷數組,接下來,我們將通過一個簡單的代碼示例來展示如何使用for?each循環(huán)遍歷數組,需要的朋友可以參考下2023-11-11