欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ事務消息機制詳解

 更新時間:2024年01月11日 09:28:50   作者:智由靜生  
這篇文章主要介紹了RocketMQ事務消息機制詳解,RocketMQ服務端將消息持久化之后,向發(fā)送方返回Ack確認消息已經發(fā)送成功,由于消息為半事務消息,在未收到生產者對該消息的二次確認前,此消息被標記成"暫不能投遞"狀態(tài),需要的朋友可以參考下

RocketMQ事務消息

RocketMQ提供了事務消息,通過事務消息就能達到分布式事務的最終一致,從而實現了可靠消息服務。

一、事務消息的實現步驟

事務消息發(fā)送步驟:

1. 發(fā)送方將半事務消息發(fā)送至RocketMQ服務端。

2. RocketMQ服務端將消息持久化之后,向發(fā)送方返回Ack確認消息已經發(fā)送成功。由于消息為半事務消息,在未收到生產者對該消息的二次確認前,此消息被標記成“暫不能投遞”狀態(tài)。

3. 發(fā)送方開始執(zhí)行本地事務邏輯。

4. 發(fā)送方根據本地事務執(zhí)行結果向服務端提交二次確認(Commit 或是 Rollback),服務端收到Commit 狀態(tài)則將半事務消息標記為可投遞,訂閱方最終將收到該消息;服務端收到 Rollback 狀態(tài)則刪除半事務消息,訂閱方將不會接受該消息。

事務消息回查步驟:

1. 在斷網或者是應用重啟的特殊情況下,上述步驟4提交的二次確認最終未到達服務端,經過固定時間后服務端將對該消息發(fā)起消息回查。

2. 發(fā)送方收到消息回查后,需要檢查對應消息的本地事務執(zhí)行的最終結果。 3. 發(fā)送方根據檢查得到的本地事務的最終狀態(tài)再次提交二次確認,服務端仍按照步驟4對半事務消息進行操作。

二、程序實現

事務消息處理類需要繼承RocketMQLocalTransactionListener類。該類的executeLocalTransaction方法負責在接到RocketMQ服務端的Ack確認消息后執(zhí)行本地方法,也就是事務消息發(fā)送步驟中的步驟3。該類的checkLocalTransaction方法負責,在斷網或者是應用重啟的特殊情況下,執(zhí)行RocketMQ服務端的消息回查,也就是事務消息回查步驟中的步驟2。

此外,要使該類生效,還需要加@RocketMQTransactionListener注解。這里有個要特別注意的地方。在2.1.0版本前,這個注解有一個屬性txProducerGroup,可以用多個@RocketMQTransactionListener來監(jiān)聽不同的txProducerGroup來發(fā)送不同類型的事務消息到topic。但是現在在一個項目中,如果你在一個project中寫了多個@RocketMQTransactionListener,項目將不能啟動,啟動會報錯。產生這個問題的原因據說是,當使用RocketMQTemplate并發(fā)的執(zhí)行事務時,非常容易出現"illegal state"的異常,原因是一個TransactionProducer在執(zhí)行事務時不能被共享。所以,必須使用同一個TransactionMQProducer來發(fā)送所有類型的事務消息。當然同理也就必須使用一個偵聽器處理所有的消息了。

既然必須使用同一個TransactionMQProducer,對于比較大的應用,業(yè)務場景很多,就會造成混亂。這里我給出一個方案拋磚引玉。TransactionMQProducer在發(fā)送消息時,是可以傳遞參數對象和指定消息頭的。可以把要執(zhí)行的本地方法的bean名和方法名放進去。

//發(fā)送半事務消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
		topicAndTag,
		MessageBuilder.withPayload(msg)
			.setHeader(Constants.TX_ID_HEADER_NAME, msg.getTxId())
			.setHeader(Constants.CHECK_BEAN_ID_HEADER_NAME, def.getCheckBeanId())
			.setHeader(Constants.BIZ_ID_HEADER_NAME, msg.getBizId())
			.build(),
		def
);

其中def就是參數對象,可以自定義對象,這里是我自定義的TransactionMsgDefinationDto類,可以把想傳遞的信息放進去,最重要的是要執(zhí)行的本地方法的bean名和方法名和方法執(zhí)行參數:executeBeanId(bean名)、executeBeanMethod(方法名)、executeBeanParams(方法執(zhí)行參數)。該對象可以傳給RocketMQLocalTransactionListener的executeLocalTransaction方法,然后通過反射執(zhí)行。

@Override
	public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
		try {
			//保存消息記錄
			String body = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
			JSONObject jsonBody = JSONObject.parseObject(body);
			BaseMsgDto dto = JSONObject.toJavaObject(jsonBody, BaseMsgDto.class);//(BaseMsgDto)msg.getPayload();
			TransactionMsgDefinationDto def = (TransactionMsgDefinationDto)arg;
			ProducerLog producerLog = BeanCopyUtils.copyProperties(def, ProducerLog::new);
			String[] tags = def.getMsgTags();
			if(tags !=null && tags.length > 0) {
				StringBuilder tag = new StringBuilder();
				for(int i = 0; i<tags.length; i++) {
					tag.append(tags[0]);
					if(i != tags.length-1) {
						tag.append("||");
					}
				}
				producerLog.setMsgTag(tag.toString());
			}
			producerLog.setBizId(dto.getBizId());
			producerLog.setTxId(dto.getTxId());
			producerLog.setBizType(dto.getBizType());
			producerLog.setGroupName(dto.getProducerGroup());
			producerLog.setMsgBody(body);
			producerLogService.save(producerLog);
			//執(zhí)行事務方法
			SpringUtil.invokeBeanMethod(def.getExecuteBeanId(), def.getExecuteBeanMethod(), def.getExecuteBeanParams());
			return RocketMQLocalTransactionState.COMMIT;
		} catch (Exception e) {
			logger.error("發(fā)生錯誤:", e);
			return RocketMQLocalTransactionState.UNKNOWN;
		}
	}

放在消息頭header中的數據可以傳遞給RocketMQLocalTransactionListener的checkLocalTransaction方法,然后同樣通過反射執(zhí)行。

@Override
	public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
		try {
			String txId = (String)msg.getHeaders().get(Constants.TX_ID_HEADER_NAME); 
			String checkBeanId = (String)msg.getHeaders().get(Constants.CHECK_BEAN_ID_HEADER_NAME);
			Long bizId = Long.parseLong((String)msg.getHeaders().get(Constants.BIZ_ID_HEADER_NAME));
			//執(zhí)行檢查方法
			Boolean ret = (Boolean)SpringUtil.invokeBeanMethod(checkBeanId, "check", new Object[]{bizId, txId});
			if(ret.booleanValue())
				return RocketMQLocalTransactionState.COMMIT;
			else
				return RocketMQLocalTransactionState.ROLLBACK;
		} catch (Exception e) {
			logger.error("發(fā)生錯誤:", e);
			return RocketMQLocalTransactionState.UNKNOWN;
		}
	}

到此這篇關于RocketMQ事務消息機制詳解的文章就介紹到這了,更多相關RocketMQ事務消息內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Java httpcomponents發(fā)送get post請求代碼實例

    Java httpcomponents發(fā)送get post請求代碼實例

    這篇文章主要介紹了Java httpcomponents發(fā)送get post請求代碼實例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-09-09
  • SpringBoot中的CSRF攻擊及預防方法

    SpringBoot中的CSRF攻擊及預防方法

    CSRF攻擊是一種常見的網絡攻擊方式,可以通過欺騙用戶來執(zhí)行惡意操作,在Spring Boot應用程序中,我們可以采取多種措施來預防CSRF攻擊,本文將給大家介紹一下CSRF攻擊以及如何預防攻擊,需要的朋友可以參考下
    2023-07-07
  • Maven多個項目實現聚合過程解析

    Maven多個項目實現聚合過程解析

    這篇文章主要介紹了Maven多個項目實現聚合過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-08-08
  • IDEA工程運行時總是報xx程序包不存在實際上包已導入(問題分析及解決方案)

    IDEA工程運行時總是報xx程序包不存在實際上包已導入(問題分析及解決方案)

    這篇文章主要介紹了IDEA工程運行時,總是報xx程序包不存在,實際上包已導入,本文給大家分享問題分析及解決方案,通過實例代碼給大家介紹的非常詳細,需要的朋友可以參考下
    2020-08-08
  • 一篇文章帶你了解Java SpringBoot Nacos

    一篇文章帶你了解Java SpringBoot Nacos

    這篇文章主要介紹了SpringBoot使用Nacos配置中心的實現,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2021-09-09
  • Java?SE使用for?each循環(huán)遍歷數組的方法代碼

    Java?SE使用for?each循環(huán)遍歷數組的方法代碼

    在Java?SE開發(fā)中,數組是最常見的數據結構之一,Java提供了多種遍歷數組的方式,其中for循環(huán)是最常用的方式之一,本文將介紹如何使用for?each循環(huán)遍歷數組,接下來,我們將通過一個簡單的代碼示例來展示如何使用for?each循環(huán)遍歷數組,需要的朋友可以參考下
    2023-11-11
  • java實現馬踏棋盤的完整版

    java實現馬踏棋盤的完整版

    這篇文章主要為大家詳細介紹了java實現馬踏棋盤的完整版,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-02-02
  • Java歸并排序算法代碼實現

    Java歸并排序算法代碼實現

    歸并(Merge)排序法是將兩個(或兩個以上)有序表合并成一個新的有序表,即把待排序序列分為若干個子序列,每個子序列是有序的,下面這篇文章主要給大家介紹了關于Java歸并排序算法的相關資料,需要的朋友可以參考下
    2024-03-03
  • 使用Java實現文件夾的遍歷操作指南

    使用Java實現文件夾的遍歷操作指南

    網上大多采用java遞歸的方式遍歷文件夾下的文件,這里我不太喜歡遞歸的風格,這篇文章主要給大家介紹了關于使用Java實現文件夾的遍歷操作的相關資料,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下
    2023-05-05
  • Java中判斷字符串是中文或者英文的工具類分享

    Java中判斷字符串是中文或者英文的工具類分享

    這篇文章主要介紹了Java中判斷字符串是中文或者英文的工具類分享,本文直接給出代碼,相關說明請看代碼的注釋,需要的朋友可以參考下
    2014-10-10

最新評論