RocketMQ消息重試機(jī)制原理分析講解
一、概述
由于網(wǎng)絡(luò)抖動(dòng)、服務(wù)宕機(jī)等一些不確定的因素,RocketMQ在發(fā)送消息的時(shí)候很有可能出現(xiàn)消息發(fā)送或者消費(fèi)失敗的問(wèn)題。
Consumer消費(fèi)消息失敗通??梢哉J(rèn)為有以下幾種情況:
- 由于消息本身的原因,例如反序列化失敗,消息數(shù)據(jù)本身無(wú)法處理(例如話費(fèi)充值,當(dāng)前消息的手機(jī)號(hào)被注銷,無(wú)法充值)等。這種錯(cuò)誤通常需要跳過(guò)這條消息,再消費(fèi)其它消息,而這條失敗的消息即使立刻重試消費(fèi),99%也不成功,所以最好提供一種定時(shí)重試機(jī)制,即過(guò)10秒后再重試。
- 由于依賴的下游應(yīng)用服務(wù)不可用,例如db連接不可用,外系統(tǒng)網(wǎng)絡(luò)不可達(dá)等。遇到這種錯(cuò)誤,即使跳過(guò)當(dāng)前失敗的消息,消費(fèi)其他消息同樣也會(huì)報(bào)錯(cuò)。這種情況建議應(yīng)用sleep 30s,再消費(fèi)下一條消息,這樣可以減輕Broker重試消息的壓力。
如果沒(méi)有消息重試機(jī)制,就可能產(chǎn)生消息丟失的問(wèn)題,這樣就會(huì)對(duì)系統(tǒng)產(chǎn)生較大的影響。RocketMQ內(nèi)部封裝了消息重試的處理流程,無(wú)需開發(fā)人員手動(dòng)處理,并且支持了生產(chǎn)端、消費(fèi)端兩端的重試機(jī)制。
二、生產(chǎn)端的消息重試
生產(chǎn)端的消息重試是指:Producer往Broker上發(fā)消息沒(méi)有發(fā)送成功,比如網(wǎng)絡(luò)原因?qū)е律a(chǎn)者發(fā)送消息到MQ失敗,即發(fā)送端沒(méi)有收到Broker的ACK,導(dǎo)致最終Consumer無(wú)法消費(fèi)消息,此時(shí)RocketMQ會(huì)自動(dòng)進(jìn)行重試。
生產(chǎn)者端的消息重試配置比較簡(jiǎn)單,只需要在定義生產(chǎn)者的時(shí)候,調(diào)用producer.setRetryTimesWhenSendFailed(xxx)方法設(shè)置消息發(fā)送失敗的最大重試次數(shù)。如下:
// 同步發(fā)送消息,如果5秒內(nèi)沒(méi)有發(fā)送成功,則重試3次 DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer"); producer.setRetryTimesWhenSendFailed(3); producer.send(msg, 5000L);
三、消費(fèi)端的消息重試
同樣的,由于網(wǎng)絡(luò)原因,Broker發(fā)送消息給消費(fèi)者后,沒(méi)有受到消費(fèi)端的ACK響應(yīng),所以Broker又會(huì)嘗試將消息重新發(fā)送給Consumer,在實(shí)際開發(fā)過(guò)程中,我們更應(yīng)該考慮的是消費(fèi)端的重試。消費(fèi)端的消息重試可以分為順序消息的重試以及無(wú)序消息的重試。
(1)、順序消息的重試
對(duì)于順序消息,當(dāng)消費(fèi)者消費(fèi)消息失敗后,消息隊(duì)列 RocketMQ 會(huì)自動(dòng)不斷進(jìn)行消息重試(每次間隔時(shí)間為 1 秒),這時(shí)應(yīng)用會(huì)出現(xiàn)消息消費(fèi)被阻塞的情況。因此,在使用順序消息時(shí),務(wù)必保證應(yīng)用能夠及時(shí)監(jiān)控并處理消費(fèi)失敗的情況,避免阻塞現(xiàn)象的發(fā)生。
(2)、無(wú)序消息的重試
對(duì)于無(wú)序消息(普通、延時(shí)、事務(wù)消息),當(dāng)消費(fèi)者消費(fèi)消息失敗時(shí),可以通過(guò)設(shè)置返回狀態(tài)達(dá)到消息重試的結(jié)果。
需要注意的是:無(wú)序消息的重試只會(huì)針對(duì)集群消費(fèi)方式(MessageModel.CLUSTERING)生效;廣播方式不提供失敗重試特性,即消費(fèi)失敗后,失敗的消息不再重試,繼續(xù)消費(fèi)新的消息。
四、消息重試次數(shù)
RocketMQ 默認(rèn)允許每條消息最多重試 16 次,每次重試的間隔時(shí)間如下:
第幾次重試 | 與上次重試的間隔時(shí)間 | 第幾次重試 | 與上次重試的間隔時(shí)間 |
1 | 10 秒 | 9 | 7 分鐘 |
2 | 30 秒 | 10 | 8 分鐘 |
3 | 1 分鐘 | 11 | 9 分鐘 |
4 | 2 分鐘 | 12 | 10 分鐘 |
5 | 3 分鐘 | 13 | 20 分鐘 |
6 | 4 分鐘 | 14 | 30 分鐘 |
7 | 5 分鐘 | 15 | 1 小時(shí) |
8 | 6 分鐘 | 16 | 2 小時(shí) |
如果消息重試 16 次后仍然失敗,消息將不再投遞。
注意: 一條消息無(wú)論重試多少次,這些重試消息的 Message ID 不會(huì)改變。所以就需要我們消費(fèi)者端做好消費(fèi)冪等操作。
五、消息重試配置
集群消費(fèi)方式下,消息消費(fèi)失敗后期望消息重試,需要在消息監(jiān)聽(tīng)器接口的實(shí)現(xiàn)中明確進(jìn)行配置(下述三種方式任選一種):
- 返回 Action.ReconsumeLater (推薦);
- 返回 Null;
- 拋出異常;
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { //處理消息 //..... //方式1:返回 Action.ReconsumeLater,消息將重試 return Action.ReconsumeLater; //方式2:返回 null,消息將重試 return null; //方式3:直接拋出異常, 消息將重試 throw new RuntimeException("消費(fèi)消息發(fā)生異常"); } }
集群消費(fèi)方式下,如果希望消息失敗后,不進(jìn)行消息重試,那么我們可以捕獲消費(fèi)邏輯中可能拋出的異常,然后返回Action.CommitMessage,那么這條消息將不會(huì)再重試。如下:
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { try { // 消費(fèi)消息.... } catch (Throwable e) { // 捕獲消費(fèi)邏輯中的所有異常,并返回 Action.CommitMessage; return Action.CommitMessage; } // 消息處理正常,直接返回 Action.CommitMessage; return Action.CommitMessage; } }
當(dāng)然,RocketMQ也允許Consumer 啟動(dòng)的時(shí)候設(shè)置最大重試次數(shù),重試時(shí)間間隔將按照如下策略:
- 最大重試次數(shù)小于等于 16 次,則重試時(shí)間間隔如目錄四:消息重試次數(shù)的描述;
- 最大重試次數(shù)大于 16 次,超過(guò) 16 次的重試時(shí)間間隔均為每次 2 小時(shí);
Properties properties = new Properties(); // 配置對(duì)應(yīng) Group ID的最大消息重試次數(shù)為 20 次 properties.put(PropertyKeyConst.MaxReconsumeTimes, "20"); Consumer consumer =ONSFactory.createConsumer(properties);
注意:
- 消息最大重試次數(shù)的設(shè)置對(duì)相同 Group ID 下的所有 Consumer 實(shí)例有效;
- 如果只對(duì)相同 Group ID 下兩個(gè) Consumer 實(shí)例中的其中一個(gè)設(shè)置了 MaxReconsumeTimes,那么該配置對(duì)兩個(gè) Consumer 實(shí)例均生效;
- 配置采用覆蓋的方式生效,即最后啟動(dòng)的 Consumer 實(shí)例會(huì)覆蓋之前的啟動(dòng)實(shí)例的配置;
六、消息重試原理
RocketMQ會(huì)為每個(gè)消費(fèi)者組都設(shè)置一個(gè)Topic名稱為“%RETRY%+consumerGroup”的重試隊(duì)列(這里需要注意的是,這個(gè)Topic的重試隊(duì)列是針對(duì)消費(fèi)組,而不是針對(duì)每個(gè)Topic設(shè)置的),用于暫時(shí)保存因?yàn)楦鞣N異常而導(dǎo)致Consumer端無(wú)法消費(fèi)的消息。
考慮到異?;謴?fù)需要一些時(shí)間,RocketMQ會(huì)為重試隊(duì)列設(shè)置多個(gè)重試級(jí)別,每個(gè)重試級(jí)別都有與之對(duì)應(yīng)的重新投遞延時(shí),重試次數(shù)越多投遞延時(shí)就越大。RocketMQ對(duì)于重試消息的處理是先保存至Topic名稱為“SCHEDULE_TOPIC_XXXX”的延遲隊(duì)列中,后臺(tái)定時(shí)任務(wù)按照對(duì)應(yīng)的時(shí)間進(jìn)行Delay后重新保存至“%RETRY%+consumerGroup”的重試隊(duì)列中。
到此這篇關(guān)于RocketMQ消息重試機(jī)制原理分析講解的文章就介紹到這了,更多相關(guān)RocketMQ消息重試內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
如何使用Spring RestTemplate訪問(wèn)restful服務(wù)
這篇文章主要介紹了如何使用Spring RestTemplate訪問(wèn)restful服務(wù),詳細(xì)的介紹了什么是RestTemplate以及簡(jiǎn)單實(shí)現(xiàn),非常具有實(shí)用價(jià)值,需要的朋友可以參考下2018-10-10SpringBoot之@ConditionalOnProperty注解使用方法
在平時(shí)業(yè)務(wù)中,我們需要在配置文件中配置某個(gè)屬性來(lái)決定是否需要將某些類進(jìn)行注入,讓Spring進(jìn)行管理,而@ConditionalOnProperty能夠?qū)崿F(xiàn)該功能,文中有詳細(xì)的代碼示例,需要的朋友可以參考下2023-05-05Java+Selenium實(shí)現(xiàn)控制瀏覽器的啟動(dòng)選項(xiàng)Options
這篇文章主要為大家詳細(xì)介紹了如何使用java代碼利用selenium控制瀏覽器的啟動(dòng)選項(xiàng)Options的代碼操作,文中的示例代碼講解詳細(xì),感興趣的可以了解一下2023-01-01基于Java實(shí)現(xiàn)一個(gè)高效可伸縮的計(jì)算結(jié)果緩存
這篇文章將通過(guò)對(duì)一個(gè)計(jì)算結(jié)果緩存的設(shè)計(jì)迭代介紹,分析每個(gè)版本的并發(fā)缺陷,并分析如何修復(fù)這些缺陷,最終完成一個(gè)高效可伸縮的計(jì)算結(jié)果緩存,感興趣的小伙伴可以了解一下2023-06-06SpringBoot2整合activiti6環(huán)境搭建過(guò)程解析
這篇文章主要介紹了SpringBoot2整合activiti6環(huán)境搭建過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11Java使用Tess4J實(shí)現(xiàn)圖像識(shí)別方式
這篇文章主要介紹了Java使用Tess4J實(shí)現(xiàn)圖像識(shí)別方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10