欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

關于RocketMQ使用事務消息

 更新時間:2023年05月08日 09:23:35   作者:樂觀男孩  
RocketMQ是一種提供消息隊列服務的中間件,也稱為消息中間件,是一套提供了消息生產(chǎn)、存儲、消費全過程API的軟件系統(tǒng)。消息即數(shù)據(jù)。一般消息的體量不會很大,需要的朋友可以參考下

說明

事務消息:

1、不支持延時消息和批量消息
2、如果消息沒有及時提交,默認check 15次,可以通過Broker的transactionCheckMax參數(shù)配置次數(shù)。如果超時15次依然沒有得到明確結果,將會打印異常信息,具體的處理策略可以通過復寫AbstractTransactionCheckListener類實現(xiàn)
3、每次check的時間間隔可以通過Broker的transactionTimeout配置,也可以在消息中增加CHECK_IMMUNITY_TIME_IN_SECONDS屬性指定
4、事務狀態(tài):LocalTransactionState.COMMIT_MESSAGE、LocalTransactionState.ROLLBACK_MESSAGE、LocalTransactionState.UNKNOW。

原理

事務消息是RocketMQ的一大特性,其保證發(fā)送消息和執(zhí)行本地邏輯在同一個事務內(nèi)。實現(xiàn)的思路借鑒了兩階段提交協(xié)議:

第一階段:發(fā)送半事務消息,消息發(fā)送后,消息是對消費者透明的,也就是該消息還不屬于可消費消息,消費者無法消費。

第二階段:執(zhí)行本地事務,本地執(zhí)行事務后提交消息。

(1)、如果事務執(zhí)行失敗,則回滾消息;
(2)、如果事務執(zhí)行成功,則提交消息,提交后消費者可消費到消息;
(3)、如果事務執(zhí)行成功,但消息提交失敗,RocketMQ還提供了回查機制:如果一段時間過后,沒有提交/回滾半事務消息,RocketMQ會定時回查一定的次數(shù),獲取本地事務的狀態(tài)以決定是提交還是回滾消息。

如果回查一定的次數(shù)后依然沒有獲取到本地事務的明確狀態(tài),則消息會被放到死信隊列,由人工確認如何處理。

事務消息處理流程

在這里插入圖片描述

1、生產(chǎn)端發(fā)送半事務消息到服務端
2、服務端返回半事務消息發(fā)送成功響應。注意,此時的消息對消費端是不可見的,不可被消費
3、發(fā)送方執(zhí)行本地事務
4、執(zhí)行完本地事務后,客戶端同步服務端提交/回滾消息
5、如果服務端在一定的時間內(nèi),等不到4的回應,則定時進行回查,詢問客戶端的本地事務狀態(tài)。
6、客戶端檢查本地事務狀態(tài)
7、根據(jù)本地事務執(zhí)行情況,告知服務端,服務端決定是提交消息還是丟棄消息。

生產(chǎn)端

@Test
    public void sendMessage() throws Exception {
        //事務生產(chǎn)者
        TransactionMQProducer producer = new TransactionMQProducer("defaultGroup");
        producer.setNamesrvAddr(SpringUtil.getBean(RocketMqConfig.class).getNamesrvAddr());
        //設置檢查本地事務狀態(tài)的線程池
        //producer.setExecutorService(null);
        //本地事務執(zhí)行監(jiān)聽器
        TransactionListener transactionListener = new TransactionListenerImpl();
        producer.setTransactionListener(transactionListener);
        producer.start();
        Message message = new Message(RocketMqUtil.TOPIC, "transaction", "transaction-message".getBytes(Charset.forName("UTF-8")));
        //發(fā)送事務消息
        producer.sendMessageInTransaction(message, null);
    }
    class TransactionListenerImpl implements TransactionListener {
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            //執(zhí)行本地事務(數(shù)據(jù)庫)操作......
            int num = new Random().nextInt(10);
            if (num < 3) {
                //本地事務執(zhí)行成功,提交消息
                return LocalTransactionState.COMMIT_MESSAGE;
            } else if (num < 6) {
                //本地事務執(zhí)行失敗,刪除消息
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            //等待本地事務check,即執(zhí)行checkLocalTransaction()方法
            return LocalTransactionState.UNKNOW;
        }
        /**
         * 回查邏輯
         * @param msg
         * @return
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            int num = new Random().nextInt(10);
            if (num < 3) {
                //提交消息
                return LocalTransactionState.COMMIT_MESSAGE;
            } else if (num < 6) {
                //刪除消息
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            return LocalTransactionState.UNKNOW;
        }
    }

發(fā)送事務消息步驟:

1、初始化TransactionMQProducer實例
2、指定check線程池(回查線程池)
3、為Producer添加自定義事務監(jiān)聽器。自定義事務監(jiān)聽器需實現(xiàn)TransactionListener接口,通過覆蓋接口的executeLocalTransaction方法執(zhí)行本地事務,返回事務狀態(tài),客戶端會根據(jù)本地事務狀態(tài)通知服務端,決定是否提交消息;通過覆蓋接口的checkLocalTransaction方法提供回查機制,當在一定的時間內(nèi)服務端獲取不到本地事務執(zhí)行狀態(tài),將通過該方法回查事務狀態(tài),以決定消失是否需要提交。
4、通過Producer.sendMessageInTransaction發(fā)送事務消息。

消費者正常消費邏輯

消費端

@Test
    public void consumeMessage() throws Exception {
        DefaultMQPushConsumer defaultMQPushConsumer = RocketMqUtil.getDefaultMQPushConsumer();
        defaultMQPushConsumer.subscribe(RocketMqUtil.TOPIC, "*");
        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                            ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                log.info("消費到消息條數(shù):{}", list.size());
                list.stream().map(messageExt -> new String(messageExt.getBody(), Charset.forName("UTF-8")))
                        .map(String::new).forEach(System.out::println);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        defaultMQPushConsumer.start();
        Thread.sleep(5000L);
    }

消費端正常消費消息即可。

到此這篇關于關于RocketMQ使用事務消息的文章就介紹到這了,更多相關RocketMQ事務消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • 一文教你掌握Java如何實現(xiàn)判空

    一文教你掌握Java如何實現(xiàn)判空

    實際項目中我們會有很多地方需要判空校驗,如果不做判空校驗則可能產(chǎn)生NullPointerException異常。所以本文小編為大家整理了Java中幾個常見的判空方法,希望對大家有所幫助
    2023-04-04
  • Java鏈表中元素刪除的實現(xiàn)方法詳解【只刪除一個元素情況】

    Java鏈表中元素刪除的實現(xiàn)方法詳解【只刪除一個元素情況】

    這篇文章主要介紹了Java鏈表中元素刪除的實現(xiàn)方法,結合實例形式分析了java只刪除鏈表中一個元素的相關操作原理、實現(xiàn)方法與注意事項,需要的朋友可以參考下
    2020-03-03
  • 基于web項目log日志指定輸出文件位置配置方法

    基于web項目log日志指定輸出文件位置配置方法

    下面小編就為大家分享一篇基于web項目log日志指定輸出文件位置配置方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-04-04
  • 詳解java中if語句和switch的使用

    詳解java中if語句和switch的使用

    這篇文章主要介紹了java中if語句和switch的使用,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-11-11
  • 詳解Java 包掃描實現(xiàn)和應用(Jar篇)

    詳解Java 包掃描實現(xiàn)和應用(Jar篇)

    這篇文章主要介紹了詳解Java 包掃描實現(xiàn)和應用(Jar篇),本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-07-07
  • 詳解如何實現(xiàn)SpringBoot的底層注解

    詳解如何實現(xiàn)SpringBoot的底層注解

    今天給大家?guī)淼奈恼率侨绾螌崿F(xiàn)SpringBoot的底層注解,文中有非常詳細的介紹及代碼示例,對正在學習java的小伙伴很有幫助,需要的朋友可以參考下
    2021-06-06
  • 調用java.lang.Runtime.exec的正確姿勢分享

    調用java.lang.Runtime.exec的正確姿勢分享

    這篇文章主要介紹了調用java.lang.Runtime.exec的正確姿勢,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • 使用Feign動態(tài)設置header和原理分析

    使用Feign動態(tài)設置header和原理分析

    這篇文章主要介紹了使用Feign動態(tài)設置header和原理分析,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • Java集合框架之List ArrayList LinkedList使用詳解刨析

    Java集合框架之List ArrayList LinkedList使用詳解刨析

    早在 Java 2 中之前,Java 就提供了特設類。比如:Dictionary, Vector, Stack, 和 Properties 這些類用來存儲和操作對象組。雖然這些類都非常有用,但是它們?nèi)鄙僖粋€核心的,統(tǒng)一的主題。由于這個原因,使用 Vector 類的方式和使用 Properties 類的方式有著很大不同
    2021-10-10
  • springboot文件上傳保存路徑的問題

    springboot文件上傳保存路徑的問題

    這篇文章主要介紹了springboot文件上傳保存路徑的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-09-09

最新評論