欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ事務(wù)消息機(jī)制詳解

 更新時(shí)間:2024年01月11日 09:28:50   作者:智由靜生  
這篇文章主要介紹了RocketMQ事務(wù)消息機(jī)制詳解,RocketMQ服務(wù)端將消息持久化之后,向發(fā)送方返回Ack確認(rèn)消息已經(jīng)發(fā)送成功,由于消息為半事務(wù)消息,在未收到生產(chǎn)者對(duì)該消息的二次確認(rèn)前,此消息被標(biāo)記成"暫不能投遞"狀態(tài),需要的朋友可以參考下

RocketMQ事務(wù)消息

RocketMQ提供了事務(wù)消息,通過事務(wù)消息就能達(dá)到分布式事務(wù)的最終一致,從而實(shí)現(xiàn)了可靠消息服務(wù)。

一、事務(wù)消息的實(shí)現(xiàn)步驟

事務(wù)消息發(fā)送步驟:

1. 發(fā)送方將半事務(wù)消息發(fā)送至RocketMQ服務(wù)端。

2. RocketMQ服務(wù)端將消息持久化之后,向發(fā)送方返回Ack確認(rèn)消息已經(jīng)發(fā)送成功。由于消息為半事務(wù)消息,在未收到生產(chǎn)者對(duì)該消息的二次確認(rèn)前,此消息被標(biāo)記成“暫不能投遞”狀態(tài)。

3. 發(fā)送方開始執(zhí)行本地事務(wù)邏輯。

4. 發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn)(Commit 或是 Rollback),服務(wù)端收到Commit 狀態(tài)則將半事務(wù)消息標(biāo)記為可投遞,訂閱方最終將收到該消息;服務(wù)端收到 Rollback 狀態(tài)則刪除半事務(wù)消息,訂閱方將不會(huì)接受該消息。

事務(wù)消息回查步驟:

1. 在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟4提交的二次確認(rèn)最終未到達(dá)服務(wù)端,經(jīng)過固定時(shí)間后服務(wù)端將對(duì)該消息發(fā)起消息回查。

2. 發(fā)送方收到消息回查后,需要檢查對(duì)應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。 3. 發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),服務(wù)端仍按照步驟4對(duì)半事務(wù)消息進(jìn)行操作。

二、程序?qū)崿F(xiàn)

事務(wù)消息處理類需要繼承RocketMQLocalTransactionListener類。該類的executeLocalTransaction方法負(fù)責(zé)在接到RocketMQ服務(wù)端的Ack確認(rèn)消息后執(zhí)行本地方法,也就是事務(wù)消息發(fā)送步驟中的步驟3。該類的checkLocalTransaction方法負(fù)責(zé),在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,執(zhí)行RocketMQ服務(wù)端的消息回查,也就是事務(wù)消息回查步驟中的步驟2。

此外,要使該類生效,還需要加@RocketMQTransactionListener注解。這里有個(gè)要特別注意的地方。在2.1.0版本前,這個(gè)注解有一個(gè)屬性txProducerGroup,可以用多個(gè)@RocketMQTransactionListener來監(jiān)聽不同的txProducerGroup來發(fā)送不同類型的事務(wù)消息到topic。但是現(xiàn)在在一個(gè)項(xiàng)目中,如果你在一個(gè)project中寫了多個(gè)@RocketMQTransactionListener,項(xiàng)目將不能啟動(dòng),啟動(dòng)會(huì)報(bào)錯(cuò)。產(chǎn)生這個(gè)問題的原因據(jù)說是,當(dāng)使用RocketMQTemplate并發(fā)的執(zhí)行事務(wù)時(shí),非常容易出現(xiàn)"illegal state"的異常,原因是一個(gè)TransactionProducer在執(zhí)行事務(wù)時(shí)不能被共享。所以,必須使用同一個(gè)TransactionMQProducer來發(fā)送所有類型的事務(wù)消息。當(dāng)然同理也就必須使用一個(gè)偵聽器處理所有的消息了。

既然必須使用同一個(gè)TransactionMQProducer,對(duì)于比較大的應(yīng)用,業(yè)務(wù)場(chǎng)景很多,就會(huì)造成混亂。這里我給出一個(gè)方案拋磚引玉。TransactionMQProducer在發(fā)送消息時(shí),是可以傳遞參數(shù)對(duì)象和指定消息頭的??梢园岩獔?zhí)行的本地方法的bean名和方法名放進(jìn)去。

//發(fā)送半事務(wù)消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
		topicAndTag,
		MessageBuilder.withPayload(msg)
			.setHeader(Constants.TX_ID_HEADER_NAME, msg.getTxId())
			.setHeader(Constants.CHECK_BEAN_ID_HEADER_NAME, def.getCheckBeanId())
			.setHeader(Constants.BIZ_ID_HEADER_NAME, msg.getBizId())
			.build(),
		def
);

其中def就是參數(shù)對(duì)象,可以自定義對(duì)象,這里是我自定義的TransactionMsgDefinationDto類,可以把想傳遞的信息放進(jìn)去,最重要的是要執(zhí)行的本地方法的bean名和方法名和方法執(zhí)行參數(shù):executeBeanId(bean名)、executeBeanMethod(方法名)、executeBeanParams(方法執(zhí)行參數(shù))。該對(duì)象可以傳給RocketMQLocalTransactionListener的executeLocalTransaction方法,然后通過反射執(zhí)行。

@Override
	public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
		try {
			//保存消息記錄
			String body = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
			JSONObject jsonBody = JSONObject.parseObject(body);
			BaseMsgDto dto = JSONObject.toJavaObject(jsonBody, BaseMsgDto.class);//(BaseMsgDto)msg.getPayload();
			TransactionMsgDefinationDto def = (TransactionMsgDefinationDto)arg;
			ProducerLog producerLog = BeanCopyUtils.copyProperties(def, ProducerLog::new);
			String[] tags = def.getMsgTags();
			if(tags !=null && tags.length > 0) {
				StringBuilder tag = new StringBuilder();
				for(int i = 0; i<tags.length; i++) {
					tag.append(tags[0]);
					if(i != tags.length-1) {
						tag.append("||");
					}
				}
				producerLog.setMsgTag(tag.toString());
			}
			producerLog.setBizId(dto.getBizId());
			producerLog.setTxId(dto.getTxId());
			producerLog.setBizType(dto.getBizType());
			producerLog.setGroupName(dto.getProducerGroup());
			producerLog.setMsgBody(body);
			producerLogService.save(producerLog);
			//執(zhí)行事務(wù)方法
			SpringUtil.invokeBeanMethod(def.getExecuteBeanId(), def.getExecuteBeanMethod(), def.getExecuteBeanParams());
			return RocketMQLocalTransactionState.COMMIT;
		} catch (Exception e) {
			logger.error("發(fā)生錯(cuò)誤:", e);
			return RocketMQLocalTransactionState.UNKNOWN;
		}
	}

放在消息頭header中的數(shù)據(jù)可以傳遞給RocketMQLocalTransactionListener的checkLocalTransaction方法,然后同樣通過反射執(zhí)行。

@Override
	public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
		try {
			String txId = (String)msg.getHeaders().get(Constants.TX_ID_HEADER_NAME); 
			String checkBeanId = (String)msg.getHeaders().get(Constants.CHECK_BEAN_ID_HEADER_NAME);
			Long bizId = Long.parseLong((String)msg.getHeaders().get(Constants.BIZ_ID_HEADER_NAME));
			//執(zhí)行檢查方法
			Boolean ret = (Boolean)SpringUtil.invokeBeanMethod(checkBeanId, "check", new Object[]{bizId, txId});
			if(ret.booleanValue())
				return RocketMQLocalTransactionState.COMMIT;
			else
				return RocketMQLocalTransactionState.ROLLBACK;
		} catch (Exception e) {
			logger.error("發(fā)生錯(cuò)誤:", e);
			return RocketMQLocalTransactionState.UNKNOWN;
		}
	}

到此這篇關(guān)于RocketMQ事務(wù)消息機(jī)制詳解的文章就介紹到這了,更多相關(guān)RocketMQ事務(wù)消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java httpcomponents發(fā)送get post請(qǐng)求代碼實(shí)例

    Java httpcomponents發(fā)送get post請(qǐng)求代碼實(shí)例

    這篇文章主要介紹了Java httpcomponents發(fā)送get post請(qǐng)求代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-09-09
  • SpringBoot中的CSRF攻擊及預(yù)防方法

    SpringBoot中的CSRF攻擊及預(yù)防方法

    CSRF攻擊是一種常見的網(wǎng)絡(luò)攻擊方式,可以通過欺騙用戶來執(zhí)行惡意操作,在Spring Boot應(yīng)用程序中,我們可以采取多種措施來預(yù)防CSRF攻擊,本文將給大家介紹一下CSRF攻擊以及如何預(yù)防攻擊,需要的朋友可以參考下
    2023-07-07
  • Maven多個(gè)項(xiàng)目實(shí)現(xiàn)聚合過程解析

    Maven多個(gè)項(xiàng)目實(shí)現(xiàn)聚合過程解析

    這篇文章主要介紹了Maven多個(gè)項(xiàng)目實(shí)現(xiàn)聚合過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-08-08
  • IDEA工程運(yùn)行時(shí)總是報(bào)xx程序包不存在實(shí)際上包已導(dǎo)入(問題分析及解決方案)

    IDEA工程運(yùn)行時(shí)總是報(bào)xx程序包不存在實(shí)際上包已導(dǎo)入(問題分析及解決方案)

    這篇文章主要介紹了IDEA工程運(yùn)行時(shí),總是報(bào)xx程序包不存在,實(shí)際上包已導(dǎo)入,本文給大家分享問題分析及解決方案,通過實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下
    2020-08-08
  • 一篇文章帶你了解Java SpringBoot Nacos

    一篇文章帶你了解Java SpringBoot Nacos

    這篇文章主要介紹了SpringBoot使用Nacos配置中心的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-09-09
  • Java?SE使用for?each循環(huán)遍歷數(shù)組的方法代碼

    Java?SE使用for?each循環(huán)遍歷數(shù)組的方法代碼

    在Java?SE開發(fā)中,數(shù)組是最常見的數(shù)據(jù)結(jié)構(gòu)之一,Java提供了多種遍歷數(shù)組的方式,其中for循環(huán)是最常用的方式之一,本文將介紹如何使用for?each循環(huán)遍歷數(shù)組,接下來,我們將通過一個(gè)簡(jiǎn)單的代碼示例來展示如何使用for?each循環(huán)遍歷數(shù)組,需要的朋友可以參考下
    2023-11-11
  • java實(shí)現(xiàn)馬踏棋盤的完整版

    java實(shí)現(xiàn)馬踏棋盤的完整版

    這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)馬踏棋盤的完整版,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-02-02
  • Java歸并排序算法代碼實(shí)現(xiàn)

    Java歸并排序算法代碼實(shí)現(xiàn)

    歸并(Merge)排序法是將兩個(gè)(或兩個(gè)以上)有序表合并成一個(gè)新的有序表,即把待排序序列分為若干個(gè)子序列,每個(gè)子序列是有序的,下面這篇文章主要給大家介紹了關(guān)于Java歸并排序算法的相關(guān)資料,需要的朋友可以參考下
    2024-03-03
  • 使用Java實(shí)現(xiàn)文件夾的遍歷操作指南

    使用Java實(shí)現(xiàn)文件夾的遍歷操作指南

    網(wǎng)上大多采用java遞歸的方式遍歷文件夾下的文件,這里我不太喜歡遞歸的風(fēng)格,這篇文章主要給大家介紹了關(guān)于使用Java實(shí)現(xiàn)文件夾的遍歷操作的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2023-05-05
  • Java中判斷字符串是中文或者英文的工具類分享

    Java中判斷字符串是中文或者英文的工具類分享

    這篇文章主要介紹了Java中判斷字符串是中文或者英文的工具類分享,本文直接給出代碼,相關(guān)說明請(qǐng)看代碼的注釋,需要的朋友可以參考下
    2014-10-10

最新評(píng)論