欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊(duì)列和死信隊(duì)列

 更新時(shí)間:2024年06月11日 10:05:32   作者:哈弟撩編程  
RabbitMQ的死信隊(duì)列用于接收其他隊(duì)列中的“死信”消息,所謂“死信”,是指滿足一定條件而無法被消費(fèi)者正確處理的消息,死信隊(duì)列通常與RabbitMQ的延遲隊(duì)列一起使用,本文給大家介紹了SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊(duì)列和死信隊(duì)列,需要的朋友可以參考下

一、死信隊(duì)列

RabbitMQ的死信隊(duì)列(Dead Letter Queue,DLQ)是一種特殊的隊(duì)列,用于接收其他隊(duì)列中的“死信”消息。所謂“死信”,是指滿足一定條件而無法被消費(fèi)者正確處理的消息,這些條件包括消息被拒絕、消息過期、消息達(dá)到最大重試次數(shù)等。

當(dāng)消息成為死信時(shí),RabbitMQ會將其重新發(fā)送到指定的死信隊(duì)列,而不是丟棄它們。這樣做的好處是可以對死信進(jìn)行分析和處理,例如記錄日志、重新入隊(duì)或者進(jìn)一步處理。

死信隊(duì)列通常與RabbitMQ的延遲隊(duì)列(Delayed Message Queue)一起使用,通過延遲隊(duì)列延遲消息的處理時(shí)間,可以更容易地觸發(fā)消息成為死信的條件,從而進(jìn)行測試和調(diào)試。

死信隊(duì)列在消息中間件中有許多實(shí)際應(yīng)用場景,主要用于處理無法被正常消費(fèi)的消息,增強(qiáng)了消息的可靠性和處理能力。以下是一些常見的應(yīng)用場景:

延遲消息處理:通過將消息發(fā)送到延遲隊(duì)列,在指定的時(shí)間后再將消息發(fā)送到目標(biāo)隊(duì)列,實(shí)現(xiàn)延遲處理消息的功能。

消息重試:當(dāng)消費(fèi)者無法處理消息時(shí),消息可以被重新發(fā)送到隊(duì)列并設(shè)置重試次數(shù),達(dá)到最大重試次數(shù)后轉(zhuǎn)發(fā)到死信隊(duì)列,以便進(jìn)行進(jìn)一步處理。

異常處理:當(dāng)消息無法被消費(fèi)者正常處理時(shí)(如格式錯(cuò)誤、業(yè)務(wù)異常等),將消息轉(zhuǎn)發(fā)到死信隊(duì)列,用于記錄日志、報(bào)警或人工處理。

消息超時(shí)處理:當(dāng)消息在隊(duì)列中等待時(shí)間過長時(shí),可以設(shè)置消息的過期時(shí)間(TTL),超過時(shí)間后將消息轉(zhuǎn)發(fā)到死信隊(duì)列。

消息路由失敗:當(dāng)消息無法被正確路由到目標(biāo)隊(duì)列時(shí),可以將消息發(fā)送到死信隊(duì)列,避免消息丟失。

消息版本兼容性處理:當(dāng)消息的格式或內(nèi)容發(fā)生變化時(shí),通過死信隊(duì)列可以處理老版本消息,確保新版本系統(tǒng)的兼容性。

RabbitMQ的工作模式

死信隊(duì)列的工作模式

今天我要實(shí)現(xiàn)的就是這個(gè)延遲隊(duì)列和死信隊(duì)列。生產(chǎn)者首先向延遲隊(duì)列發(fā)送消息,待達(dá)到TTL后消息會被轉(zhuǎn)送到死信隊(duì)列當(dāng)中,消費(fèi)者會從死信隊(duì)列中獲取消息進(jìn)行消費(fèi)。

二、RabbitMQ相關(guān)的安裝 

win10安裝rabbitMQ的詳細(xì)步驟_java_腳本之家 (jb51.net)

我這里直接引用別人的文章了,下載需要大家去看一看。

RabbitMQ延遲插件的安裝。

RabbitMQ安裝延遲消息插件的教程(超詳細(xì))_java_腳本之家 (jb51.net)

三、SpringBoot引入RabbitMQ

1.引入依賴

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </dependency>

2.創(chuàng)建隊(duì)列和交換器

這一步是很重要的,如果你配置錯(cuò)誤了,消息很可能無法正確的傳送。要實(shí)現(xiàn)延遲隊(duì)列和死信隊(duì)列,我們一共要?jiǎng)?chuàng)建以下幾個(gè)組件:

  • 延遲隊(duì)列
  • 延遲隊(duì)列的交換器
  • 死信隊(duì)列
  • 死信隊(duì)列的交換器

在我們創(chuàng)建了這幾個(gè)組件之后,我們還要干一些事情,我們需要把這些組件進(jìn)行組裝,如果你不了解RabbitMQ的基礎(chǔ),你可以先看看基礎(chǔ)教學(xué),我這里簡單的說一下。RabbitMQ中有一種綁定方式,這種綁定方式會把BindingKey和RoutingKey完全匹配的進(jìn)行綁定,如下圖所示,生產(chǎn)者發(fā)送了一個(gè)BindingKey為“warning”的消息,那么這個(gè)消息就會被發(fā)送到Queue1和Queue2,這并不難理解。

我們要做的就是把隊(duì)列和交換器通過一個(gè)RoutingKey綁定在一起。

2.1 變量聲明

接下來的代碼要好好看了,首先我們把我們后邊要用到的名稱變量全部定義出來。因?yàn)檫@個(gè)名稱起的很長,我們不方便直接使用。創(chuàng)建DeadRabbitConfig。在類中定義如下變量,延遲隊(duì)列交換器名稱、延遲隊(duì)列名稱、延遲隊(duì)列Routing名稱。除此之外還有死信隊(duì)列交換器名稱、死信隊(duì)列名稱和死信Routing名稱。

  // 延遲隊(duì)列交換器名稱
    public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";
    // 延遲隊(duì)列A名稱
    public static final String DELAY_QUEUE_A_NAME = "delay.queue.demo.business.queue_a";
    // 延遲隊(duì)列B名稱
    public static final String DELAY_QUEUE_B_NAME = "delay.queue.demo.business.queue_b";
    // 延遲隊(duì)列routingA名稱
    public static final String DELAY_QUEUE_ROUTING_A_NAME = "delay.queue.demo.business.queue_a.routing_key";
    // 延遲隊(duì)列routingB名稱
    public static final String DELAY_QUEUE_ROUTING_B_NAME = "delay.queue.demo.business.queue_b.routing_key";
 
    // 死信隊(duì)列
    public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUE_A_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routing_key";
    public static final String DEAD_LETTER_QUEUE_B_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routing_key";
    public static final String DEAD_LETTER_QUEUE_A_NAME = "delay.queue.demo.deadletter.queue_a";
    public static final String DEAD_LETTER_QUEUE_B_NAME = "delay.queue.demo.deadletter.queue_b";

2.2 創(chuàng)建延遲交換器

// 注冊延遲交換器delayExchange
    @Bean("delayExchange")
    public DirectExchange delayExchange(){
        return  new DirectExchange(DELAY_EXCHANGE_NAME);
    }

2.3 創(chuàng)建延遲隊(duì)列

這里的延遲隊(duì)列需要我們額外的配置一些參數(shù),用于和死信隊(duì)列進(jìn)行信息發(fā)送。這里我是用了兩種不同的方式構(gòu)建延遲隊(duì)列A和延遲隊(duì)列B,在延遲隊(duì)列A種我沒有設(shè)置TTL參數(shù),而是通過RabbitMQ的延遲插件實(shí)現(xiàn)的,而延遲隊(duì)列B我設(shè)置了TTL為10000ms,也就是十秒,十秒內(nèi)消息如果沒有被消費(fèi)掉就會發(fā)送到死信隊(duì)列。
// 注冊延遲隊(duì)列A   還要綁定死信交換器和死信routingA
    @Bean("delayQueueA")
    public Queue delayQueueA(){
        Map<String,Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_A_ROUTING_KEY);
        //args.put("x-message-ttl",6000);
        return QueueBuilder.durable(DELAY_QUEUE_A_NAME).withArguments(args).build();
    }
    // 注冊延遲隊(duì)列B   還要綁定死信交換器和死信routingB
    @Bean("delayQueueB")
    public Queue delayQueueB(){
        Map<String,Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_B_ROUTING_KEY);
        args.put("x-message-ttl",10000);
        return QueueBuilder.durable(DELAY_QUEUE_B_NAME).withArguments(args).build();
    }

2.4 延遲隊(duì)列綁定延遲交換器

 // 延遲隊(duì)列A綁定交換器
    @Bean
    public Binding delayQueueABinding(@Qualifier("delayQueueA") Queue queue, @Qualifier("delayExchange") DirectExchange delayExchange){
        return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_A_NAME);
    }
    // 延遲隊(duì)列B綁定交換器
    @Bean
    public Binding delayQueueBBinding(@Qualifier("delayQueueB") Queue queue,@Qualifier("delayExchange") DirectExchange delayExchange){
        return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_B_NAME);
    }

2.5 死信隊(duì)列配置

與延遲隊(duì)列不同的是,死信隊(duì)列并沒有配置延遲參數(shù)。

// 注冊死信隊(duì)列A
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUE_A_NAME);
    }
    // 注冊死信隊(duì)列B
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB(){
        return new Queue(DEAD_LETTER_QUEUE_B_NAME);
    }
    // 注冊死信交換器
    @Bean
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }
    // 死信隊(duì)列A綁定死信交換器
    @Bean
    public Binding deadLetterQueueABinding(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange deadLetterExchange){
        return BindingBuilder.bind(queue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_A_ROUTING_KEY);
    }
    // 死信隊(duì)列B綁定死信交換器
    @Bean
    public Binding deadLetterQueueBBinding(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange")DirectExchange deadLetterExchange){
        return BindingBuilder.bind(queue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_B_ROUTING_KEY);
    }

到此為止,RabbitMQ的組件配置完成。

3. 添加application.yml

server:
  port: 8081
spring:
  application:
    name: test-rabbitmq-producer
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

4. 添加RabbitMQListener (消費(fèi)者)

下方的代碼一共有兩個(gè)消費(fèi)者,一個(gè)消費(fèi)者獲取死信隊(duì)列A中的消息,另一個(gè)消費(fèi)者獲取死信隊(duì)列B中的消息。

@Component
public class DeadLetterQueueConsumer {
    public static final Logger LOGGER = LoggerFactory.getLogger(DeadLetterQueueConsumer.class);
 
    @RabbitListener(queues = DeadRabbitConfig.DEAD_LETTER_QUEUE_A_NAME,ackMode = "MANUAL")
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        LOGGER.info("當(dāng)前時(shí)間:{},死信隊(duì)列A收到消息:{}", new Date().toString(), msg);
        System.out.println(message.getMessageProperties().getDeliveryTag());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
 
    @RabbitListener(queues = DeadRabbitConfig.DEAD_LETTER_QUEUE_B_NAME,ackMode = "MANUAL")
    public void receiveB(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        LOGGER.info("當(dāng)前時(shí)間:{},死信隊(duì)列B收到消息:{}", new Date().toString(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

5. 創(chuàng)建DelayMessageSender

這里采用的就是兩種不同的方式,一種方式是使用插件來延遲消息的發(fā)送,另一種是通過TTL參數(shù)。

@Component
public class DelayMessageSender {
    @Resource
    RabbitTemplate rabbitTemplate;
 
 
    public void sendMessage(String msg,Integer delayTimes){
        switch (delayTimes){
            case 6:
                rabbitTemplate.convertAndSend(DeadRabbitConfig.DELAY_EXCHANGE_NAME, DeadRabbitConfig.DELAY_QUEUE_ROUTING_A_NAME,msg,new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setExpiration(String.valueOf(6000));
                        return message;
                    }
                });
                break;
            case 10:
                rabbitTemplate.convertAndSend(DeadRabbitConfig.DELAY_QUEUE_B_NAME,msg);
                break;
        }
    }
}

6. 創(chuàng)建Controller 

@RestController
@RequestMapping("/student")
public class StudentController {
    @Autowired
    DelayMessageSender messageSender;
    @RequestMapping("/send-message")
    public String sendMessage(String msg,Integer delayTimes){
        System.out.println(new Date());
        messageSender.sendMessage(msg,delayTimes);
        return "發(fā)送成功";
    }
}

7.測試

在瀏覽器中輸入以下地址進(jìn)入RabbitMQ界面。賬號密碼都是guest。

http://localhost:15672/

先來看看我們的初始隊(duì)列。這里是什么都沒有的。

然后我們啟動(dòng)項(xiàng)目后在看。我們剛才創(chuàng)建出來的四個(gè)隊(duì)列全部都被加載了出來。

使用PostMan發(fā)送一次請求。

我們的請求在17s的時(shí)候發(fā)送到后端,消息打印在23s,說明我們的延遲隊(duì)列有效果。

接下來我們測試10s的延遲隊(duì)列。

10s后死信隊(duì)列B成功的接收到了消息。

四、死信隊(duì)列的應(yīng)用場景

延遲隊(duì)列通常用于需要延遲執(zhí)行某些任務(wù)或觸發(fā)某些事件的場景。例如,在電子商務(wù)中,可以使用延遲隊(duì)列實(shí)現(xiàn)訂單超時(shí)未支付自動(dòng)取消功能。

1.訂單創(chuàng)建

用戶下單后,系統(tǒng)生成訂單,并將訂單信息發(fā)送到一個(gè)普通隊(duì)列,同時(shí)設(shè)置一個(gè)TTL(Time-To-Live)為30分鐘。這個(gè)隊(duì)列配置了死信交換機(jī)(Dead Letter Exchange, DLX),當(dāng)消息過期后會被轉(zhuǎn)發(fā)到死信隊(duì)列。

2.等待支付

在30分鐘內(nèi),用戶可以完成支付。如果用戶在30分鐘內(nèi)支付完成,系統(tǒng)會從普通隊(duì)列中移除對應(yīng)的消息并正常處理訂單。

3.訂單超時(shí)處理

如果用戶未在30分鐘內(nèi)完成支付,消息會自動(dòng)過期并轉(zhuǎn)發(fā)到死信交換機(jī),進(jìn)而轉(zhuǎn)發(fā)到死信隊(duì)列。

4.取消訂單

系統(tǒng)有一個(gè)專門的消費(fèi)者監(jiān)聽死信隊(duì)列。當(dāng)有消息進(jìn)入死信隊(duì)列時(shí),消費(fèi)者會自動(dòng)處理這些消息,即取消訂單、釋放庫存,并通知用戶訂單已取消。

5.定時(shí)任務(wù)(可選):

雖然死信隊(duì)列已經(jīng)提供了超時(shí)訂單的處理,但為了防止消息丟失或處理延遲,可以設(shè)置一個(gè)定時(shí)任務(wù)定期檢查訂單狀態(tài),確保所有超時(shí)未支付的訂單都得到了處理。

以上就是SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊(duì)列和死信隊(duì)列的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot RabbitMQ隊(duì)列的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 示例解析java面向?qū)ο缶幊谭庋b與訪問控制

    示例解析java面向?qū)ο缶幊谭庋b與訪問控制

    這篇文章主要為大家介紹了java封裝與訪問控制的示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-05-05
  • JUnit單元測試入門必看篇

    JUnit單元測試入門必看篇

    下面小編就為大家分享一篇JUnit單元測試入門必看篇,對新手而言有很好的參考價(jià)值,希望對大家有所幫助
    2017-11-11
  • 淺談Maven環(huán)境隔離應(yīng)用

    淺談Maven環(huán)境隔離應(yīng)用

    這篇文章主要介紹了淺談Maven環(huán)境隔離應(yīng)用,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2018-09-09
  • java基礎(chǔ)之泛型知識點(diǎn)總結(jié)

    java基礎(chǔ)之泛型知識點(diǎn)總結(jié)

    這篇文章主要介紹了java基礎(chǔ)之泛型知識點(diǎn)總結(jié),文中有非常詳細(xì)的代碼示例,對正在學(xué)習(xí)java基礎(chǔ)的小伙伴們有很好的幫助,需要的朋友可以參考下
    2021-04-04
  • Java實(shí)現(xiàn)手機(jī)號碼歸屬地查詢

    Java實(shí)現(xiàn)手機(jī)號碼歸屬地查詢

    這篇文章主要為大家詳細(xì)介紹了如何利用Java實(shí)現(xiàn)手機(jī)號碼歸屬地查詢功能,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2024-12-12
  • 如何把spring boot應(yīng)用發(fā)布到Harbor

    如何把spring boot應(yīng)用發(fā)布到Harbor

    這篇文章主要介紹了如何把spring boot應(yīng)用發(fā)布到Harbor,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-11-11
  • SpringBoot中的聲明式事務(wù)詳解

    SpringBoot中的聲明式事務(wù)詳解

    這篇文章主要介紹了SpringBoot中的聲明式事務(wù)詳解,Spring采用統(tǒng)一的機(jī)制來處理不同的數(shù)據(jù)訪問技術(shù)的事務(wù), Spring的事務(wù)提供一個(gè)PlatformTransactionManager的接口,不同的數(shù)據(jù)訪問技術(shù)使用不同的接口實(shí)現(xiàn),需要的朋友可以參考下
    2023-08-08
  • Java中byte[]、String、Hex字符串等轉(zhuǎn)換的方法

    Java中byte[]、String、Hex字符串等轉(zhuǎn)換的方法

    這篇文章主要介紹了Java中byte[]、String、Hex字符串等轉(zhuǎn)換的方法,代碼很簡單,需要的朋友可以參考下
    2018-05-05
  • 使用Java實(shí)現(xiàn)價(jià)格加密與優(yōu)化功能

    使用Java實(shí)現(xiàn)價(jià)格加密與優(yōu)化功能

    在現(xiàn)代軟件開發(fā)中,數(shù)據(jù)加密是一個(gè)非常重要的環(huán)節(jié),尤其是在處理敏感信息(如價(jià)格、用戶數(shù)據(jù)等)時(shí),本文將詳細(xì)介紹如何使用?Java?實(shí)現(xiàn)價(jià)格加密,并對代碼進(jìn)行優(yōu)化,需要的朋友可以參考下
    2025-01-01
  • Springboot整合分頁插件PageHelper步驟解析

    Springboot整合分頁插件PageHelper步驟解析

    這篇文章主要介紹了Springboot整合分頁插件PageHelper步驟解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-06-06

最新評論