欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ發(fā)送事務(wù)消息詳解

 更新時(shí)間:2023年09月08日 09:35:28   作者:morris131  
這篇文章主要介紹了RocketMQ發(fā)送事務(wù)消息詳解,RocketMQ分布式事務(wù)消息不僅可以實(shí)現(xiàn)應(yīng)用之間的解耦,又能保證數(shù)據(jù)的最終一致性,傳統(tǒng)的大事務(wù)可以被拆分為小事務(wù),不僅能提升效率,還不會(huì)因?yàn)槟骋粋€(gè)關(guān)聯(lián)應(yīng)用的不可用導(dǎo)致整體回滾,需要的朋友可以參考下

概念介紹

  • 事務(wù)消息:提供類(lèi)似XA或Open XA的分布式事務(wù)功能,通過(guò)事務(wù)消息能達(dá)到分布式事務(wù)的最終一致。
  • 半事務(wù)消息:暫不能投遞的消息,生產(chǎn)者已經(jīng)成功地將消息發(fā)送到了RocketMQ服務(wù)端,但是RocketMQ服務(wù)端未收到生產(chǎn)者對(duì)該消息的二次確認(rèn),此時(shí)該消息被標(biāo)記成“暫不能投遞”狀態(tài),處于該種狀態(tài)下的消息即半事務(wù)消息。
  • 消息回查:由于網(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致某條事務(wù)消息的二次確認(rèn)丟失,RocketMQ服務(wù)端通過(guò)掃描發(fā)現(xiàn)某條消息長(zhǎng)期處于“半事務(wù)消息”時(shí),需要主動(dòng)向消息生產(chǎn)者詢(xún)問(wèn)該消息的最終狀態(tài)(Commit或是Rollback),該詢(xún)問(wèn)過(guò)程即消息回查。

分布式事務(wù)消息的優(yōu)勢(shì)

RocketMQ分布式事務(wù)消息不僅可以實(shí)現(xiàn)應(yīng)用之間的解耦,又能保證數(shù)據(jù)的最終一致性。

同時(shí),傳統(tǒng)的大事務(wù)可以被拆分為小事務(wù),不僅能提升效率,還不會(huì)因?yàn)槟骋粋€(gè)關(guān)聯(lián)應(yīng)用的不可用導(dǎo)致整體回滾,從而最大限度保證核心系統(tǒng)的可用性。

在極端情況下,如果關(guān)聯(lián)的某一個(gè)應(yīng)用始終無(wú)法處理成功,也只需對(duì)當(dāng)前應(yīng)用進(jìn)行補(bǔ)償或數(shù)據(jù)訂正處理,而無(wú)需對(duì)整體業(yè)務(wù)進(jìn)行回滾。

典型場(chǎng)景

在淘寶購(gòu)物車(chē)下單時(shí),涉及到購(gòu)物車(chē)系統(tǒng)和交易系統(tǒng),這兩個(gè)系統(tǒng)之間的數(shù)據(jù)最終一致性可以通過(guò)分布式事務(wù)消息的異步處理實(shí)現(xiàn)。

在這種場(chǎng)景下,交易系統(tǒng)是最為核心的系統(tǒng),需要最大限度地保證下單成功。

而購(gòu)物車(chē)系統(tǒng)只需要訂閱交易訂單消息,做相應(yīng)的業(yè)務(wù)處理,即可保證最終的數(shù)據(jù)一致性。

交互流程

事務(wù)消息交互流程如下圖所示。

1656920164647.png

事務(wù)消息發(fā)送步驟如下:

  1. 生產(chǎn)者將半事務(wù)消息發(fā)送至RocketMQ服務(wù)端。
  2. RocketMQ服務(wù)端將消息持久化成功之后,向生產(chǎn)者返回Ack確認(rèn)消息已經(jīng)發(fā)送成功,此時(shí)消息為半事務(wù)消息。
  3. 生產(chǎn)者開(kāi)始執(zhí)行本地事務(wù)邏輯。
  4. 生產(chǎn)者根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn)結(jié)果(Commit或是Rollback),服務(wù)端收到確認(rèn)結(jié)果后處理邏輯如下:
    • 二次確認(rèn)結(jié)果為Commit:服務(wù)端將半事務(wù)消息標(biāo)記為可投遞,并投遞給消費(fèi)者。
    • 二次確認(rèn)結(jié)果為Rollback:服務(wù)端將回滾事務(wù),不會(huì)將半事務(wù)消息投遞給消費(fèi)者。
  5. 在斷網(wǎng)或者是生產(chǎn)者應(yīng)用重啟的特殊情況下,若服務(wù)端未收到發(fā)送者提交的二次確認(rèn)結(jié)果,或服務(wù)端收到的二次確認(rèn)結(jié)果為Unknown未知狀態(tài),經(jīng)過(guò)固定時(shí)間后,服務(wù)端將對(duì)消息生產(chǎn)者即生產(chǎn)者集群中任一生產(chǎn)者實(shí)例發(fā)起消息回查。

事務(wù)消息回查步驟如下:

  1. 生產(chǎn)者收到消息回查后,需要檢查對(duì)應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
  2. 生產(chǎn)者根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),服務(wù)端仍按照步驟4對(duì)半事務(wù)消息進(jìn)行處理。

示例代碼

事務(wù)消息生產(chǎn)者

public enum LocalTransactionState {
    COMMIT_MESSAGE,
    ROLLBACK_MESSAGE,
    UNKNOW,
}

事務(wù)消息發(fā)送完成本地事務(wù)后,可在execute方法中返回以下三種狀態(tài):

  • COMMIT_MESSAGE:提交事務(wù),允許消費(fèi)者消費(fèi)該消息。
  • ROLLBACK_MESSAGE:回滾事務(wù),消息將被丟棄不允許消費(fèi)。
  • UNKNOW:暫時(shí)無(wú)法判斷狀態(tài),等待固定時(shí)間以后消息隊(duì)列RocketMQ版服務(wù)端根據(jù)回查規(guī)則向生產(chǎn)者進(jìn)行消息回查。

創(chuàng)建事務(wù)消息的Producer時(shí)必須指定TransactionListener的實(shí)現(xiàn)類(lèi),處理異常情況下事務(wù)消息的回查。

回查規(guī)則:本地事務(wù)執(zhí)行完成后,若服務(wù)端收到的本地事務(wù)返回狀態(tài)為T(mén)ransactionStatus.Unknow,或生產(chǎn)者應(yīng)用退出導(dǎo)致本地事務(wù)未提交任何狀態(tài)。則服務(wù)端會(huì)向消息生產(chǎn)者發(fā)起事務(wù)回查,第一次回查后仍未獲取到事務(wù)狀態(tài),則之后每隔一段時(shí)間會(huì)再次回查。

回查間隔時(shí)間:系統(tǒng)默認(rèn)每隔30秒發(fā)起一次定時(shí)任務(wù),對(duì)未提交的半事務(wù)消息進(jìn)行回查,共持續(xù)12小時(shí)。

package com.morris.rocketmq.transaction;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;
/**
 * 事務(wù)消息生產(chǎn)者
 */
public class TransactionProducer {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("transaction-producer-demo");
        producer.setNamesrvAddr(NAME_SERVER_ADDRESS);
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        });
        producer.setExecutorService(executorService);
        // 指定事務(wù)會(huì)查的實(shí)現(xiàn)類(lèi)
        producer.setTransactionListener(new TransactionListener() {
            private final AtomicInteger transactionIndex = new AtomicInteger(0);
            private final ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                int value = transactionIndex.getAndIncrement();
                System.out.println(Thread.currentThread().getName()+  "-executeLocalTransaction:" + new String(msg.getBody()) + ",value=" + value);
                int status = value % 3;
                localTrans.put(msg.getTransactionId(), status);
                return LocalTransactionState.UNKNOW;
            }
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println(Thread.currentThread().getName()+  "-checkLocalTransaction:" + new String(msg.getBody()));
                Integer status = localTrans.get(msg.getTransactionId());
                if (null != status) {
                    switch (status) {
                        case 0:
                            return LocalTransactionState.COMMIT_MESSAGE;
                        case 1:
                            return LocalTransactionState.UNKNOW;
                        case 2:
                            return LocalTransactionState.ROLLBACK_MESSAGE;
                    }
                }
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();
        for(int i = 0; i < 10; i++) {
            Message message = new Message("TransactionTopic", ("transactionDemo" + i).getBytes());
            // 發(fā)送事務(wù)消息
            producer.sendMessageInTransaction(message, i);
            System.out.println(message);
        }
    }
}

第一次消息回查最快時(shí)間:該參數(shù)支持自定義設(shè)置。若指定消息未達(dá)到設(shè)置的最快回查時(shí)間前,系統(tǒng)默認(rèn)每隔30秒一次的回查任務(wù)不會(huì)檢查該消息。

設(shè)置方式如下:

Message message = new Message();
message.putUserProperties(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "60");

說(shuō)明:因?yàn)橄到y(tǒng)默認(rèn)的回查間隔,第一次消息回查的實(shí)際時(shí)間會(huì)向后有0秒~30秒的浮動(dòng)。

例如:指定消息的第一次消息最快回查時(shí)間設(shè)置為60秒,系統(tǒng)在第58秒時(shí)達(dá)到定時(shí)的回查時(shí)間,但設(shè)置的60秒未到,所以該消息不在本次回查范圍內(nèi)。等待間隔30秒后,下一次的系統(tǒng)回查時(shí)間在第88秒,該消息才符合條件進(jìn)行第一次回查,距設(shè)置的最快回查時(shí)間延后了28秒。

事務(wù)消息消費(fèi)者

事務(wù)消息的Group ID不能與其他類(lèi)型消息的Group ID共用。與其他類(lèi)型的消息不同,事務(wù)消息有回查機(jī)制,回查時(shí)服務(wù)端會(huì)根據(jù)Group ID去查詢(xún)生產(chǎn)者客戶(hù)端。

package com.morris.rocketmq.transaction;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;
/**
 * 事務(wù)消息消費(fèi)者
 */
public class TranscationConsumer {
    public static void main(String[] args) throws Exception {
        // 實(shí)例化消息生產(chǎn)者,指定組名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction-consumer-group");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr(NAME_SERVER_ADDRESS);
        // 訂閱Topic
        consumer.subscribe("TransactionTopic", "*");
        //負(fù)載均衡模式消費(fèi)
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 注冊(cè)回調(diào)函數(shù),處理消息
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            try {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        //啟動(dòng)消息者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

使用說(shuō)明

  1. 事務(wù)消息不支持延時(shí)消息和批量消息。
  2. 事務(wù)回查的間隔時(shí)間:BrokerConfig.transactionCheckInterval,通過(guò)Broker的配置文件設(shè)置好。
  3. 為了避免單個(gè)消息被檢查太多次而導(dǎo)致半隊(duì)列消息累積,我們默認(rèn)將單個(gè)消息的檢查次數(shù)限制為15次,但是用戶(hù)可以通過(guò)Broker配置文件的transactionCheckMax參數(shù)來(lái)修改此限制。如果已經(jīng)檢查某條消息超過(guò)N次的話(N=transactionCheckMax)則Broker將丟棄此消息,并在默認(rèn)情況下同時(shí)打印錯(cuò)誤日志。用戶(hù)可以通過(guò)重寫(xiě)AbstractTransactionCheckListener類(lèi)來(lái)修改這個(gè)行為。
  4. 事務(wù)消息將在Broker配置文件中的參數(shù)transactionMsgTimeout這樣的特定時(shí)間長(zhǎng)度之后被檢查。當(dāng)發(fā)送事務(wù)消息時(shí),用戶(hù)還可以通過(guò)設(shè)置用戶(hù)屬性 CHECK_IMMUNITY_TIME_IN_SECONDS來(lái)改變這個(gè)限制,該參數(shù)優(yōu)先于transactionMsgTimeout參數(shù)。
  5. 事務(wù)性消息可能不止一次被檢查或消費(fèi)。
  6. 事務(wù)性消息中用到了生產(chǎn)者群組,這種就是一種高可用機(jī)制,用來(lái)確保事務(wù)消息的可靠性。
  7. 提交給用戶(hù)的目標(biāo)主題消息可能會(huì)失敗,目前這依日志的記錄而定。它的高可用性通過(guò)RocketMQ本身的高可用性機(jī)制來(lái)保證,如果希望確保事務(wù)消息不丟失、并且事務(wù)完整性得到保證,建議使用同步的雙重寫(xiě)入機(jī)制。
  8. 事務(wù)消息的生產(chǎn)者ID不能與其他類(lèi)型消息的生產(chǎn)者ID共享。與其他類(lèi)型的消息不同,事務(wù)消息允許反向查詢(xún)、MQ服務(wù)器能通過(guò)它們的生產(chǎn)者ID查詢(xún)到消費(fèi)者。

到此這篇關(guān)于RocketMQ發(fā)送事務(wù)消息詳解的文章就介紹到這了,更多相關(guān)RocketMQ事務(wù)消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 淺談java多線程 join方法以及優(yōu)先級(jí)方法

    淺談java多線程 join方法以及優(yōu)先級(jí)方法

    下面小編就為大家?guī)?lái)一篇淺談java多線程 join方法以及優(yōu)先級(jí)方法。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-01-01
  • Java獲取HttpServletRequest的三種方法詳解

    Java獲取HttpServletRequest的三種方法詳解

    這篇文章主要介紹了Java獲取HttpServletRequest的三種方法詳解,是一個(gè)接口,全限定名稱(chēng)為Jakarta.Serclet.http.HttpServletRequest
    HttpServletRequest接口是Servlet規(guī)范的一員,需要的朋友可以參考下
    2023-11-11
  • Spring切入點(diǎn)表達(dá)式配置過(guò)程圖解

    Spring切入點(diǎn)表達(dá)式配置過(guò)程圖解

    這篇文章主要介紹了Spring切入點(diǎn)表達(dá)式配置過(guò)程圖解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-08-08
  • SpringBoot使用AOP與注解實(shí)現(xiàn)請(qǐng)求參數(shù)自動(dòng)填充流程詳解

    SpringBoot使用AOP與注解實(shí)現(xiàn)請(qǐng)求參數(shù)自動(dòng)填充流程詳解

    面向切面編程(aspect-oriented programming,AOP)主要實(shí)現(xiàn)的目的是針對(duì)業(yè)務(wù)處理過(guò)程中的切面進(jìn)行提取,諸如日志、事務(wù)管理和安全這樣的系統(tǒng)服務(wù),從而使得業(yè)務(wù)邏輯各部分之間的耦合度降低,提高程序的可重用性,同時(shí)提高了開(kāi)發(fā)的效率
    2023-02-02
  • Java選擇結(jié)構(gòu)與循環(huán)結(jié)構(gòu)的使用詳解

    Java選擇結(jié)構(gòu)與循環(huán)結(jié)構(gòu)的使用詳解

    循環(huán)結(jié)構(gòu)是指在程序中需要反復(fù)執(zhí)行某個(gè)功能而設(shè)置的一種程序結(jié)構(gòu)。它由循環(huán)體中的條件,判斷繼續(xù)執(zhí)行某個(gè)功能還是退出循環(huán),選擇結(jié)構(gòu)用于判斷給定的條件,根據(jù)判斷的結(jié)果判斷某些條件,根據(jù)判斷的結(jié)果來(lái)控制程序的流程
    2022-03-03
  • Java ExecutorServic線程池異步實(shí)現(xiàn)流程

    Java ExecutorServic線程池異步實(shí)現(xiàn)流程

    這篇文章主要介紹了Java ExecutorServic線程池異步實(shí)現(xiàn)流程,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-12-12
  • Feign如何解決服務(wù)之間調(diào)用傳遞token

    Feign如何解決服務(wù)之間調(diào)用傳遞token

    這篇文章主要介紹了Feign如何解決服務(wù)之間調(diào)用傳遞token,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • MyBatis框架之mybatis逆向工程自動(dòng)生成代碼

    MyBatis框架之mybatis逆向工程自動(dòng)生成代碼

    Mybatis屬于半自動(dòng)ORM,在使用這個(gè)框架中,工作量最大的就是書(shū)寫(xiě)Mapping的映射文件,由于手動(dòng)書(shū)寫(xiě)很容易出錯(cuò),我們可以利用Mybatis-Generator來(lái)幫我們自動(dòng)生成文件。本文主要給大家介紹mybatis逆向工程自動(dòng)生成代碼,感興趣的朋友一起學(xué)習(xí)吧
    2016-04-04
  • Java8 使用流抽取List<T>集合中T的某個(gè)屬性操作

    Java8 使用流抽取List<T>集合中T的某個(gè)屬性操作

    這篇文章主要介紹了Java8 使用流抽取List<T>集合中T的某個(gè)屬性操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2021-02-02
  • 關(guān)于在Java中如何使用yaml的實(shí)例

    關(guān)于在Java中如何使用yaml的實(shí)例

    這篇文章主要介紹了關(guān)于在Java中如何使用yaml的實(shí)例,YAML是一種輕量級(jí)的數(shù)據(jù)序列化格式。它以易讀、易寫(xiě)的文本格式表示數(shù)據(jù),支持列表、字典等各種數(shù)據(jù)結(jié)構(gòu),被廣泛應(yīng)用于配置文件、數(shù)據(jù)傳輸協(xié)議等領(lǐng)域,需要的朋友可以參考下
    2023-08-08

最新評(píng)論