欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ消息重試機制原理分析講解

 更新時間:2023年02月13日 11:02:06   作者:每天都要進步一點點  
這篇文章主要介紹了RocketMQ消息重試機制,消息的發(fā)送和消費并不是百分百成功的,在出現(xiàn)消息推送失敗時,RocketMQ有何補償方式來進行消息重試呢?這是我們今天要一起學習的點

一、概述

由于網絡抖動、服務宕機等一些不確定的因素,RocketMQ在發(fā)送消息的時候很有可能出現(xiàn)消息發(fā)送或者消費失敗的問題。

Consumer消費消息失敗通常可以認為有以下幾種情況:

  • 由于消息本身的原因,例如反序列化失敗,消息數(shù)據本身無法處理(例如話費充值,當前消息的手機號被注銷,無法充值)等。這種錯誤通常需要跳過這條消息,再消費其它消息,而這條失敗的消息即使立刻重試消費,99%也不成功,所以最好提供一種定時重試機制,即過10秒后再重試。
  • 由于依賴的下游應用服務不可用,例如db連接不可用,外系統(tǒng)網絡不可達等。遇到這種錯誤,即使跳過當前失敗的消息,消費其他消息同樣也會報錯。這種情況建議應用sleep 30s,再消費下一條消息,這樣可以減輕Broker重試消息的壓力。

如果沒有消息重試機制,就可能產生消息丟失的問題,這樣就會對系統(tǒng)產生較大的影響。RocketMQ內部封裝了消息重試的處理流程,無需開發(fā)人員手動處理,并且支持了生產端、消費端兩端的重試機制。

二、生產端的消息重試

生產端的消息重試是指:Producer往Broker上發(fā)消息沒有發(fā)送成功,比如網絡原因導致生產者發(fā)送消息到MQ失敗,即發(fā)送端沒有收到Broker的ACK,導致最終Consumer無法消費消息,此時RocketMQ會自動進行重試。

生產者端的消息重試配置比較簡單,只需要在定義生產者的時候,調用producer.setRetryTimesWhenSendFailed(xxx)方法設置消息發(fā)送失敗的最大重試次數(shù)。如下:

// 同步發(fā)送消息,如果5秒內沒有發(fā)送成功,則重試3次
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
producer.setRetryTimesWhenSendFailed(3);
producer.send(msg, 5000L);

三、消費端的消息重試

同樣的,由于網絡原因,Broker發(fā)送消息給消費者后,沒有受到消費端的ACK響應,所以Broker又會嘗試將消息重新發(fā)送給Consumer,在實際開發(fā)過程中,我們更應該考慮的是消費端的重試。消費端的消息重試可以分為順序消息的重試以及無序消息的重試。

(1)、順序消息的重試

對于順序消息,當消費者消費消息失敗后,消息隊列 RocketMQ 會自動不斷進行消息重試(每次間隔時間為 1 秒),這時應用會出現(xiàn)消息消費被阻塞的情況。因此,在使用順序消息時,務必保證應用能夠及時監(jiān)控并處理消費失敗的情況,避免阻塞現(xiàn)象的發(fā)生。

(2)、無序消息的重試

對于無序消息(普通、延時、事務消息),當消費者消費消息失敗時,可以通過設置返回狀態(tài)達到消息重試的結果。

需要注意的是:無序消息的重試只會針對集群消費方式(MessageModel.CLUSTERING)生效;廣播方式不提供失敗重試特性,即消費失敗后,失敗的消息不再重試,繼續(xù)消費新的消息。

四、消息重試次數(shù)

RocketMQ 默認允許每條消息最多重試 16 次,每次重試的間隔時間如下:

第幾次重試與上次重試的間隔時間第幾次重試與上次重試的間隔時間
110 秒97 分鐘
230 秒108 分鐘
31 分鐘119 分鐘
42 分鐘1210 分鐘
53 分鐘1320 分鐘
64 分鐘1430 分鐘
75 分鐘151 小時
86 分鐘162 小時

如果消息重試 16 次后仍然失敗,消息將不再投遞。

注意: 一條消息無論重試多少次,這些重試消息的 Message ID 不會改變。所以就需要我們消費者端做好消費冪等操作。

五、消息重試配置

集群消費方式下,消息消費失敗后期望消息重試,需要在消息監(jiān)聽器接口的實現(xiàn)中明確進行配置(下述三種方式任選一種):

  • 返回 Action.ReconsumeLater (推薦);
  • 返回 Null;
  • 拋出異常;
public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        //處理消息
        //.....
        //方式1:返回 Action.ReconsumeLater,消息將重試
        return Action.ReconsumeLater;
        //方式2:返回 null,消息將重試
        return null;
        //方式3:直接拋出異常, 消息將重試
        throw new RuntimeException("消費消息發(fā)生異常");
    }
}

集群消費方式下,如果希望消息失敗后,不進行消息重試,那么我們可以捕獲消費邏輯中可能拋出的異常,然后返回Action.CommitMessage,那么這條消息將不會再重試。如下:

public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        try {
            // 消費消息....
        } catch (Throwable e) {
            // 捕獲消費邏輯中的所有異常,并返回 Action.CommitMessage;
            return Action.CommitMessage;
        }
        // 消息處理正常,直接返回 Action.CommitMessage;
        return Action.CommitMessage;
    }
}

當然,RocketMQ也允許Consumer 啟動的時候設置最大重試次數(shù),重試時間間隔將按照如下策略:

  • 最大重試次數(shù)小于等于 16 次,則重試時間間隔如目錄四:消息重試次數(shù)的描述;
  • 最大重試次數(shù)大于 16 次,超過 16 次的重試時間間隔均為每次 2 小時;
Properties properties = new Properties();
//  配置對應 Group ID的最大消息重試次數(shù)為 20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes, "20");
Consumer consumer =ONSFactory.createConsumer(properties);

注意:

  • 消息最大重試次數(shù)的設置對相同 Group ID 下的所有 Consumer 實例有效;
  • 如果只對相同 Group ID 下兩個 Consumer 實例中的其中一個設置了 MaxReconsumeTimes,那么該配置對兩個 Consumer 實例均生效;
  • 配置采用覆蓋的方式生效,即最后啟動的 Consumer 實例會覆蓋之前的啟動實例的配置;

六、消息重試原理

RocketMQ會為每個消費者組都設置一個Topic名稱為“%RETRY%+consumerGroup”的重試隊列(這里需要注意的是,這個Topic的重試隊列是針對消費組,而不是針對每個Topic設置的),用于暫時保存因為各種異常而導致Consumer端無法消費的消息。

考慮到異常恢復需要一些時間,RocketMQ會為重試隊列設置多個重試級別,每個重試級別都有與之對應的重新投遞延時,重試次數(shù)越多投遞延時就越大。RocketMQ對于重試消息的處理是先保存至Topic名稱為“SCHEDULE_TOPIC_XXXX”的延遲隊列中,后臺定時任務按照對應的時間進行Delay后重新保存至“%RETRY%+consumerGroup”的重試隊列中。

到此這篇關于RocketMQ消息重試機制原理分析講解的文章就介紹到這了,更多相關RocketMQ消息重試內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • 如何使用Spring RestTemplate訪問restful服務

    如何使用Spring RestTemplate訪問restful服務

    這篇文章主要介紹了如何使用Spring RestTemplate訪問restful服務,詳細的介紹了什么是RestTemplate以及簡單實現(xiàn),非常具有實用價值,需要的朋友可以參考下
    2018-10-10
  • SpringBoot之@ConditionalOnProperty注解使用方法

    SpringBoot之@ConditionalOnProperty注解使用方法

    在平時業(yè)務中,我們需要在配置文件中配置某個屬性來決定是否需要將某些類進行注入,讓Spring進行管理,而@ConditionalOnProperty能夠實現(xiàn)該功能,文中有詳細的代碼示例,需要的朋友可以參考下
    2023-05-05
  • Java+Selenium實現(xiàn)控制瀏覽器的啟動選項Options

    Java+Selenium實現(xiàn)控制瀏覽器的啟動選項Options

    這篇文章主要為大家詳細介紹了如何使用java代碼利用selenium控制瀏覽器的啟動選項Options的代碼操作,文中的示例代碼講解詳細,感興趣的可以了解一下
    2023-01-01
  • Spring Batch批處理框架使用解析

    Spring Batch批處理框架使用解析

    這篇文章主要介紹了Spring Batch批處理框架使用解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-12-12
  • 基于Java實現(xiàn)一個高效可伸縮的計算結果緩存

    基于Java實現(xiàn)一個高效可伸縮的計算結果緩存

    這篇文章將通過對一個計算結果緩存的設計迭代介紹,分析每個版本的并發(fā)缺陷,并分析如何修復這些缺陷,最終完成一個高效可伸縮的計算結果緩存,感興趣的小伙伴可以了解一下
    2023-06-06
  • Java?Redis配置Redisson的方法詳解

    Java?Redis配置Redisson的方法詳解

    這篇文章主要為大家詳細介紹了Java?Redis配置Redisson的相關資料,文中的示例代碼講解詳細,對我們學習或工作有一定參考價值,感興趣的可以了解一下
    2022-08-08
  • SpringBoot2整合activiti6環(huán)境搭建過程解析

    SpringBoot2整合activiti6環(huán)境搭建過程解析

    這篇文章主要介紹了SpringBoot2整合activiti6環(huán)境搭建過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-11-11
  • Java使用Tess4J實現(xiàn)圖像識別方式

    Java使用Tess4J實現(xiàn)圖像識別方式

    這篇文章主要介紹了Java使用Tess4J實現(xiàn)圖像識別方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • java 算法二分查找和折半查找

    java 算法二分查找和折半查找

    這篇文章主要介紹了java 算法二分查找與折半查找的相關資料,需要的朋友可以參考下
    2017-05-05
  • 在Java中使用MongoDB的方法詳解

    在Java中使用MongoDB的方法詳解

    這篇文章主要給大家介紹了關于在Java中使用MongoDB的相關資料,要操作MongoDB數(shù)據庫你需要使用MongoDB的Java驅動程序,文中通過代碼介紹的非常詳細,需要的朋友可以參考下
    2023-12-12

最新評論