RocketMq 消息重試機制及死信隊列詳解
生產(chǎn)者消息重試
消息隊列中的消息消費時并不能保證總是成功的,那失敗的消息該怎么進行消息補償呢?這就用到今天的主角消息重試和死信隊列了。
有時因為網(wǎng)路等原因生產(chǎn)者也可能發(fā)送消息失敗,也會進行消息重試,生產(chǎn)者消息重試比較簡單,在springboot中只要在配置文件中配置一下就可以了。
# 異步消息發(fā)送失敗重試次數(shù),默認為2 rocketmq.producer.retry-times-when-send-async-failed=2 # 消息發(fā)送失敗重試次數(shù),默認為2 rocketmq.producer.retry-times-when-send-failed=2
也可以通過下面這種方式配置
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(); defaultMQProducer.setRetryTimesWhenSendFailed(2); defaultMQProducer.setRetryTimesWhenSendAsyncFailed(2);
消費者消息重試
Apache RocketMQ 有兩種消費模式:集群消費模式和廣播消費模式。消息重試只針對集群消費模式生效;廣播消費模式不提供失敗重試特性,即消費失敗后,失敗消息不再重試,繼續(xù)消費新的消息。
同時RocketMq Push消費提供了兩種消費方式:并發(fā)消費和順序消費。
并發(fā)消費
在并發(fā)消費中,可能會有多個線程同時消費一個隊列的消息,因此即使發(fā)送端通過發(fā)送順序消息保證消息在同一個隊列中按照FIFO的順序,也無法保證消息實際被順序消費,所有并發(fā)消費也可以稱之為無序消費。
順序消費
順序消費是消息生產(chǎn)者發(fā)送過來的消息會遵循FIFO隊列的思想,先進先出有順序的消費消息。 對于順序消息,當消費者消費消息失敗后,消息隊列 RocketMQ 會自動不斷進行消息重試(每次間隔時間為 1 秒),這時,應(yīng)用會出現(xiàn)消息消費被阻塞的情況。因此,在使用順序消息時,務(wù)必保證應(yīng)用能夠及時監(jiān)控并處理消費失敗的情況,避免阻塞現(xiàn)象的發(fā)生。
并發(fā)消費和順序消費區(qū)別
順序消費和并發(fā)消費的重試機制并不相同,順序消費消費失敗后會先在客戶端本地重試直到最大重試次數(shù),這樣可以避免消費失敗的消息被跳過,消費下一條消息而打亂順序消費的順序,而并發(fā)消費消費失敗后會將消費失敗的消息重新投遞回服務(wù)端,再等待服務(wù)端重新投遞回來,在這期間會正常消費隊列后面的消息。
并發(fā)消費失敗后并不是投遞回原Topic,而是投遞到一個特殊Topic,其命名為%RETRY%ConsumerGroupName,集群模式下并發(fā)消費每一個ConsumerGroup會對應(yīng)一個特殊Topic,并會訂閱該Topic。
兩者參數(shù)差別如下
消費類型 | 重試間隔 | 最大重試次數(shù) |
---|---|---|
順序消費 | 間隔時間可通過自定義設(shè)置,SuspendCurrentQueueTimeMillis | 最大重試次數(shù)可通過自定義參數(shù)MaxReconsumeTimes取值進行配置。該參數(shù)取值無最大限制。若未設(shè)置參數(shù)值,默認最大重試次數(shù)為Integer.MAX |
并發(fā)消費 | 間隔時間根據(jù)重試次數(shù)階梯變化,取值范圍:1秒~2小時。不支持自定義配置 | 最大重試次數(shù)可通過自定義參數(shù)MaxReconsumeTimes取值進行配置。默認值為16次,該參數(shù)取值無最大限制,建議使用默認值 |
并發(fā)消費重試間隔如下:
第幾次重試 | 與上次重試的間隔時間 | 第幾次重試 | 與上次重試的間隔時間 |
---|---|---|---|
1 | 10s | 9 | 7min |
2 | 30s | 10 | 8min |
3 | 1min | 11 | 9min |
4 | 2min | 12 | 10min |
5 | 3min | 13 | 20min |
6 | 4min | 14 | 30min |
7 | 5min | 15 | 1h |
8 | 6min | 16 | 2h |
死信隊列
當一條消息初次消費失敗,RocketMQ會自動進行消息重試,達到最大重試次數(shù)后,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息。此時,該消息不會立刻被丟棄,而是將其發(fā)送到該消費者對應(yīng)的特殊隊列中,這類消息稱為死信消息(Dead-Letter Message),存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue),死信隊列是死信Topic下分區(qū)數(shù)唯一的單獨隊列。如果產(chǎn)生了死信消息,那對應(yīng)的ConsumerGroup的死信Topic名稱為%DLQ%ConsumerGroupName,死信隊列的消息將不會再被消費。可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查詢到對應(yīng)死信消息的信息。
實踐出真知
Talk is cheap,show you the code.
公共部分創(chuàng)建
- 配置文件
rocketmq.name-server=localhost:9876 # 消費者組 rocketmq.producer.group=producer_group rocketmq.consumer.topic=consumer_topic rocketmq.consumer.group=consumer_group
- 創(chuàng)建消費者RetryConsumerDemo
@Component public class RetryConsumerDemo { @Value("${rocketmq.name-server}") private String namesrvAddr; @Value("${rocketmq.consumer.topic}") private String topic; @Value("${rocketmq.consumer.group}") private String consumerGroup; private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group"); @PostConstruct public void start() { try { consumer.setNamesrvAddr(namesrvAddr); //設(shè)置集群消費模式 consumer.setMessageModel(MessageModel.CLUSTERING); //設(shè)置消費超時時間(分鐘) consumer.setConsumeTimeout(1); //訂閱主題 consumer.subscribe(topic , "*"); //注冊消息監(jiān)聽器 consumer.registerMessageListener(new MessageListenerConcurrentlyImpl()); //最大重試次數(shù) consumer.setMaxReconsumeTimes(2); //啟動消費端 consumer.start(); System.out.println("Retry Consumer Start..."); } catch (MQClientException e) { e.printStackTrace(); } } }
測試并發(fā)消費
- 創(chuàng)建并發(fā)消費監(jiān)聽類 并發(fā)消費監(jiān)聽類要實現(xiàn)MessageListenerConcurrently類
public class MessageListenerConcurrentlyImpl implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { if (CollectionUtils.isEmpty(msgs)) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } MessageExt message = msgs.get(0); try { final LocalDateTime now = LocalDateTime.now(); //逐條消費 String messageBody = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("當前時間:"+now+", messageId: " + message.getMsgId() + ",topic: " + message.getTopic() + ",messageBody: " + messageBody); //模擬消費失敗 if ("Concurrently_test".equals(messageBody)) { int a = 1 / 0; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }
- 注冊監(jiān)聽類 在消費者類RetryConsumerDemo中注冊監(jiān)聽類
//注冊消息監(jiān)聽器 consumer.registerMessageListener(new MessageListenerConcurrentlyImpl());
- 測試
@RunWith(SpringRunner.class) @SpringBootTest(classes = RocketmqApplication.class) class RocketmqApplicationTests { @Value("${rocketmq.consumer.topic}") private String topic; @Autowired private RocketMQTemplate rocketMQTemplate; @Test public void testProducer(){ String msg = "Concurrently_test"; rocketMQTemplate.convertAndSend(topic , msg); } }
測試結(jié)果:
后面重試時間太長就不做測試了,可以看到并發(fā)消費的消息時間都是按照上面那張時間間隔表來。
然后通過RocketMq Dashboard Topic一欄可以看到有一個重試消費者組%RETRY%consumer_group
,這個消費者組內(nèi)存放的就是consumer_group消費者組消費失敗重試的消息。
并發(fā)消費的重試次數(shù)是可以修改的,重試次數(shù)對應(yīng)參數(shù)DefaultMQPushConsumer類的maxReconsumeTimes屬性,maxReconsumeTimes默認是-1,也就是默認會重試16次;
0代表不重試,只要失敗就會放入死信隊列;
1-16重試次數(shù)對應(yīng)著上面時間間隔表中對應(yīng)次數(shù)。
配置的最大重試次數(shù)超過16就按16處理。
并發(fā)消費狀態(tài)
并發(fā)消費有兩個狀態(tài)CONSUME_SUCCESS和RECONSUME_LATER。返回CONSUME_SUCCESS代表著消費成功,返回RECONSUME_LATER代表進行消息重試。
public enum ConsumeConcurrentlyStatus { /** * Success consumption */ CONSUME_SUCCESS, /** * Failure consumption,later try to consume */ RECONSUME_LATER; }
當MessageListenerConcurrently接口的consumeMessage方法返回ConsumeConcurrentlyStatus#RECONSUME_LATER、null或者方法拋異常了,都會進行消息重試。當然還是推薦返回ConsumeConcurrentlyStatus#RECONSUME_LATER。
測試順序消費
順序消費和并行消費其實都差不多的,只不過順序消費實現(xiàn)的是MessageListenerOrderly 接口
- 創(chuàng)建順序消費監(jiān)聽類
public class MessageListenerOrderlyImpl implements MessageListenerOrderly { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { if (CollectionUtils.isEmpty(msgs)) { return ConsumeOrderlyStatus.SUCCESS; } MessageExt message = msgs.get(0); try { final LocalDateTime now = LocalDateTime.now(); //逐條消費 String messageBody = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("當前時間:"+now+", messageId: " + message.getMsgId() + ",topic: " + message.getTopic() + ",messageBody: " + messageBody); //模擬消費失敗 if ("Orderly_test".equals(messageBody)) { int a = 1 / 0; } return ConsumeOrderlyStatus.SUCCESS; } catch (Exception e) { return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } }
- 注冊監(jiān)聽類
//最大重試次數(shù) consumer.setMaxReconsumeTimes(2); //順序消費 重試時間間隔 consumer.setSuspendCurrentQueueTimeMillis(2000);
SuspendCurrentQueueTimeMillis表示重試的時間間隔,默認是1s,這里修改成2s
- 測試
@RunWith(SpringRunner.class) @SpringBootTest(classes = RocketmqApplication.class) class RocketmqApplicationTests { @Value("${rocketmq.consumer.topic}") private String topic; @Autowired private RocketMQTemplate rocketMQTemplate; @Test public void testProducer(){ String msg = "Orderly_test"; rocketMQTemplate.convertAndSend(topic , msg); } }
測試結(jié)果:
可以看到三條結(jié)果,第一條是第一次消費的,其余兩條是隔了2s重試的。重試2次之后這條數(shù)據(jù)就進入了死信隊列。
順序消費狀態(tài)
順序消費目前也是兩個狀態(tài):SUCCESS和SUSPEND_CURRENT_QUEUE_A_MOMENT。SUSPEND_CURRENT_QUEUE_A_MOMENT意思是先暫停消費一下,過SuspendCurrentQueueTimeMillis時間間隔后再重試一下,而不是放到重試隊列里。
public enum ConsumeOrderlyStatus { /** * Success consumption */ SUCCESS, /** * Rollback consumption(only for binlog consumption) */ @Deprecated ROLLBACK, /** * Commit offset(only for binlog consumption) */ @Deprecated COMMIT, /** * Suspend current queue a moment */ SUSPEND_CURRENT_QUEUE_A_MOMENT; }
測試死信隊列
并發(fā)消費和順序消費達到了最大重試次數(shù)之后就會放到死信隊列。死信隊列在一開始是不會被創(chuàng)建的,只有需要的時候才會被創(chuàng)建。就拿上面測試結(jié)果來看,進入到的死信隊列就是%DLQ%consumer_group
,進入死信隊列的消息要收到處理。
死信隊列特性
- 不會再被消費者正常消費。
- 一個死信隊列對應(yīng)一個分組, 而不是對應(yīng)單個消費者實例。
- 如果一個消費者組未產(chǎn)生死信消息,消息隊列 RocketMQ 不會為其創(chuàng)建相應(yīng)的死信隊列。
- 一個死信隊列包含了對應(yīng) 分組產(chǎn)生的所有死信消息,不論該消息屬于哪個 Topic。
- 有效期與正常消息相同,均為 3 天,3 天后會被自動刪除。因此,請在死信消息產(chǎn)生后的 3 天內(nèi)及時處理
參考資料:
https://rocketmq.apache.org/docs/
以上就是RocketMq 消息重試機制及死信隊列詳解的詳細內(nèi)容,更多關(guān)于RocketMq 消息重試死信隊列的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java異常處理中同時有finally和return語句的執(zhí)行問題
這篇文章主要介紹了Java異常處理中同時有finally和return語句的執(zhí)行問題,首先確定的是一般finally語句都會被執(zhí)行...然后,需要的朋友可以參考下2015-11-11Spring Boot中的@ConfigurationProperties注解解讀
在SpringBoot框架中,@ConfigurationProperties注解是處理外部配置的強大工具,它允許開發(fā)者將配置文件中的屬性自動映射到Java類的字段上,實現(xiàn)配置的集中管理和類型安全,通過定義配置類并指定前綴,可以將配置文件中的屬性綁定到Java對象2024-10-10深入了解Java中String、Char和Int之間的相互轉(zhuǎn)換
這篇文章主要介紹了深入了解Java中String、Char和Int之間的相互轉(zhuǎn)換,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,,需要的朋友可以參考下2019-06-06Java基礎(chǔ)之刪除文本文件中特定行的內(nèi)容
這篇文章主要介紹了Java基礎(chǔ)之刪除文本文件中特定行的內(nèi)容,文中有非常詳細的代碼示例,對正在學習java基礎(chǔ)的小伙伴們有非常好的幫助,需要的朋友可以參考下2021-04-04