欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ消息重試機(jī)制原理分析講解

 更新時(shí)間:2023年02月13日 11:02:06   作者:每天都要進(jìn)步一點(diǎn)點(diǎn)  
這篇文章主要介紹了RocketMQ消息重試機(jī)制,消息的發(fā)送和消費(fèi)并不是百分百成功的,在出現(xiàn)消息推送失敗時(shí),RocketMQ有何補(bǔ)償方式來(lái)進(jìn)行消息重試呢?這是我們今天要一起學(xué)習(xí)的點(diǎn)

一、概述

由于網(wǎng)絡(luò)抖動(dòng)、服務(wù)宕機(jī)等一些不確定的因素,RocketMQ在發(fā)送消息的時(shí)候很有可能出現(xiàn)消息發(fā)送或者消費(fèi)失敗的問(wèn)題。

Consumer消費(fèi)消息失敗通??梢哉J(rèn)為有以下幾種情況:

  • 由于消息本身的原因,例如反序列化失敗,消息數(shù)據(jù)本身無(wú)法處理(例如話費(fèi)充值,當(dāng)前消息的手機(jī)號(hào)被注銷,無(wú)法充值)等。這種錯(cuò)誤通常需要跳過(guò)這條消息,再消費(fèi)其它消息,而這條失敗的消息即使立刻重試消費(fèi),99%也不成功,所以最好提供一種定時(shí)重試機(jī)制,即過(guò)10秒后再重試。
  • 由于依賴的下游應(yīng)用服務(wù)不可用,例如db連接不可用,外系統(tǒng)網(wǎng)絡(luò)不可達(dá)等。遇到這種錯(cuò)誤,即使跳過(guò)當(dāng)前失敗的消息,消費(fèi)其他消息同樣也會(huì)報(bào)錯(cuò)。這種情況建議應(yīng)用sleep 30s,再消費(fèi)下一條消息,這樣可以減輕Broker重試消息的壓力。

如果沒(méi)有消息重試機(jī)制,就可能產(chǎn)生消息丟失的問(wèn)題,這樣就會(huì)對(duì)系統(tǒng)產(chǎn)生較大的影響。RocketMQ內(nèi)部封裝了消息重試的處理流程,無(wú)需開發(fā)人員手動(dòng)處理,并且支持了生產(chǎn)端、消費(fèi)端兩端的重試機(jī)制。

二、生產(chǎn)端的消息重試

生產(chǎn)端的消息重試是指:Producer往Broker上發(fā)消息沒(méi)有發(fā)送成功,比如網(wǎng)絡(luò)原因?qū)е律a(chǎn)者發(fā)送消息到MQ失敗,即發(fā)送端沒(méi)有收到Broker的ACK,導(dǎo)致最終Consumer無(wú)法消費(fèi)消息,此時(shí)RocketMQ會(huì)自動(dòng)進(jìn)行重試。

生產(chǎn)者端的消息重試配置比較簡(jiǎn)單,只需要在定義生產(chǎn)者的時(shí)候,調(diào)用producer.setRetryTimesWhenSendFailed(xxx)方法設(shè)置消息發(fā)送失敗的最大重試次數(shù)。如下:

// 同步發(fā)送消息,如果5秒內(nèi)沒(méi)有發(fā)送成功,則重試3次
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
producer.setRetryTimesWhenSendFailed(3);
producer.send(msg, 5000L);

三、消費(fèi)端的消息重試

同樣的,由于網(wǎng)絡(luò)原因,Broker發(fā)送消息給消費(fèi)者后,沒(méi)有受到消費(fèi)端的ACK響應(yīng),所以Broker又會(huì)嘗試將消息重新發(fā)送給Consumer,在實(shí)際開發(fā)過(guò)程中,我們更應(yīng)該考慮的是消費(fèi)端的重試。消費(fèi)端的消息重試可以分為順序消息的重試以及無(wú)序消息的重試。

(1)、順序消息的重試

對(duì)于順序消息,當(dāng)消費(fèi)者消費(fèi)消息失敗后,消息隊(duì)列 RocketMQ 會(huì)自動(dòng)不斷進(jìn)行消息重試(每次間隔時(shí)間為 1 秒),這時(shí)應(yīng)用會(huì)出現(xiàn)消息消費(fèi)被阻塞的情況。因此,在使用順序消息時(shí),務(wù)必保證應(yīng)用能夠及時(shí)監(jiān)控并處理消費(fèi)失敗的情況,避免阻塞現(xiàn)象的發(fā)生。

(2)、無(wú)序消息的重試

對(duì)于無(wú)序消息(普通、延時(shí)、事務(wù)消息),當(dāng)消費(fèi)者消費(fèi)消息失敗時(shí),可以通過(guò)設(shè)置返回狀態(tài)達(dá)到消息重試的結(jié)果。

需要注意的是:無(wú)序消息的重試只會(huì)針對(duì)集群消費(fèi)方式(MessageModel.CLUSTERING)生效;廣播方式不提供失敗重試特性,即消費(fèi)失敗后,失敗的消息不再重試,繼續(xù)消費(fèi)新的消息。

四、消息重試次數(shù)

RocketMQ 默認(rèn)允許每條消息最多重試 16 次,每次重試的間隔時(shí)間如下:

第幾次重試與上次重試的間隔時(shí)間第幾次重試與上次重試的間隔時(shí)間
110 秒97 分鐘
230 秒108 分鐘
31 分鐘119 分鐘
42 分鐘1210 分鐘
53 分鐘1320 分鐘
64 分鐘1430 分鐘
75 分鐘151 小時(shí)
86 分鐘162 小時(shí)

如果消息重試 16 次后仍然失敗,消息將不再投遞。

注意: 一條消息無(wú)論重試多少次,這些重試消息的 Message ID 不會(huì)改變。所以就需要我們消費(fèi)者端做好消費(fèi)冪等操作。

五、消息重試配置

集群消費(fèi)方式下,消息消費(fèi)失敗后期望消息重試,需要在消息監(jiān)聽(tīng)器接口的實(shí)現(xiàn)中明確進(jìn)行配置(下述三種方式任選一種):

  • 返回 Action.ReconsumeLater (推薦);
  • 返回 Null;
  • 拋出異常;
public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        //處理消息
        //.....
        //方式1:返回 Action.ReconsumeLater,消息將重試
        return Action.ReconsumeLater;
        //方式2:返回 null,消息將重試
        return null;
        //方式3:直接拋出異常, 消息將重試
        throw new RuntimeException("消費(fèi)消息發(fā)生異常");
    }
}

集群消費(fèi)方式下,如果希望消息失敗后,不進(jìn)行消息重試,那么我們可以捕獲消費(fèi)邏輯中可能拋出的異常,然后返回Action.CommitMessage,那么這條消息將不會(huì)再重試。如下:

public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        try {
            // 消費(fèi)消息....
        } catch (Throwable e) {
            // 捕獲消費(fèi)邏輯中的所有異常,并返回 Action.CommitMessage;
            return Action.CommitMessage;
        }
        // 消息處理正常,直接返回 Action.CommitMessage;
        return Action.CommitMessage;
    }
}

當(dāng)然,RocketMQ也允許Consumer 啟動(dòng)的時(shí)候設(shè)置最大重試次數(shù),重試時(shí)間間隔將按照如下策略:

  • 最大重試次數(shù)小于等于 16 次,則重試時(shí)間間隔如目錄四:消息重試次數(shù)的描述;
  • 最大重試次數(shù)大于 16 次,超過(guò) 16 次的重試時(shí)間間隔均為每次 2 小時(shí);
Properties properties = new Properties();
//  配置對(duì)應(yīng) Group ID的最大消息重試次數(shù)為 20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes, "20");
Consumer consumer =ONSFactory.createConsumer(properties);

注意:

  • 消息最大重試次數(shù)的設(shè)置對(duì)相同 Group ID 下的所有 Consumer 實(shí)例有效;
  • 如果只對(duì)相同 Group ID 下兩個(gè) Consumer 實(shí)例中的其中一個(gè)設(shè)置了 MaxReconsumeTimes,那么該配置對(duì)兩個(gè) Consumer 實(shí)例均生效;
  • 配置采用覆蓋的方式生效,即最后啟動(dòng)的 Consumer 實(shí)例會(huì)覆蓋之前的啟動(dòng)實(shí)例的配置;

六、消息重試原理

RocketMQ會(huì)為每個(gè)消費(fèi)者組都設(shè)置一個(gè)Topic名稱為“%RETRY%+consumerGroup”的重試隊(duì)列(這里需要注意的是,這個(gè)Topic的重試隊(duì)列是針對(duì)消費(fèi)組,而不是針對(duì)每個(gè)Topic設(shè)置的),用于暫時(shí)保存因?yàn)楦鞣N異常而導(dǎo)致Consumer端無(wú)法消費(fèi)的消息。

考慮到異?;謴?fù)需要一些時(shí)間,RocketMQ會(huì)為重試隊(duì)列設(shè)置多個(gè)重試級(jí)別,每個(gè)重試級(jí)別都有與之對(duì)應(yīng)的重新投遞延時(shí),重試次數(shù)越多投遞延時(shí)就越大。RocketMQ對(duì)于重試消息的處理是先保存至Topic名稱為“SCHEDULE_TOPIC_XXXX”的延遲隊(duì)列中,后臺(tái)定時(shí)任務(wù)按照對(duì)應(yīng)的時(shí)間進(jìn)行Delay后重新保存至“%RETRY%+consumerGroup”的重試隊(duì)列中。

到此這篇關(guān)于RocketMQ消息重試機(jī)制原理分析講解的文章就介紹到這了,更多相關(guān)RocketMQ消息重試內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 如何使用Spring RestTemplate訪問(wèn)restful服務(wù)

    如何使用Spring RestTemplate訪問(wèn)restful服務(wù)

    這篇文章主要介紹了如何使用Spring RestTemplate訪問(wèn)restful服務(wù),詳細(xì)的介紹了什么是RestTemplate以及簡(jiǎn)單實(shí)現(xiàn),非常具有實(shí)用價(jià)值,需要的朋友可以參考下
    2018-10-10
  • SpringBoot之@ConditionalOnProperty注解使用方法

    SpringBoot之@ConditionalOnProperty注解使用方法

    在平時(shí)業(yè)務(wù)中,我們需要在配置文件中配置某個(gè)屬性來(lái)決定是否需要將某些類進(jìn)行注入,讓Spring進(jìn)行管理,而@ConditionalOnProperty能夠?qū)崿F(xiàn)該功能,文中有詳細(xì)的代碼示例,需要的朋友可以參考下
    2023-05-05
  • Java+Selenium實(shí)現(xiàn)控制瀏覽器的啟動(dòng)選項(xiàng)Options

    Java+Selenium實(shí)現(xiàn)控制瀏覽器的啟動(dòng)選項(xiàng)Options

    這篇文章主要為大家詳細(xì)介紹了如何使用java代碼利用selenium控制瀏覽器的啟動(dòng)選項(xiàng)Options的代碼操作,文中的示例代碼講解詳細(xì),感興趣的可以了解一下
    2023-01-01
  • Spring Batch批處理框架使用解析

    Spring Batch批處理框架使用解析

    這篇文章主要介紹了Spring Batch批處理框架使用解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-12-12
  • 基于Java實(shí)現(xiàn)一個(gè)高效可伸縮的計(jì)算結(jié)果緩存

    基于Java實(shí)現(xiàn)一個(gè)高效可伸縮的計(jì)算結(jié)果緩存

    這篇文章將通過(guò)對(duì)一個(gè)計(jì)算結(jié)果緩存的設(shè)計(jì)迭代介紹,分析每個(gè)版本的并發(fā)缺陷,并分析如何修復(fù)這些缺陷,最終完成一個(gè)高效可伸縮的計(jì)算結(jié)果緩存,感興趣的小伙伴可以了解一下
    2023-06-06
  • Java?Redis配置Redisson的方法詳解

    Java?Redis配置Redisson的方法詳解

    這篇文章主要為大家詳細(xì)介紹了Java?Redis配置Redisson的相關(guān)資料,文中的示例代碼講解詳細(xì),對(duì)我們學(xué)習(xí)或工作有一定參考價(jià)值,感興趣的可以了解一下
    2022-08-08
  • SpringBoot2整合activiti6環(huán)境搭建過(guò)程解析

    SpringBoot2整合activiti6環(huán)境搭建過(guò)程解析

    這篇文章主要介紹了SpringBoot2整合activiti6環(huán)境搭建過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-11-11
  • Java使用Tess4J實(shí)現(xiàn)圖像識(shí)別方式

    Java使用Tess4J實(shí)現(xiàn)圖像識(shí)別方式

    這篇文章主要介紹了Java使用Tess4J實(shí)現(xiàn)圖像識(shí)別方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • java 算法二分查找和折半查找

    java 算法二分查找和折半查找

    這篇文章主要介紹了java 算法二分查找與折半查找的相關(guān)資料,需要的朋友可以參考下
    2017-05-05
  • 在Java中使用MongoDB的方法詳解

    在Java中使用MongoDB的方法詳解

    這篇文章主要給大家介紹了關(guān)于在Java中使用MongoDB的相關(guān)資料,要操作MongoDB數(shù)據(jù)庫(kù)你需要使用MongoDB的Java驅(qū)動(dòng)程序,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2023-12-12

最新評(píng)論