欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMq 消息重試機制及死信隊列詳解

 更新時間:2022年10月07日 11:04:28   作者:索碼理  
這篇文章主要為大家介紹了RocketMq 消息重試機制及死信隊列詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

生產(chǎn)者消息重試

消息隊列中的消息消費時并不能保證總是成功的,那失敗的消息該怎么進行消息補償呢?這就用到今天的主角消息重試和死信隊列了。

有時因為網(wǎng)路等原因生產(chǎn)者也可能發(fā)送消息失敗,也會進行消息重試,生產(chǎn)者消息重試比較簡單,在springboot中只要在配置文件中配置一下就可以了。

# 異步消息發(fā)送失敗重試次數(shù),默認為2
rocketmq.producer.retry-times-when-send-async-failed=2
# 消息發(fā)送失敗重試次數(shù),默認為2
rocketmq.producer.retry-times-when-send-failed=2

也可以通過下面這種方式配置

DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
defaultMQProducer.setRetryTimesWhenSendFailed(2);
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(2);

消費者消息重試

Apache RocketMQ 有兩種消費模式:集群消費模式和廣播消費模式。消息重試只針對集群消費模式生效;廣播消費模式不提供失敗重試特性,即消費失敗后,失敗消息不再重試,繼續(xù)消費新的消息。

同時RocketMq Push消費提供了兩種消費方式:并發(fā)消費和順序消費。

并發(fā)消費

在并發(fā)消費中,可能會有多個線程同時消費一個隊列的消息,因此即使發(fā)送端通過發(fā)送順序消息保證消息在同一個隊列中按照FIFO的順序,也無法保證消息實際被順序消費,所有并發(fā)消費也可以稱之為無序消費。

順序消費

順序消費是消息生產(chǎn)者發(fā)送過來的消息會遵循FIFO隊列的思想,先進先出有順序的消費消息。 對于順序消息,當消費者消費消息失敗后,消息隊列 RocketMQ 會自動不斷進行消息重試(每次間隔時間為 1 秒),這時,應(yīng)用會出現(xiàn)消息消費被阻塞的情況。因此,在使用順序消息時,務(wù)必保證應(yīng)用能夠及時監(jiān)控并處理消費失敗的情況,避免阻塞現(xiàn)象的發(fā)生。

并發(fā)消費和順序消費區(qū)別

順序消費和并發(fā)消費的重試機制并不相同,順序消費消費失敗后會先在客戶端本地重試直到最大重試次數(shù),這樣可以避免消費失敗的消息被跳過,消費下一條消息而打亂順序消費的順序,而并發(fā)消費消費失敗后會將消費失敗的消息重新投遞回服務(wù)端,再等待服務(wù)端重新投遞回來,在這期間會正常消費隊列后面的消息。

并發(fā)消費失敗后并不是投遞回原Topic,而是投遞到一個特殊Topic,其命名為%RETRY%ConsumerGroupName,集群模式下并發(fā)消費每一個ConsumerGroup會對應(yīng)一個特殊Topic,并會訂閱該Topic。

兩者參數(shù)差別如下

消費類型重試間隔最大重試次數(shù)
順序消費間隔時間可通過自定義設(shè)置,SuspendCurrentQueueTimeMillis最大重試次數(shù)可通過自定義參數(shù)MaxReconsumeTimes取值進行配置。該參數(shù)取值無最大限制。若未設(shè)置參數(shù)值,默認最大重試次數(shù)為Integer.MAX
并發(fā)消費間隔時間根據(jù)重試次數(shù)階梯變化,取值范圍:1秒~2小時。不支持自定義配置最大重試次數(shù)可通過自定義參數(shù)MaxReconsumeTimes取值進行配置。默認值為16次,該參數(shù)取值無最大限制,建議使用默認值

并發(fā)消費重試間隔如下:

第幾次重試與上次重試的間隔時間第幾次重試與上次重試的間隔時間
110s97min
230s108min
31min119min
42min1210min
53min1320min
64min1430min
75min151h
86min162h

死信隊列

當一條消息初次消費失敗,RocketMQ會自動進行消息重試,達到最大重試次數(shù)后,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息。此時,該消息不會立刻被丟棄,而是將其發(fā)送到該消費者對應(yīng)的特殊隊列中,這類消息稱為死信消息(Dead-Letter Message),存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue),死信隊列是死信Topic下分區(qū)數(shù)唯一的單獨隊列。如果產(chǎn)生了死信消息,那對應(yīng)的ConsumerGroup的死信Topic名稱為%DLQ%ConsumerGroupName,死信隊列的消息將不會再被消費。可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查詢到對應(yīng)死信消息的信息。

實踐出真知

Talk is cheap,show you the code.

公共部分創(chuàng)建

  • 配置文件
rocketmq.name-server=localhost:9876
# 消費者組
rocketmq.producer.group=producer_group
rocketmq.consumer.topic=consumer_topic
rocketmq.consumer.group=consumer_group
  • 創(chuàng)建消費者RetryConsumerDemo
@Component
public class RetryConsumerDemo {
    @Value("${rocketmq.name-server}")
    private String namesrvAddr;
    @Value("${rocketmq.consumer.topic}")
    private String topic;
    @Value("${rocketmq.consumer.group}")
    private String consumerGroup;
    private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
    @PostConstruct
    public void start() {
        try {
            consumer.setNamesrvAddr(namesrvAddr);
            //設(shè)置集群消費模式
            consumer.setMessageModel(MessageModel.CLUSTERING);
            //設(shè)置消費超時時間(分鐘)
            consumer.setConsumeTimeout(1);
            //訂閱主題
            consumer.subscribe(topic , "*");
            //注冊消息監(jiān)聽器
            consumer.registerMessageListener(new MessageListenerConcurrentlyImpl());
            //最大重試次數(shù)
            consumer.setMaxReconsumeTimes(2);
            //啟動消費端
            consumer.start();
            System.out.println("Retry Consumer Start...");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
}

測試并發(fā)消費

  • 創(chuàng)建并發(fā)消費監(jiān)聽類 并發(fā)消費監(jiān)聽類要實現(xiàn)MessageListenerConcurrently類
public class MessageListenerConcurrentlyImpl implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        if (CollectionUtils.isEmpty(msgs)) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        MessageExt message = msgs.get(0);
        try {
            final LocalDateTime now = LocalDateTime.now();
            //逐條消費
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("當前時間:"+now+", messageId: " + message.getMsgId() + ",topic: " +
                    message.getTopic()  + ",messageBody: " + messageBody);
            //模擬消費失敗
            if ("Concurrently_test".equals(messageBody)) {
                int a = 1 / 0;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}
  • 注冊監(jiān)聽類 在消費者類RetryConsumerDemo中注冊監(jiān)聽類
//注冊消息監(jiān)聽器
consumer.registerMessageListener(new MessageListenerConcurrentlyImpl());
  • 測試
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RocketmqApplication.class)
class RocketmqApplicationTests {
    @Value("${rocketmq.consumer.topic}")
    private String topic;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Test
    public void testProducer(){
        String msg = "Concurrently_test";
        rocketMQTemplate.convertAndSend(topic , msg);
    }
}

測試結(jié)果:

后面重試時間太長就不做測試了,可以看到并發(fā)消費的消息時間都是按照上面那張時間間隔表來。

然后通過RocketMq Dashboard Topic一欄可以看到有一個重試消費者組%RETRY%consumer_group,這個消費者組內(nèi)存放的就是consumer_group消費者組消費失敗重試的消息。

并發(fā)消費的重試次數(shù)是可以修改的,重試次數(shù)對應(yīng)參數(shù)DefaultMQPushConsumer類的maxReconsumeTimes屬性,maxReconsumeTimes默認是-1,也就是默認會重試16次;

0代表不重試,只要失敗就會放入死信隊列;

1-16重試次數(shù)對應(yīng)著上面時間間隔表中對應(yīng)次數(shù)。

配置的最大重試次數(shù)超過16就按16處理。

并發(fā)消費狀態(tài)

并發(fā)消費有兩個狀態(tài)CONSUME_SUCCESS和RECONSUME_LATER。返回CONSUME_SUCCESS代表著消費成功,返回RECONSUME_LATER代表進行消息重試。

public enum ConsumeConcurrentlyStatus {
    /**
     * Success consumption
     */
    CONSUME_SUCCESS,
    /**
     * Failure consumption,later try to consume
     */
    RECONSUME_LATER;
}

當MessageListenerConcurrently接口的consumeMessage方法返回ConsumeConcurrentlyStatus#RECONSUME_LATER、null或者方法拋異常了,都會進行消息重試。當然還是推薦返回ConsumeConcurrentlyStatus#RECONSUME_LATER。

測試順序消費

順序消費和并行消費其實都差不多的,只不過順序消費實現(xiàn)的是MessageListenerOrderly 接口

  • 創(chuàng)建順序消費監(jiān)聽類
public class MessageListenerOrderlyImpl implements MessageListenerOrderly {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        if (CollectionUtils.isEmpty(msgs)) {
            return ConsumeOrderlyStatus.SUCCESS;
        }
        MessageExt message = msgs.get(0);
        try {
            final LocalDateTime now = LocalDateTime.now();
            //逐條消費
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("當前時間:"+now+", messageId: " + message.getMsgId() + ",topic: " +
                    message.getTopic()  + ",messageBody: " + messageBody);
            //模擬消費失敗
            if ("Orderly_test".equals(messageBody)) {
                int a = 1 / 0;
            }
            return ConsumeOrderlyStatus.SUCCESS;
        } catch (Exception e) {
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
}
  • 注冊監(jiān)聽類
//最大重試次數(shù)
consumer.setMaxReconsumeTimes(2);
//順序消費 重試時間間隔
consumer.setSuspendCurrentQueueTimeMillis(2000);

SuspendCurrentQueueTimeMillis表示重試的時間間隔,默認是1s,這里修改成2s

  • 測試
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RocketmqApplication.class)
class RocketmqApplicationTests {
    @Value("${rocketmq.consumer.topic}")
    private String topic;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Test
    public void testProducer(){
        String msg = "Orderly_test";
        rocketMQTemplate.convertAndSend(topic , msg);
    }
}

測試結(jié)果:

可以看到三條結(jié)果,第一條是第一次消費的,其余兩條是隔了2s重試的。重試2次之后這條數(shù)據(jù)就進入了死信隊列。

順序消費狀態(tài)

順序消費目前也是兩個狀態(tài):SUCCESS和SUSPEND_CURRENT_QUEUE_A_MOMENT。SUSPEND_CURRENT_QUEUE_A_MOMENT意思是先暫停消費一下,過SuspendCurrentQueueTimeMillis時間間隔后再重試一下,而不是放到重試隊列里。

public enum ConsumeOrderlyStatus {
    /**
     * Success consumption
     */
    SUCCESS,
    /**
     * Rollback consumption(only for binlog consumption)
     */
    @Deprecated
    ROLLBACK,
    /**
     * Commit offset(only for binlog consumption)
     */
    @Deprecated
    COMMIT,
    /**
     * Suspend current queue a moment
     */
    SUSPEND_CURRENT_QUEUE_A_MOMENT;
}

測試死信隊列

并發(fā)消費和順序消費達到了最大重試次數(shù)之后就會放到死信隊列。死信隊列在一開始是不會被創(chuàng)建的,只有需要的時候才會被創(chuàng)建。就拿上面測試結(jié)果來看,進入到的死信隊列就是%DLQ%consumer_group,進入死信隊列的消息要收到處理。

死信隊列特性

  • 不會再被消費者正常消費。
  • 一個死信隊列對應(yīng)一個分組, 而不是對應(yīng)單個消費者實例。
  • 如果一個消費者組未產(chǎn)生死信消息,消息隊列 RocketMQ 不會為其創(chuàng)建相應(yīng)的死信隊列。
  • 一個死信隊列包含了對應(yīng) 分組產(chǎn)生的所有死信消息,不論該消息屬于哪個 Topic。
  • 有效期與正常消息相同,均為 3 天,3 天后會被自動刪除。因此,請在死信消息產(chǎn)生后的 3 天內(nèi)及時處理

參考資料:

https://rocketmq.apache.org/docs/

以上就是RocketMq 消息重試機制及死信隊列詳解的詳細內(nèi)容,更多關(guān)于RocketMq 消息重試死信隊列的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java異常處理中同時有finally和return語句的執(zhí)行問題

    Java異常處理中同時有finally和return語句的執(zhí)行問題

    這篇文章主要介紹了Java異常處理中同時有finally和return語句的執(zhí)行問題,首先確定的是一般finally語句都會被執(zhí)行...然后,需要的朋友可以參考下
    2015-11-11
  • Spring中的循環(huán)依賴問題

    Spring中的循環(huán)依賴問題

    在Spring框架中,循環(huán)依賴是指兩個或多個Bean相互依賴,這導致在Bean的創(chuàng)建過程中出現(xiàn)依賴死鎖,為了解決這一問題,Spring引入了三級緩存機制,包括singletonObjects、earlySingletonObjects和singletonFactories
    2024-09-09
  • Spring Boot中的@ConfigurationProperties注解解讀

    Spring Boot中的@ConfigurationProperties注解解讀

    在SpringBoot框架中,@ConfigurationProperties注解是處理外部配置的強大工具,它允許開發(fā)者將配置文件中的屬性自動映射到Java類的字段上,實現(xiàn)配置的集中管理和類型安全,通過定義配置類并指定前綴,可以將配置文件中的屬性綁定到Java對象
    2024-10-10
  • 深入了解Java中String、Char和Int之間的相互轉(zhuǎn)換

    深入了解Java中String、Char和Int之間的相互轉(zhuǎn)換

    這篇文章主要介紹了深入了解Java中String、Char和Int之間的相互轉(zhuǎn)換,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,,需要的朋友可以參考下
    2019-06-06
  • Java開發(fā)中常用記錄

    Java開發(fā)中常用記錄

    這篇文章主要介紹了Java-編程式事務(wù)、Java-Stream、Linux常用命令,需要的朋友可以參考下
    2023-05-05
  • Java詳解實現(xiàn)ATM機模擬系統(tǒng)

    Java詳解實現(xiàn)ATM機模擬系統(tǒng)

    這篇文章主要為大家詳細介紹了如何利用Java語言實現(xiàn)控制臺版本的ATM銀行管理系統(tǒng),文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-06-06
  • 淺談log4j的rootLogger及其他坑爹的地方

    淺談log4j的rootLogger及其他坑爹的地方

    這篇文章主要介紹了log4j的rootLogger及其他坑爹的地方,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • 新手了解java 類,對象以及封裝基礎(chǔ)知識

    新手了解java 類,對象以及封裝基礎(chǔ)知識

    JS是一門面向?qū)ο笳Z言,其對象是用prototype屬性來模擬的,本文介紹了如何封裝JS對象,具有一定的參考價值,下面跟著小編一起來看下吧,希望對你有所幫助
    2021-07-07
  • Java實現(xiàn)寵物商店管理

    Java實現(xiàn)寵物商店管理

    這篇文章主要為大家詳細介紹了Java實現(xiàn)寵物商店管理,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2020-10-10
  • Java基礎(chǔ)之刪除文本文件中特定行的內(nèi)容

    Java基礎(chǔ)之刪除文本文件中特定行的內(nèi)容

    這篇文章主要介紹了Java基礎(chǔ)之刪除文本文件中特定行的內(nèi)容,文中有非常詳細的代碼示例,對正在學習java基礎(chǔ)的小伙伴們有非常好的幫助,需要的朋友可以參考下
    2021-04-04

最新評論