RocketMQ事務(wù)消息機(jī)制詳解
RocketMQ事務(wù)消息
RocketMQ提供了事務(wù)消息,通過事務(wù)消息就能達(dá)到分布式事務(wù)的最終一致,從而實(shí)現(xiàn)了可靠消息服務(wù)。
一、事務(wù)消息的實(shí)現(xiàn)步驟
事務(wù)消息發(fā)送步驟:
1. 發(fā)送方將半事務(wù)消息發(fā)送至RocketMQ服務(wù)端。
2. RocketMQ服務(wù)端將消息持久化之后,向發(fā)送方返回Ack確認(rèn)消息已經(jīng)發(fā)送成功。由于消息為半事務(wù)消息,在未收到生產(chǎn)者對(duì)該消息的二次確認(rèn)前,此消息被標(biāo)記成“暫不能投遞”狀態(tài)。
3. 發(fā)送方開始執(zhí)行本地事務(wù)邏輯。
4. 發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn)(Commit 或是 Rollback),服務(wù)端收到Commit 狀態(tài)則將半事務(wù)消息標(biāo)記為可投遞,訂閱方最終將收到該消息;服務(wù)端收到 Rollback 狀態(tài)則刪除半事務(wù)消息,訂閱方將不會(huì)接受該消息。
事務(wù)消息回查步驟:
1. 在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟4提交的二次確認(rèn)最終未到達(dá)服務(wù)端,經(jīng)過固定時(shí)間后服務(wù)端將對(duì)該消息發(fā)起消息回查。
2. 發(fā)送方收到消息回查后,需要檢查對(duì)應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。 3. 發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),服務(wù)端仍按照步驟4對(duì)半事務(wù)消息進(jìn)行操作。
二、程序?qū)崿F(xiàn)
事務(wù)消息處理類需要繼承RocketMQLocalTransactionListener類。該類的executeLocalTransaction方法負(fù)責(zé)在接到RocketMQ服務(wù)端的Ack確認(rèn)消息后執(zhí)行本地方法,也就是事務(wù)消息發(fā)送步驟中的步驟3。該類的checkLocalTransaction方法負(fù)責(zé),在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,執(zhí)行RocketMQ服務(wù)端的消息回查,也就是事務(wù)消息回查步驟中的步驟2。
此外,要使該類生效,還需要加@RocketMQTransactionListener注解。這里有個(gè)要特別注意的地方。在2.1.0版本前,這個(gè)注解有一個(gè)屬性txProducerGroup,可以用多個(gè)@RocketMQTransactionListener來監(jiān)聽不同的txProducerGroup來發(fā)送不同類型的事務(wù)消息到topic。但是現(xiàn)在在一個(gè)項(xiàng)目中,如果你在一個(gè)project中寫了多個(gè)@RocketMQTransactionListener,項(xiàng)目將不能啟動(dòng),啟動(dòng)會(huì)報(bào)錯(cuò)。產(chǎn)生這個(gè)問題的原因據(jù)說是,當(dāng)使用RocketMQTemplate并發(fā)的執(zhí)行事務(wù)時(shí),非常容易出現(xiàn)"illegal state"的異常,原因是一個(gè)TransactionProducer在執(zhí)行事務(wù)時(shí)不能被共享。所以,必須使用同一個(gè)TransactionMQProducer來發(fā)送所有類型的事務(wù)消息。當(dāng)然同理也就必須使用一個(gè)偵聽器處理所有的消息了。
既然必須使用同一個(gè)TransactionMQProducer,對(duì)于比較大的應(yīng)用,業(yè)務(wù)場(chǎng)景很多,就會(huì)造成混亂。這里我給出一個(gè)方案拋磚引玉。TransactionMQProducer在發(fā)送消息時(shí),是可以傳遞參數(shù)對(duì)象和指定消息頭的??梢园岩獔?zhí)行的本地方法的bean名和方法名放進(jìn)去。
//發(fā)送半事務(wù)消息 TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction( topicAndTag, MessageBuilder.withPayload(msg) .setHeader(Constants.TX_ID_HEADER_NAME, msg.getTxId()) .setHeader(Constants.CHECK_BEAN_ID_HEADER_NAME, def.getCheckBeanId()) .setHeader(Constants.BIZ_ID_HEADER_NAME, msg.getBizId()) .build(), def );
其中def就是參數(shù)對(duì)象,可以自定義對(duì)象,這里是我自定義的TransactionMsgDefinationDto類,可以把想傳遞的信息放進(jìn)去,最重要的是要執(zhí)行的本地方法的bean名和方法名和方法執(zhí)行參數(shù):executeBeanId(bean名)、executeBeanMethod(方法名)、executeBeanParams(方法執(zhí)行參數(shù))。該對(duì)象可以傳給RocketMQLocalTransactionListener的executeLocalTransaction方法,然后通過反射執(zhí)行。
@Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { //保存消息記錄 String body = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8); JSONObject jsonBody = JSONObject.parseObject(body); BaseMsgDto dto = JSONObject.toJavaObject(jsonBody, BaseMsgDto.class);//(BaseMsgDto)msg.getPayload(); TransactionMsgDefinationDto def = (TransactionMsgDefinationDto)arg; ProducerLog producerLog = BeanCopyUtils.copyProperties(def, ProducerLog::new); String[] tags = def.getMsgTags(); if(tags !=null && tags.length > 0) { StringBuilder tag = new StringBuilder(); for(int i = 0; i<tags.length; i++) { tag.append(tags[0]); if(i != tags.length-1) { tag.append("||"); } } producerLog.setMsgTag(tag.toString()); } producerLog.setBizId(dto.getBizId()); producerLog.setTxId(dto.getTxId()); producerLog.setBizType(dto.getBizType()); producerLog.setGroupName(dto.getProducerGroup()); producerLog.setMsgBody(body); producerLogService.save(producerLog); //執(zhí)行事務(wù)方法 SpringUtil.invokeBeanMethod(def.getExecuteBeanId(), def.getExecuteBeanMethod(), def.getExecuteBeanParams()); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { logger.error("發(fā)生錯(cuò)誤:", e); return RocketMQLocalTransactionState.UNKNOWN; } }
放在消息頭header中的數(shù)據(jù)可以傳遞給RocketMQLocalTransactionListener的checkLocalTransaction方法,然后同樣通過反射執(zhí)行。
@Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { try { String txId = (String)msg.getHeaders().get(Constants.TX_ID_HEADER_NAME); String checkBeanId = (String)msg.getHeaders().get(Constants.CHECK_BEAN_ID_HEADER_NAME); Long bizId = Long.parseLong((String)msg.getHeaders().get(Constants.BIZ_ID_HEADER_NAME)); //執(zhí)行檢查方法 Boolean ret = (Boolean)SpringUtil.invokeBeanMethod(checkBeanId, "check", new Object[]{bizId, txId}); if(ret.booleanValue()) return RocketMQLocalTransactionState.COMMIT; else return RocketMQLocalTransactionState.ROLLBACK; } catch (Exception e) { logger.error("發(fā)生錯(cuò)誤:", e); return RocketMQLocalTransactionState.UNKNOWN; } }
到此這篇關(guān)于RocketMQ事務(wù)消息機(jī)制詳解的文章就介紹到這了,更多相關(guān)RocketMQ事務(wù)消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java httpcomponents發(fā)送get post請(qǐng)求代碼實(shí)例
這篇文章主要介紹了Java httpcomponents發(fā)送get post請(qǐng)求代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-09-09Maven多個(gè)項(xiàng)目實(shí)現(xiàn)聚合過程解析
這篇文章主要介紹了Maven多個(gè)項(xiàng)目實(shí)現(xiàn)聚合過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-08-08IDEA工程運(yùn)行時(shí)總是報(bào)xx程序包不存在實(shí)際上包已導(dǎo)入(問題分析及解決方案)
這篇文章主要介紹了IDEA工程運(yùn)行時(shí),總是報(bào)xx程序包不存在,實(shí)際上包已導(dǎo)入,本文給大家分享問題分析及解決方案,通過實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2020-08-08Java?SE使用for?each循環(huán)遍歷數(shù)組的方法代碼
在Java?SE開發(fā)中,數(shù)組是最常見的數(shù)據(jù)結(jié)構(gòu)之一,Java提供了多種遍歷數(shù)組的方式,其中for循環(huán)是最常用的方式之一,本文將介紹如何使用for?each循環(huán)遍歷數(shù)組,接下來,我們將通過一個(gè)簡(jiǎn)單的代碼示例來展示如何使用for?each循環(huán)遍歷數(shù)組,需要的朋友可以參考下2023-11-11