欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊列和死信隊列

 更新時間:2024年06月11日 10:05:32   作者:哈弟撩編程  
RabbitMQ的死信隊列用于接收其他隊列中的“死信”消息,所謂“死信”,是指滿足一定條件而無法被消費(fèi)者正確處理的消息,死信隊列通常與RabbitMQ的延遲隊列一起使用,本文給大家介紹了SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊列和死信隊列,需要的朋友可以參考下

一、死信隊列

RabbitMQ的死信隊列(Dead Letter Queue,DLQ)是一種特殊的隊列,用于接收其他隊列中的“死信”消息。所謂“死信”,是指滿足一定條件而無法被消費(fèi)者正確處理的消息,這些條件包括消息被拒絕、消息過期、消息達(dá)到最大重試次數(shù)等。

當(dāng)消息成為死信時,RabbitMQ會將其重新發(fā)送到指定的死信隊列,而不是丟棄它們。這樣做的好處是可以對死信進(jìn)行分析和處理,例如記錄日志、重新入隊或者進(jìn)一步處理。

死信隊列通常與RabbitMQ的延遲隊列(Delayed Message Queue)一起使用,通過延遲隊列延遲消息的處理時間,可以更容易地觸發(fā)消息成為死信的條件,從而進(jìn)行測試和調(diào)試。

死信隊列在消息中間件中有許多實(shí)際應(yīng)用場景,主要用于處理無法被正常消費(fèi)的消息,增強(qiáng)了消息的可靠性和處理能力。以下是一些常見的應(yīng)用場景:

延遲消息處理:通過將消息發(fā)送到延遲隊列,在指定的時間后再將消息發(fā)送到目標(biāo)隊列,實(shí)現(xiàn)延遲處理消息的功能。

消息重試:當(dāng)消費(fèi)者無法處理消息時,消息可以被重新發(fā)送到隊列并設(shè)置重試次數(shù),達(dá)到最大重試次數(shù)后轉(zhuǎn)發(fā)到死信隊列,以便進(jìn)行進(jìn)一步處理。

異常處理:當(dāng)消息無法被消費(fèi)者正常處理時(如格式錯誤、業(yè)務(wù)異常等),將消息轉(zhuǎn)發(fā)到死信隊列,用于記錄日志、報警或人工處理。

消息超時處理:當(dāng)消息在隊列中等待時間過長時,可以設(shè)置消息的過期時間(TTL),超過時間后將消息轉(zhuǎn)發(fā)到死信隊列。

消息路由失敗:當(dāng)消息無法被正確路由到目標(biāo)隊列時,可以將消息發(fā)送到死信隊列,避免消息丟失。

消息版本兼容性處理:當(dāng)消息的格式或內(nèi)容發(fā)生變化時,通過死信隊列可以處理老版本消息,確保新版本系統(tǒng)的兼容性。

RabbitMQ的工作模式

死信隊列的工作模式

今天我要實(shí)現(xiàn)的就是這個延遲隊列和死信隊列。生產(chǎn)者首先向延遲隊列發(fā)送消息,待達(dá)到TTL后消息會被轉(zhuǎn)送到死信隊列當(dāng)中,消費(fèi)者會從死信隊列中獲取消息進(jìn)行消費(fèi)。

二、RabbitMQ相關(guān)的安裝 

win10安裝rabbitMQ的詳細(xì)步驟_java_腳本之家 (jb51.net)

我這里直接引用別人的文章了,下載需要大家去看一看。

RabbitMQ延遲插件的安裝。

RabbitMQ安裝延遲消息插件的教程(超詳細(xì))_java_腳本之家 (jb51.net)

三、SpringBoot引入RabbitMQ

1.引入依賴

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </dependency>

2.創(chuàng)建隊列和交換器

這一步是很重要的,如果你配置錯誤了,消息很可能無法正確的傳送。要實(shí)現(xiàn)延遲隊列和死信隊列,我們一共要創(chuàng)建以下幾個組件:

  • 延遲隊列
  • 延遲隊列的交換器
  • 死信隊列
  • 死信隊列的交換器

在我們創(chuàng)建了這幾個組件之后,我們還要干一些事情,我們需要把這些組件進(jìn)行組裝,如果你不了解RabbitMQ的基礎(chǔ),你可以先看看基礎(chǔ)教學(xué),我這里簡單的說一下。RabbitMQ中有一種綁定方式,這種綁定方式會把BindingKey和RoutingKey完全匹配的進(jìn)行綁定,如下圖所示,生產(chǎn)者發(fā)送了一個BindingKey為“warning”的消息,那么這個消息就會被發(fā)送到Queue1和Queue2,這并不難理解。

我們要做的就是把隊列和交換器通過一個RoutingKey綁定在一起。

2.1 變量聲明

接下來的代碼要好好看了,首先我們把我們后邊要用到的名稱變量全部定義出來。因?yàn)檫@個名稱起的很長,我們不方便直接使用。創(chuàng)建DeadRabbitConfig。在類中定義如下變量,延遲隊列交換器名稱、延遲隊列名稱、延遲隊列Routing名稱。除此之外還有死信隊列交換器名稱、死信隊列名稱和死信Routing名稱。

  // 延遲隊列交換器名稱
    public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";
    // 延遲隊列A名稱
    public static final String DELAY_QUEUE_A_NAME = "delay.queue.demo.business.queue_a";
    // 延遲隊列B名稱
    public static final String DELAY_QUEUE_B_NAME = "delay.queue.demo.business.queue_b";
    // 延遲隊列routingA名稱
    public static final String DELAY_QUEUE_ROUTING_A_NAME = "delay.queue.demo.business.queue_a.routing_key";
    // 延遲隊列routingB名稱
    public static final String DELAY_QUEUE_ROUTING_B_NAME = "delay.queue.demo.business.queue_b.routing_key";
 
    // 死信隊列
    public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUE_A_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routing_key";
    public static final String DEAD_LETTER_QUEUE_B_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routing_key";
    public static final String DEAD_LETTER_QUEUE_A_NAME = "delay.queue.demo.deadletter.queue_a";
    public static final String DEAD_LETTER_QUEUE_B_NAME = "delay.queue.demo.deadletter.queue_b";

2.2 創(chuàng)建延遲交換器

// 注冊延遲交換器delayExchange
    @Bean("delayExchange")
    public DirectExchange delayExchange(){
        return  new DirectExchange(DELAY_EXCHANGE_NAME);
    }

2.3 創(chuàng)建延遲隊列

這里的延遲隊列需要我們額外的配置一些參數(shù),用于和死信隊列進(jìn)行信息發(fā)送。這里我是用了兩種不同的方式構(gòu)建延遲隊列A和延遲隊列B,在延遲隊列A種我沒有設(shè)置TTL參數(shù),而是通過RabbitMQ的延遲插件實(shí)現(xiàn)的,而延遲隊列B我設(shè)置了TTL為10000ms,也就是十秒,十秒內(nèi)消息如果沒有被消費(fèi)掉就會發(fā)送到死信隊列。
// 注冊延遲隊列A   還要綁定死信交換器和死信routingA
    @Bean("delayQueueA")
    public Queue delayQueueA(){
        Map<String,Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_A_ROUTING_KEY);
        //args.put("x-message-ttl",6000);
        return QueueBuilder.durable(DELAY_QUEUE_A_NAME).withArguments(args).build();
    }
    // 注冊延遲隊列B   還要綁定死信交換器和死信routingB
    @Bean("delayQueueB")
    public Queue delayQueueB(){
        Map<String,Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_B_ROUTING_KEY);
        args.put("x-message-ttl",10000);
        return QueueBuilder.durable(DELAY_QUEUE_B_NAME).withArguments(args).build();
    }

2.4 延遲隊列綁定延遲交換器

 // 延遲隊列A綁定交換器
    @Bean
    public Binding delayQueueABinding(@Qualifier("delayQueueA") Queue queue, @Qualifier("delayExchange") DirectExchange delayExchange){
        return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_A_NAME);
    }
    // 延遲隊列B綁定交換器
    @Bean
    public Binding delayQueueBBinding(@Qualifier("delayQueueB") Queue queue,@Qualifier("delayExchange") DirectExchange delayExchange){
        return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_B_NAME);
    }

2.5 死信隊列配置

與延遲隊列不同的是,死信隊列并沒有配置延遲參數(shù)。

// 注冊死信隊列A
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUE_A_NAME);
    }
    // 注冊死信隊列B
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB(){
        return new Queue(DEAD_LETTER_QUEUE_B_NAME);
    }
    // 注冊死信交換器
    @Bean
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }
    // 死信隊列A綁定死信交換器
    @Bean
    public Binding deadLetterQueueABinding(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange deadLetterExchange){
        return BindingBuilder.bind(queue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_A_ROUTING_KEY);
    }
    // 死信隊列B綁定死信交換器
    @Bean
    public Binding deadLetterQueueBBinding(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange")DirectExchange deadLetterExchange){
        return BindingBuilder.bind(queue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_B_ROUTING_KEY);
    }

到此為止,RabbitMQ的組件配置完成。

3. 添加application.yml

server:
  port: 8081
spring:
  application:
    name: test-rabbitmq-producer
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

4. 添加RabbitMQListener (消費(fèi)者)

下方的代碼一共有兩個消費(fèi)者,一個消費(fèi)者獲取死信隊列A中的消息,另一個消費(fèi)者獲取死信隊列B中的消息。

@Component
public class DeadLetterQueueConsumer {
    public static final Logger LOGGER = LoggerFactory.getLogger(DeadLetterQueueConsumer.class);
 
    @RabbitListener(queues = DeadRabbitConfig.DEAD_LETTER_QUEUE_A_NAME,ackMode = "MANUAL")
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        LOGGER.info("當(dāng)前時間:{},死信隊列A收到消息:{}", new Date().toString(), msg);
        System.out.println(message.getMessageProperties().getDeliveryTag());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
 
    @RabbitListener(queues = DeadRabbitConfig.DEAD_LETTER_QUEUE_B_NAME,ackMode = "MANUAL")
    public void receiveB(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        LOGGER.info("當(dāng)前時間:{},死信隊列B收到消息:{}", new Date().toString(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

5. 創(chuàng)建DelayMessageSender

這里采用的就是兩種不同的方式,一種方式是使用插件來延遲消息的發(fā)送,另一種是通過TTL參數(shù)。

@Component
public class DelayMessageSender {
    @Resource
    RabbitTemplate rabbitTemplate;
 
 
    public void sendMessage(String msg,Integer delayTimes){
        switch (delayTimes){
            case 6:
                rabbitTemplate.convertAndSend(DeadRabbitConfig.DELAY_EXCHANGE_NAME, DeadRabbitConfig.DELAY_QUEUE_ROUTING_A_NAME,msg,new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setExpiration(String.valueOf(6000));
                        return message;
                    }
                });
                break;
            case 10:
                rabbitTemplate.convertAndSend(DeadRabbitConfig.DELAY_QUEUE_B_NAME,msg);
                break;
        }
    }
}

6. 創(chuàng)建Controller 

@RestController
@RequestMapping("/student")
public class StudentController {
    @Autowired
    DelayMessageSender messageSender;
    @RequestMapping("/send-message")
    public String sendMessage(String msg,Integer delayTimes){
        System.out.println(new Date());
        messageSender.sendMessage(msg,delayTimes);
        return "發(fā)送成功";
    }
}

7.測試

在瀏覽器中輸入以下地址進(jìn)入RabbitMQ界面。賬號密碼都是guest。

http://localhost:15672/

先來看看我們的初始隊列。這里是什么都沒有的。

然后我們啟動項(xiàng)目后在看。我們剛才創(chuàng)建出來的四個隊列全部都被加載了出來。

使用PostMan發(fā)送一次請求。

我們的請求在17s的時候發(fā)送到后端,消息打印在23s,說明我們的延遲隊列有效果。

接下來我們測試10s的延遲隊列。

10s后死信隊列B成功的接收到了消息。

四、死信隊列的應(yīng)用場景

延遲隊列通常用于需要延遲執(zhí)行某些任務(wù)或觸發(fā)某些事件的場景。例如,在電子商務(wù)中,可以使用延遲隊列實(shí)現(xiàn)訂單超時未支付自動取消功能。

1.訂單創(chuàng)建

用戶下單后,系統(tǒng)生成訂單,并將訂單信息發(fā)送到一個普通隊列,同時設(shè)置一個TTL(Time-To-Live)為30分鐘。這個隊列配置了死信交換機(jī)(Dead Letter Exchange, DLX),當(dāng)消息過期后會被轉(zhuǎn)發(fā)到死信隊列。

2.等待支付

在30分鐘內(nèi),用戶可以完成支付。如果用戶在30分鐘內(nèi)支付完成,系統(tǒng)會從普通隊列中移除對應(yīng)的消息并正常處理訂單。

3.訂單超時處理

如果用戶未在30分鐘內(nèi)完成支付,消息會自動過期并轉(zhuǎn)發(fā)到死信交換機(jī),進(jìn)而轉(zhuǎn)發(fā)到死信隊列。

4.取消訂單

系統(tǒng)有一個專門的消費(fèi)者監(jiān)聽死信隊列。當(dāng)有消息進(jìn)入死信隊列時,消費(fèi)者會自動處理這些消息,即取消訂單、釋放庫存,并通知用戶訂單已取消。

5.定時任務(wù)(可選):

雖然死信隊列已經(jīng)提供了超時訂單的處理,但為了防止消息丟失或處理延遲,可以設(shè)置一個定時任務(wù)定期檢查訂單狀態(tài),確保所有超時未支付的訂單都得到了處理。

以上就是SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊列和死信隊列的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot RabbitMQ隊列的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 示例解析java面向?qū)ο缶幊谭庋b與訪問控制

    示例解析java面向?qū)ο缶幊谭庋b與訪問控制

    這篇文章主要為大家介紹了java封裝與訪問控制的示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-05-05
  • JUnit單元測試入門必看篇

    JUnit單元測試入門必看篇

    下面小編就為大家分享一篇JUnit單元測試入門必看篇,對新手而言有很好的參考價值,希望對大家有所幫助
    2017-11-11
  • 淺談Maven環(huán)境隔離應(yīng)用

    淺談Maven環(huán)境隔離應(yīng)用

    這篇文章主要介紹了淺談Maven環(huán)境隔離應(yīng)用,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-09-09
  • java基礎(chǔ)之泛型知識點(diǎn)總結(jié)

    java基礎(chǔ)之泛型知識點(diǎn)總結(jié)

    這篇文章主要介紹了java基礎(chǔ)之泛型知識點(diǎn)總結(jié),文中有非常詳細(xì)的代碼示例,對正在學(xué)習(xí)java基礎(chǔ)的小伙伴們有很好的幫助,需要的朋友可以參考下
    2021-04-04
  • Java實(shí)現(xiàn)手機(jī)號碼歸屬地查詢

    Java實(shí)現(xiàn)手機(jī)號碼歸屬地查詢

    這篇文章主要為大家詳細(xì)介紹了如何利用Java實(shí)現(xiàn)手機(jī)號碼歸屬地查詢功能,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2024-12-12
  • 如何把spring boot應(yīng)用發(fā)布到Harbor

    如何把spring boot應(yīng)用發(fā)布到Harbor

    這篇文章主要介紹了如何把spring boot應(yīng)用發(fā)布到Harbor,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-11-11
  • SpringBoot中的聲明式事務(wù)詳解

    SpringBoot中的聲明式事務(wù)詳解

    這篇文章主要介紹了SpringBoot中的聲明式事務(wù)詳解,Spring采用統(tǒng)一的機(jī)制來處理不同的數(shù)據(jù)訪問技術(shù)的事務(wù), Spring的事務(wù)提供一個PlatformTransactionManager的接口,不同的數(shù)據(jù)訪問技術(shù)使用不同的接口實(shí)現(xiàn),需要的朋友可以參考下
    2023-08-08
  • Java中byte[]、String、Hex字符串等轉(zhuǎn)換的方法

    Java中byte[]、String、Hex字符串等轉(zhuǎn)換的方法

    這篇文章主要介紹了Java中byte[]、String、Hex字符串等轉(zhuǎn)換的方法,代碼很簡單,需要的朋友可以參考下
    2018-05-05
  • 使用Java實(shí)現(xiàn)價格加密與優(yōu)化功能

    使用Java實(shí)現(xiàn)價格加密與優(yōu)化功能

    在現(xiàn)代軟件開發(fā)中,數(shù)據(jù)加密是一個非常重要的環(huán)節(jié),尤其是在處理敏感信息(如價格、用戶數(shù)據(jù)等)時,本文將詳細(xì)介紹如何使用?Java?實(shí)現(xiàn)價格加密,并對代碼進(jìn)行優(yōu)化,需要的朋友可以參考下
    2025-01-01
  • Springboot整合分頁插件PageHelper步驟解析

    Springboot整合分頁插件PageHelper步驟解析

    這篇文章主要介紹了Springboot整合分頁插件PageHelper步驟解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-06-06

最新評論