欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java中的Rocketmq事務(wù)消息詳解

 更新時(shí)間:2023年08月14日 11:19:42   作者:澄風(fēng)  
這篇文章主要介紹了Java中的Rocketmq事務(wù)消息詳解,RocketMQ的事務(wù)消息支持在業(yè)務(wù)邏輯與發(fā)送消息之間提供事務(wù)保證,RocketMQ通過兩階段的方式提供事務(wù)消息的支持,需要的朋友可以參考下

什么是RocketMQ半消息(事務(wù)消息)

當(dāng)我們?cè)跇I(yè)務(wù)邏輯中發(fā)送消息時(shí),消息與業(yè)務(wù)的事務(wù)之間難以保證一致性,如果業(yè)務(wù)代碼出現(xiàn)異常,如果已發(fā)送的消息無法回滾,則很會(huì)出現(xiàn)數(shù)據(jù)不一致的情況,RocketMQ的事務(wù)消息支持在業(yè)務(wù)邏輯與發(fā)送消息之間提供事務(wù)保證,RocketMQ通過兩階段的方式提供事務(wù)消息的支持。

首需要注意的是 事務(wù)消息(半消息) 僅僅只是保證本地事務(wù)和MQ消息發(fā)送形成整體的 原子性 ,而投遞到MQ服務(wù)器后,并無法保證消費(fèi)者一定能消費(fèi)成功!

  1. 事務(wù)消息 :MQ 提供類似 X/Open XA 的分布事務(wù)功能,通過 MQ 事務(wù)消息能達(dá)到分布式事務(wù)的最終一致。
  2. 半消息 :暫不能投遞的消息,發(fā)送方已經(jīng)將消息成功發(fā)送到了 MQ 服務(wù)端,但是服務(wù)端未收到生產(chǎn)者對(duì)該消息的二次確認(rèn),此時(shí)該消息被標(biāo)記成“暫不能投遞”狀態(tài),處于該種狀態(tài)下的消息即半消息。
  3. 半消息回查 :由于網(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致某條事務(wù)消息的二次確認(rèn)丟失,MQ 服務(wù)端通過掃描發(fā)現(xiàn)某條消息長(zhǎng)期處于“半消息”時(shí),需要主動(dòng)向消息生產(chǎn)者詢問該消息的最終狀態(tài)(Commit 或是 Rollback),該過程即消息回查。
  4. 極端情況:是否任何情況下MQ的事務(wù)性消息都可以保證雙方的最終一致性?

答案是否定的。 考慮上面提到的異常情況“情況2:MQ發(fā)送方在步驟3執(zhí)行完本地事務(wù)之后commit之前異常退出”。在這種情況下如果如果MQ發(fā)送方由于運(yùn)維上的失誤長(zhǎng)時(shí)間不重啟MQ發(fā)送方,那么MQ在多次回查不成功之后將會(huì)丟棄該消息。最終分布式事務(wù)的雙方是不能達(dá)到最終一致性了。

當(dāng)然這個(gè)回查的最大值可以通過修改broker的參數(shù)transactionCheckMax來調(diào)整。但是過大的transactionCheckMax參數(shù)將會(huì)導(dǎo)致MQ堆積過多的半包消息,從而危害MQ的穩(wěn)定性,是個(gè)需要權(quán)衡的參數(shù)。

半消息事務(wù)實(shí)現(xiàn)流程

在這里插入圖片描述

流程:

1.發(fā)送方向 MQ 服務(wù)端發(fā)送事務(wù)消息;

2.MQ Server 將消息持久化成功之后,向發(fā)送方 ACK 確認(rèn)消息已經(jīng)發(fā)送成功,此時(shí)消息為半消息。

3.發(fā)送方開始執(zhí)行本地事務(wù)邏輯。

4.發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向 MQ Server 提交二次確認(rèn)(Commit 或是 Rollback),MQ Server 收到 Commit 狀態(tài)則將半消息標(biāo)記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態(tài)則刪除半消息,訂閱方將不會(huì)接受該消息。

5.在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟4提交的二次確認(rèn)最終未到達(dá) MQ Server,經(jīng)過固定時(shí)間后 MQ Server 將對(duì)該消息發(fā)起消息回查。

6.發(fā)送方收到消息回查后,需要檢查對(duì)應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。

7.發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),MQ Server 仍按照步驟4對(duì)半消息進(jìn)行操作。

半消息的應(yīng)用場(chǎng)景是什么?

注冊(cè)系統(tǒng)注冊(cè)的流程中,用戶入口在網(wǎng)頁(yè)注冊(cè)系統(tǒng),通知系統(tǒng)在郵件系統(tǒng),兩個(gè)系統(tǒng)之間的數(shù)據(jù)需要保持最終一致。

普通消息處理

如上所述,注冊(cè)系統(tǒng)和郵件通知系統(tǒng)之間通過消息隊(duì)列進(jìn)行異步處理。注冊(cè)系統(tǒng)將注冊(cè)信息寫入注冊(cè)系統(tǒng)之后,發(fā)送一條注冊(cè)成功的消息到消息隊(duì)列RocketMQ版,郵件通知系統(tǒng)訂閱消息隊(duì)列RocketMQ版的注冊(cè)消息,做相應(yīng)的業(yè)務(wù)處理,發(fā)送注冊(cè)成功或者失敗的郵件。

在這里插入圖片描述

流程說明如下:

1.注冊(cè)系統(tǒng)發(fā)起注冊(cè)。

2.注冊(cè)系統(tǒng)向消息隊(duì)列RocketMQ版發(fā)送注冊(cè)消息成功與否的消息。

2.1 消息發(fā)送成功,進(jìn)入3。

2.2 消息發(fā)送失敗,導(dǎo)致郵件通知系統(tǒng)未收到消息隊(duì)列RocketMQ版發(fā)送的注冊(cè)成功與否的消息,而無法發(fā)送郵件,最終郵件通知系統(tǒng)和注冊(cè)系統(tǒng)之間的狀態(tài)數(shù)據(jù)不一致,流程結(jié)束。

3.郵件通知系統(tǒng)收到消息隊(duì)列RocketMQ版的注冊(cè)成功消息。

4.郵件通知系統(tǒng)發(fā)送注冊(cè)成功郵件給用戶。

在這樣的情況下,雖然實(shí)現(xiàn)了系統(tǒng)間的解耦,上游系統(tǒng)不需要關(guān)心下游系統(tǒng)的業(yè)務(wù)處理結(jié)果;

但是數(shù)據(jù)一致性不好處理,如何保證郵件通知系統(tǒng)狀態(tài)與注冊(cè)系統(tǒng)狀態(tài)的最終一致。

事務(wù)消息處理

此時(shí),需要利用消息隊(duì)列RocketMQ版所提供的事務(wù)消息來實(shí)現(xiàn)系統(tǒng)間的狀態(tài)數(shù)據(jù)一致性。

在這里插入圖片描述

事務(wù)消息 流程說明如下:

1.注冊(cè)系統(tǒng)向消息隊(duì)列RocketMQ版發(fā)送半事務(wù)消息。

1.1 半事務(wù)消息發(fā)送成功,進(jìn)入2。

1.2 半事務(wù)消息發(fā)送失敗,注冊(cè)系統(tǒng)不進(jìn)行注冊(cè),流程結(jié)束。

  • 說明 最終注冊(cè)系統(tǒng)與郵件通知系統(tǒng)數(shù)據(jù)一致。

2.注冊(cè)系統(tǒng)開始注冊(cè)。

2.1 注冊(cè)成功,進(jìn)入3.1。

2.2 注冊(cè)失敗,進(jìn)入3.2。

3.注冊(cè)系統(tǒng)向消息隊(duì)列RocketMQ版發(fā)送半消息狀態(tài)。

3.1 提交半事務(wù)消息,產(chǎn)生注冊(cè)成功消息,進(jìn)入4。

3.2 回滾半事務(wù)消息,未產(chǎn)生注冊(cè)成功消息,流程結(jié)束。

  • 說明 最終注冊(cè)系統(tǒng)與郵件通知系統(tǒng)數(shù)據(jù)一致。

4.郵件通知系統(tǒng)接收消息隊(duì)列RocketMQ版的注冊(cè)成功消息。

5.郵件通知系統(tǒng)發(fā)送注冊(cè)成功郵件。

  • 說明 最終注冊(cè)系統(tǒng)與郵件通知系統(tǒng)數(shù)據(jù)一致。

關(guān)于分布式事務(wù)消息的更多詳細(xì)內(nèi)容,請(qǐng)參見事務(wù)消息。

這一段是摘抄子阿里云的rocketmq文檔介紹,大概的意思就是我發(fā)消息的動(dòng)作要和一個(gè)本地事務(wù)進(jìn)行綁定,我如果發(fā)消息失敗那么你本地事務(wù)也不應(yīng)該執(zhí)行,我本地事務(wù)執(zhí)行失敗,那么消息也不應(yīng)該發(fā)。要保證上下游系統(tǒng)的數(shù)據(jù)是最終一致的,保證消息和本地事務(wù)一定是原子性的。

實(shí)踐

在這里插入圖片描述

模擬一個(gè)接口發(fā)送事務(wù)消息

@GetMapping("/sendMessage")
public String sendMessage(String cron) {
    //TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction
    //        (String.format("%s:%s", "FINORDER_MQ_TOPIC_KEY", "FINORDER_PAY_TAG_KEY"), MessageBuilder.withPayload("{\"name\": \"ok\"}").build(), "");
    Message<String> mqMessage = MessageBuilder
            .withPayload("OK 啦!")
            .setHeader("key", "ALLENS") // ①
            .build();
    TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction("FINORDER_MQ_TOPIC_KEY:FINORDER_PAY_TAG_KEY" /* ② */, mqMessage , "");
    return "ok";
}

① 設(shè)置消息頭,這事務(wù)監(jiān)聽里邊可以通過這個(gè)header來區(qū)分是哪一個(gè)事務(wù),假如單個(gè)微服務(wù)有多個(gè)事務(wù)消息就可以用這個(gè)來區(qū)分。

② TOPIC + groupid 用":"來分割

創(chuàng)建事務(wù)監(jiān)聽器

@Service
@RocketMQTransactionListener
@Slf4j
public class TestTransactionListenerImpl implements RocketMQLocalTransactionListener {
	/**
	 * 每次推送消息會(huì)執(zhí)行executeLocalTransaction方法,首先會(huì)發(fā)送半消息,到這里的時(shí)候是執(zhí)行具體本地業(yè)務(wù),
     * 執(zhí)行成功后手動(dòng)返回RocketMQLocalTransactionState.COMMIT狀態(tài),
     * 這里是保證本地事務(wù)執(zhí)行成功,如果本地事務(wù)執(zhí)行失敗則可以返回ROLLBACK進(jìn)行消息回滾。 此時(shí)消息只是被保存到broker,并沒有發(fā)送到topic中,broker會(huì)根據(jù)本地返回的狀態(tài)來決定消息的處理方式。
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("接收到消息:" + msg);
        System.out.println("Header:" + msg.getHeaders().get("key"));
        return RocketMQLocalTransactionState.COMMIT;
    }
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        return RocketMQLocalTransactionState.COMMIT;
    }
}

該監(jiān)聽器的實(shí)現(xiàn)有兩個(gè)方法一個(gè)是本地事務(wù)的執(zhí)行方法executeLocalTransaction,一個(gè)是本地事務(wù)回查方法checkLocalTransaction。 兩個(gè)方法的返回值類型為RocketMQLocalTransactionState,該枚舉有三種:

// COMMIT:即生產(chǎn)者通知Rocket該消息可以消費(fèi)
RocketMQLocalTransactionState.COMMIT;
// ROLLBACK:即生產(chǎn)者通知Rocket將該消息刪除
RocketMQLocalTransactionState.ROLLBACK;
// UNKNOWN:即生產(chǎn)者通知Rocket繼續(xù)查詢?cè)撓⒌臓顟B(tài)
RocketMQLocalTransactionState.UNKNOWN;

對(duì)于長(zhǎng)時(shí)間沒有 Commit/Rollback 的事務(wù)消息( pending 狀態(tài)的消息),從服務(wù)端發(fā)起一次 回查Producer 收到回查消息,檢查回查消息對(duì)應(yīng)的 本地事務(wù)狀態(tài)根據(jù)本地事務(wù)狀態(tài),重新 Commit 或者 Rollback。

以上代碼中,如果sex是偶數(shù),executeLocalTransaction會(huì)拋出異常,本地事務(wù)會(huì)回滾,半消息狀態(tài)是UNKNOWN,此時(shí)就會(huì)啟動(dòng)消息的回查機(jī)制,mq會(huì)在一定的時(shí)間調(diào)用checkLocalTransaction方法查詢執(zhí)行狀態(tài),根據(jù)執(zhí)行狀態(tài)來決定是繼續(xù)回查、刪除消息、發(fā)送消息。

executeLocalTransaction也可以自己捕獲異常,手動(dòng)回滾事務(wù),返回RocketMQLocalTransactionState.ROLLBACK,這樣能減少消息回查。

等消息正常提交,半消息消息會(huì)移動(dòng)到發(fā)送指定的TOPIC隊(duì)里中,這個(gè)時(shí)候訂閱者就可以正常獲取消息了。

@Service
@RocketMQMessageListener(consumerGroup = "FINORDER_PAY_TAG_KEY",topic = "FINORDER_MQ_TOPIC_KEY")
@Slf4j
public class MQConsumerService implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("consumer:" + message);
    }
}

總結(jié)

1、首需要注意的是 事務(wù)消息(半消息) 僅僅只是保證本地事務(wù)和MQ消息發(fā)送形成整體的 原子性 ,而投遞到MQ服務(wù)器后,并無法保證消費(fèi)者一定能消費(fèi)成功!

2、發(fā)消息的動(dòng)作要和一個(gè)本地事務(wù)進(jìn)行綁定,我如果發(fā)消息失敗那么你本地事務(wù)也不應(yīng)該執(zhí)行,我本地事務(wù)執(zhí)行失敗,那么消息也不應(yīng)該發(fā)。要保證上下游系統(tǒng)的數(shù)據(jù)是最終一致的,保證消息和本地事務(wù)一定是原子性的。

3、 半消息如果提交成功最終是要入隊(duì)列的,可以正常的收到消息,這個(gè)時(shí)候可以認(rèn)為上游系統(tǒng)的依賴條件肯定是已經(jīng)執(zhí)行成功了的。

到此這篇關(guān)于Java中的Rocketmq事務(wù)消息詳解的文章就介紹到這了,更多相關(guān)Rocketmq事務(wù)消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論