RocketMq 消息重試機(jī)制及死信隊(duì)列詳解
生產(chǎn)者消息重試
消息隊(duì)列中的消息消費(fèi)時(shí)并不能保證總是成功的,那失敗的消息該怎么進(jìn)行消息補(bǔ)償呢?這就用到今天的主角消息重試和死信隊(duì)列了。
有時(shí)因?yàn)榫W(wǎng)路等原因生產(chǎn)者也可能發(fā)送消息失敗,也會(huì)進(jìn)行消息重試,生產(chǎn)者消息重試比較簡單,在springboot中只要在配置文件中配置一下就可以了。
# 異步消息發(fā)送失敗重試次數(shù),默認(rèn)為2 rocketmq.producer.retry-times-when-send-async-failed=2 # 消息發(fā)送失敗重試次數(shù),默認(rèn)為2 rocketmq.producer.retry-times-when-send-failed=2
也可以通過下面這種方式配置
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(); defaultMQProducer.setRetryTimesWhenSendFailed(2); defaultMQProducer.setRetryTimesWhenSendAsyncFailed(2);
消費(fèi)者消息重試
Apache RocketMQ 有兩種消費(fèi)模式:集群消費(fèi)模式和廣播消費(fèi)模式。消息重試只針對(duì)集群消費(fèi)模式生效;廣播消費(fèi)模式不提供失敗重試特性,即消費(fèi)失敗后,失敗消息不再重試,繼續(xù)消費(fèi)新的消息。
同時(shí)RocketMq Push消費(fèi)提供了兩種消費(fèi)方式:并發(fā)消費(fèi)和順序消費(fèi)。
并發(fā)消費(fèi)
在并發(fā)消費(fèi)中,可能會(huì)有多個(gè)線程同時(shí)消費(fèi)一個(gè)隊(duì)列的消息,因此即使發(fā)送端通過發(fā)送順序消息保證消息在同一個(gè)隊(duì)列中按照FIFO的順序,也無法保證消息實(shí)際被順序消費(fèi),所有并發(fā)消費(fèi)也可以稱之為無序消費(fèi)。
順序消費(fèi)
順序消費(fèi)是消息生產(chǎn)者發(fā)送過來的消息會(huì)遵循FIFO隊(duì)列的思想,先進(jìn)先出有順序的消費(fèi)消息。 對(duì)于順序消息,當(dāng)消費(fèi)者消費(fèi)消息失敗后,消息隊(duì)列 RocketMQ 會(huì)自動(dòng)不斷進(jìn)行消息重試(每次間隔時(shí)間為 1 秒),這時(shí),應(yīng)用會(huì)出現(xiàn)消息消費(fèi)被阻塞的情況。因此,在使用順序消息時(shí),務(wù)必保證應(yīng)用能夠及時(shí)監(jiān)控并處理消費(fèi)失敗的情況,避免阻塞現(xiàn)象的發(fā)生。
并發(fā)消費(fèi)和順序消費(fèi)區(qū)別
順序消費(fèi)和并發(fā)消費(fèi)的重試機(jī)制并不相同,順序消費(fèi)消費(fèi)失敗后會(huì)先在客戶端本地重試直到最大重試次數(shù),這樣可以避免消費(fèi)失敗的消息被跳過,消費(fèi)下一條消息而打亂順序消費(fèi)的順序,而并發(fā)消費(fèi)消費(fèi)失敗后會(huì)將消費(fèi)失敗的消息重新投遞回服務(wù)端,再等待服務(wù)端重新投遞回來,在這期間會(huì)正常消費(fèi)隊(duì)列后面的消息。
并發(fā)消費(fèi)失敗后并不是投遞回原Topic,而是投遞到一個(gè)特殊Topic,其命名為%RETRY%ConsumerGroupName,集群模式下并發(fā)消費(fèi)每一個(gè)ConsumerGroup會(huì)對(duì)應(yīng)一個(gè)特殊Topic,并會(huì)訂閱該Topic。
兩者參數(shù)差別如下
消費(fèi)類型 | 重試間隔 | 最大重試次數(shù) |
---|---|---|
順序消費(fèi) | 間隔時(shí)間可通過自定義設(shè)置,SuspendCurrentQueueTimeMillis | 最大重試次數(shù)可通過自定義參數(shù)MaxReconsumeTimes取值進(jìn)行配置。該參數(shù)取值無最大限制。若未設(shè)置參數(shù)值,默認(rèn)最大重試次數(shù)為Integer.MAX |
并發(fā)消費(fèi) | 間隔時(shí)間根據(jù)重試次數(shù)階梯變化,取值范圍:1秒~2小時(shí)。不支持自定義配置 | 最大重試次數(shù)可通過自定義參數(shù)MaxReconsumeTimes取值進(jìn)行配置。默認(rèn)值為16次,該參數(shù)取值無最大限制,建議使用默認(rèn)值 |
并發(fā)消費(fèi)重試間隔如下:
第幾次重試 | 與上次重試的間隔時(shí)間 | 第幾次重試 | 與上次重試的間隔時(shí)間 |
---|---|---|---|
1 | 10s | 9 | 7min |
2 | 30s | 10 | 8min |
3 | 1min | 11 | 9min |
4 | 2min | 12 | 10min |
5 | 3min | 13 | 20min |
6 | 4min | 14 | 30min |
7 | 5min | 15 | 1h |
8 | 6min | 16 | 2h |
死信隊(duì)列
當(dāng)一條消息初次消費(fèi)失敗,RocketMQ會(huì)自動(dòng)進(jìn)行消息重試,達(dá)到最大重試次數(shù)后,若消費(fèi)依然失敗,則表明消費(fèi)者在正常情況下無法正確地消費(fèi)該消息。此時(shí),該消息不會(huì)立刻被丟棄,而是將其發(fā)送到該消費(fèi)者對(duì)應(yīng)的特殊隊(duì)列中,這類消息稱為死信消息(Dead-Letter Message),存儲(chǔ)死信消息的特殊隊(duì)列稱為死信隊(duì)列(Dead-Letter Queue),死信隊(duì)列是死信Topic下分區(qū)數(shù)唯一的單獨(dú)隊(duì)列。如果產(chǎn)生了死信消息,那對(duì)應(yīng)的ConsumerGroup的死信Topic名稱為%DLQ%ConsumerGroupName,死信隊(duì)列的消息將不會(huì)再被消費(fèi)??梢岳肦ocketMQ Admin工具或者RocketMQ Dashboard上查詢到對(duì)應(yīng)死信消息的信息。
實(shí)踐出真知
Talk is cheap,show you the code.
公共部分創(chuàng)建
- 配置文件
rocketmq.name-server=localhost:9876 # 消費(fèi)者組 rocketmq.producer.group=producer_group rocketmq.consumer.topic=consumer_topic rocketmq.consumer.group=consumer_group
- 創(chuàng)建消費(fèi)者RetryConsumerDemo
@Component public class RetryConsumerDemo { @Value("${rocketmq.name-server}") private String namesrvAddr; @Value("${rocketmq.consumer.topic}") private String topic; @Value("${rocketmq.consumer.group}") private String consumerGroup; private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group"); @PostConstruct public void start() { try { consumer.setNamesrvAddr(namesrvAddr); //設(shè)置集群消費(fèi)模式 consumer.setMessageModel(MessageModel.CLUSTERING); //設(shè)置消費(fèi)超時(shí)時(shí)間(分鐘) consumer.setConsumeTimeout(1); //訂閱主題 consumer.subscribe(topic , "*"); //注冊(cè)消息監(jiān)聽器 consumer.registerMessageListener(new MessageListenerConcurrentlyImpl()); //最大重試次數(shù) consumer.setMaxReconsumeTimes(2); //啟動(dòng)消費(fèi)端 consumer.start(); System.out.println("Retry Consumer Start..."); } catch (MQClientException e) { e.printStackTrace(); } } }
測(cè)試并發(fā)消費(fèi)
- 創(chuàng)建并發(fā)消費(fèi)監(jiān)聽類 并發(fā)消費(fèi)監(jiān)聽類要實(shí)現(xiàn)MessageListenerConcurrently類
public class MessageListenerConcurrentlyImpl implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { if (CollectionUtils.isEmpty(msgs)) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } MessageExt message = msgs.get(0); try { final LocalDateTime now = LocalDateTime.now(); //逐條消費(fèi) String messageBody = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("當(dāng)前時(shí)間:"+now+", messageId: " + message.getMsgId() + ",topic: " + message.getTopic() + ",messageBody: " + messageBody); //模擬消費(fèi)失敗 if ("Concurrently_test".equals(messageBody)) { int a = 1 / 0; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }
- 注冊(cè)監(jiān)聽類 在消費(fèi)者類RetryConsumerDemo中注冊(cè)監(jiān)聽類
//注冊(cè)消息監(jiān)聽器 consumer.registerMessageListener(new MessageListenerConcurrentlyImpl());
- 測(cè)試
@RunWith(SpringRunner.class) @SpringBootTest(classes = RocketmqApplication.class) class RocketmqApplicationTests { @Value("${rocketmq.consumer.topic}") private String topic; @Autowired private RocketMQTemplate rocketMQTemplate; @Test public void testProducer(){ String msg = "Concurrently_test"; rocketMQTemplate.convertAndSend(topic , msg); } }
測(cè)試結(jié)果:
后面重試時(shí)間太長就不做測(cè)試了,可以看到并發(fā)消費(fèi)的消息時(shí)間都是按照上面那張時(shí)間間隔表來。
然后通過RocketMq Dashboard Topic一欄可以看到有一個(gè)重試消費(fèi)者組%RETRY%consumer_group
,這個(gè)消費(fèi)者組內(nèi)存放的就是consumer_group消費(fèi)者組消費(fèi)失敗重試的消息。
并發(fā)消費(fèi)的重試次數(shù)是可以修改的,重試次數(shù)對(duì)應(yīng)參數(shù)DefaultMQPushConsumer類的maxReconsumeTimes屬性,maxReconsumeTimes默認(rèn)是-1,也就是默認(rèn)會(huì)重試16次;
0代表不重試,只要失敗就會(huì)放入死信隊(duì)列;
1-16重試次數(shù)對(duì)應(yīng)著上面時(shí)間間隔表中對(duì)應(yīng)次數(shù)。
配置的最大重試次數(shù)超過16就按16處理。
并發(fā)消費(fèi)狀態(tài)
并發(fā)消費(fèi)有兩個(gè)狀態(tài)CONSUME_SUCCESS和RECONSUME_LATER。返回CONSUME_SUCCESS代表著消費(fèi)成功,返回RECONSUME_LATER代表進(jìn)行消息重試。
public enum ConsumeConcurrentlyStatus { /** * Success consumption */ CONSUME_SUCCESS, /** * Failure consumption,later try to consume */ RECONSUME_LATER; }
當(dāng)MessageListenerConcurrently接口的consumeMessage方法返回ConsumeConcurrentlyStatus#RECONSUME_LATER、null或者方法拋異常了,都會(huì)進(jìn)行消息重試。當(dāng)然還是推薦返回ConsumeConcurrentlyStatus#RECONSUME_LATER。
測(cè)試順序消費(fèi)
順序消費(fèi)和并行消費(fèi)其實(shí)都差不多的,只不過順序消費(fèi)實(shí)現(xiàn)的是MessageListenerOrderly 接口
- 創(chuàng)建順序消費(fèi)監(jiān)聽類
public class MessageListenerOrderlyImpl implements MessageListenerOrderly { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { if (CollectionUtils.isEmpty(msgs)) { return ConsumeOrderlyStatus.SUCCESS; } MessageExt message = msgs.get(0); try { final LocalDateTime now = LocalDateTime.now(); //逐條消費(fèi) String messageBody = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("當(dāng)前時(shí)間:"+now+", messageId: " + message.getMsgId() + ",topic: " + message.getTopic() + ",messageBody: " + messageBody); //模擬消費(fèi)失敗 if ("Orderly_test".equals(messageBody)) { int a = 1 / 0; } return ConsumeOrderlyStatus.SUCCESS; } catch (Exception e) { return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } }
- 注冊(cè)監(jiān)聽類
//最大重試次數(shù) consumer.setMaxReconsumeTimes(2); //順序消費(fèi) 重試時(shí)間間隔 consumer.setSuspendCurrentQueueTimeMillis(2000);
SuspendCurrentQueueTimeMillis表示重試的時(shí)間間隔,默認(rèn)是1s,這里修改成2s
- 測(cè)試
@RunWith(SpringRunner.class) @SpringBootTest(classes = RocketmqApplication.class) class RocketmqApplicationTests { @Value("${rocketmq.consumer.topic}") private String topic; @Autowired private RocketMQTemplate rocketMQTemplate; @Test public void testProducer(){ String msg = "Orderly_test"; rocketMQTemplate.convertAndSend(topic , msg); } }
測(cè)試結(jié)果:
可以看到三條結(jié)果,第一條是第一次消費(fèi)的,其余兩條是隔了2s重試的。重試2次之后這條數(shù)據(jù)就進(jìn)入了死信隊(duì)列。
順序消費(fèi)狀態(tài)
順序消費(fèi)目前也是兩個(gè)狀態(tài):SUCCESS和SUSPEND_CURRENT_QUEUE_A_MOMENT。SUSPEND_CURRENT_QUEUE_A_MOMENT意思是先暫停消費(fèi)一下,過SuspendCurrentQueueTimeMillis時(shí)間間隔后再重試一下,而不是放到重試隊(duì)列里。
public enum ConsumeOrderlyStatus { /** * Success consumption */ SUCCESS, /** * Rollback consumption(only for binlog consumption) */ @Deprecated ROLLBACK, /** * Commit offset(only for binlog consumption) */ @Deprecated COMMIT, /** * Suspend current queue a moment */ SUSPEND_CURRENT_QUEUE_A_MOMENT; }
測(cè)試死信隊(duì)列
并發(fā)消費(fèi)和順序消費(fèi)達(dá)到了最大重試次數(shù)之后就會(huì)放到死信隊(duì)列。死信隊(duì)列在一開始是不會(huì)被創(chuàng)建的,只有需要的時(shí)候才會(huì)被創(chuàng)建。就拿上面測(cè)試結(jié)果來看,進(jìn)入到的死信隊(duì)列就是%DLQ%consumer_group
,進(jìn)入死信隊(duì)列的消息要收到處理。
死信隊(duì)列特性
- 不會(huì)再被消費(fèi)者正常消費(fèi)。
- 一個(gè)死信隊(duì)列對(duì)應(yīng)一個(gè)分組, 而不是對(duì)應(yīng)單個(gè)消費(fèi)者實(shí)例。
- 如果一個(gè)消費(fèi)者組未產(chǎn)生死信消息,消息隊(duì)列 RocketMQ 不會(huì)為其創(chuàng)建相應(yīng)的死信隊(duì)列。
- 一個(gè)死信隊(duì)列包含了對(duì)應(yīng) 分組產(chǎn)生的所有死信消息,不論該消息屬于哪個(gè) Topic。
- 有效期與正常消息相同,均為 3 天,3 天后會(huì)被自動(dòng)刪除。因此,請(qǐng)?jiān)谒佬畔a(chǎn)生后的 3 天內(nèi)及時(shí)處理
參考資料:
https://rocketmq.apache.org/docs/
以上就是RocketMq 消息重試機(jī)制及死信隊(duì)列詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMq 消息重試死信隊(duì)列的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java異常處理中同時(shí)有finally和return語句的執(zhí)行問題
這篇文章主要介紹了Java異常處理中同時(shí)有finally和return語句的執(zhí)行問題,首先確定的是一般finally語句都會(huì)被執(zhí)行...然后,需要的朋友可以參考下2015-11-11Spring Boot中的@ConfigurationProperties注解解讀
在SpringBoot框架中,@ConfigurationProperties注解是處理外部配置的強(qiáng)大工具,它允許開發(fā)者將配置文件中的屬性自動(dòng)映射到Java類的字段上,實(shí)現(xiàn)配置的集中管理和類型安全,通過定義配置類并指定前綴,可以將配置文件中的屬性綁定到Java對(duì)象2024-10-10深入了解Java中String、Char和Int之間的相互轉(zhuǎn)換
這篇文章主要介紹了深入了解Java中String、Char和Int之間的相互轉(zhuǎn)換,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,,需要的朋友可以參考下2019-06-06Java詳解實(shí)現(xiàn)ATM機(jī)模擬系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了如何利用Java語言實(shí)現(xiàn)控制臺(tái)版本的ATM銀行管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-06-06新手了解java 類,對(duì)象以及封裝基礎(chǔ)知識(shí)
JS是一門面向?qū)ο笳Z言,其對(duì)象是用prototype屬性來模擬的,本文介紹了如何封裝JS對(duì)象,具有一定的參考價(jià)值,下面跟著小編一起來看下吧,希望對(duì)你有所幫助2021-07-07Java基礎(chǔ)之刪除文本文件中特定行的內(nèi)容
這篇文章主要介紹了Java基礎(chǔ)之刪除文本文件中特定行的內(nèi)容,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)java基礎(chǔ)的小伙伴們有非常好的幫助,需要的朋友可以參考下2021-04-04