SpringBoot整合RabbitMQ處理死信隊列和延遲隊列
簡介
說明
本文用示例介紹SpringBoot整合RabbitMQ時如何處理死信隊列/延遲隊列。
RabbitMQ消息簡介
RabbitMQ的消息默認不會超時。
什么是死信隊列?什么是延遲隊列?
死信隊列:
DLX,全稱為Dead-Letter-Exchange,可以稱之為死信交換器,也有人稱之為死信郵箱。當消息在一個隊列中變成死信(dead message)之后,它能被重新被發(fā)送到另一個交換器中,這個交換器就是DLX,綁定DLX的隊列就稱之為死信隊列。
以下幾種情況會導致消息變成死信:
- 消息被拒絕(Basic.Reject/Basic.Nack),并且設置requeue參數(shù)為false;
- 消息過期;
- 隊列達到最大長度。
延遲隊列:
延遲隊列用來存放延遲消息。延遲消息:指當消息被發(fā)送以后,不想讓消費者立刻拿到消息,而是等待特定時間后,消費者才能拿到這個消息進行消費。
相關(guān)網(wǎng)址
實例代碼
路由配置
package com.example.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitRouterConfig { public static final String EXCHANGE_TOPIC_WELCOME = "Exchange@topic.welcome"; public static final String EXCHANGE_FANOUT_UNROUTE = "Exchange@fanout.unroute"; public static final String EXCHANGE_TOPIC_DELAY = "Exchange@topic.delay"; public static final String ROUTINGKEY_HELLOS = "hello.#"; public static final String ROUTINGKEY_DELAY = "delay.#"; public static final String QUEUE_HELLO = "Queue@hello"; public static final String QUEUE_HI = "Queue@hi"; public static final String QUEUE_UNROUTE = "Queue@unroute"; public static final String QUEUE_DELAY = "Queue@delay"; public static final Integer TTL_QUEUE_MESSAGE = 5000; @Autowired AmqpAdmin amqpAdmin; @Bean Object initBindingTest() { amqpAdmin.declareExchange(ExchangeBuilder.fanoutExchange(EXCHANGE_FANOUT_UNROUTE).durable(true).autoDelete().build()); amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_DELAY).durable(true).autoDelete().build()); amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_WELCOME) .durable(true) .autoDelete() .withArgument("alternate-exchange", EXCHANGE_FANOUT_UNROUTE) .build()); amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HI).build()); amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HELLO) .withArgument("x-dead-letter-exchange", EXCHANGE_TOPIC_DELAY) .withArgument("x-dead-letter-routing-key", ROUTINGKEY_DELAY) .withArgument("x-message-ttl", TTL_QUEUE_MESSAGE) .build()); amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_UNROUTE).build()); amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_DELAY).build()); amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE, EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, null)); amqpAdmin.declareBinding(new Binding(QUEUE_UNROUTE, Binding.DestinationType.QUEUE, EXCHANGE_FANOUT_UNROUTE, "", null)); amqpAdmin.declareBinding(new Binding(QUEUE_DELAY, Binding.DestinationType.QUEUE, EXCHANGE_TOPIC_DELAY, ROUTINGKEY_DELAY, null)); return new Object(); } }
控制器
package com.example.controller; import com.example.config.RabbitRouterConfig; import com.example.mq.Sender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; @RestController public class HelloController { @Autowired private Sender sender; @PostMapping("/hi") public void hi() { sender.send(RabbitRouterConfig.QUEUE_HI, "hi1 message:" + LocalDateTime.now()); } @PostMapping("/hello1") public void hello1() { sender.send("hello.a", "hello1 message:" + LocalDateTime.now()); } @PostMapping("/hello2") public void hello2() { sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "hello.b", "hello2 message:" + LocalDateTime.now()); } @PostMapping("/ae") public void aeTest() { sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "nonono", "ae message:" + LocalDateTime.now()); } }
發(fā)送器
package com.example.mq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; @Component public class Sender { @Autowired private AmqpTemplate rabbitTemplate; public void send(String routingKey, String message) { this.rabbitTemplate.convertAndSend(routingKey, message); } public void send(String exchange, String routingKey, String message) { this.rabbitTemplate.convertAndSend(exchange, routingKey, message); } }
接收器
package com.example.mq; import com.example.config.RabbitRouterConfig; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Receiver { @RabbitListener(queues = RabbitRouterConfig.QUEUE_HI) public void hi(String payload) { System.out.println ("Receiver(hi) : " + payload); } // @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO) // public void hello(String hello) throws InterruptedException { // System.out.println ("Receiver(hello) : " + hello); // Thread.sleep(5 * 1000); // System.out.println("(hello):sleep over"); // } // // @RabbitListener(queues = RabbitRouterConfig.QUEUE_UNROUTE) // public void unroute(String hello) throws InterruptedException { // System.out.println ("Receiver(unroute) : " + hello); // Thread.sleep(5 * 1000); // System.out.println("(unroute):sleep over"); // } @RabbitListener(queues = RabbitRouterConfig.QUEUE_DELAY) public void delay(String hello) throws InterruptedException { System.out.println ("Receiver(delay) : " + hello); Thread.sleep(5 * 1000); System.out.println("(delay):sleep over"); } }
application.yml
server: # port: 9100 port: 9101 spring: application: # name: demo-rabbitmq-sender name: demo-rabbitmq-receiver rabbitmq: host: localhost port: 5672 username: admin password: 123456 # virtualHost: / publisher-confirms: true publisher-returns: true # listener: # simple: # acknowledge-mode: manual # direct: # acknowledge-mode: manual
實例測試
分別啟動發(fā)送者和接收者。
訪問:http://localhost:9100/hello2
五秒鐘后輸出:
Receiver(delay) : hello2 message:2020-11-27T09:30:51.548
(delay):sleep over
以上就是SpringBoot整合RabbitMQ處理死信隊列和延遲隊列的詳細內(nèi)容,更多關(guān)于SpringBoot RabbitMQ死信隊列 延遲隊列的資料請關(guān)注腳本之家其它相關(guān)文章!
- SpringBoot+RabbitMQ?實現(xiàn)死信隊列的示例
- 如何利用rabbitMq的死信隊列實現(xiàn)延時消息
- 深入分析RabbitMQ中死信隊列與死信交換機
- 關(guān)于SpringBoot整合RabbitMQ實現(xiàn)死信隊列
- 關(guān)于Rabbitmq死信隊列及延時隊列的實現(xiàn)
- Springboot結(jié)合rabbitmq實現(xiàn)的死信隊列
- RabbitMQ之死信隊列深入解析
- springboot中RabbitMQ死信隊列的實現(xiàn)示例
- SpringBoot整合RabbitMQ實現(xiàn)延遲隊列和死信隊列
- springboot整合RabbitMQ中死信隊列的實現(xiàn)
相關(guān)文章
springboot yml配置文件定義list集合、數(shù)組和map以及使用中的錯誤
這篇文章主要介紹了springboot yml配置文件定義list集合、數(shù)組和map以及使用中遇到的錯誤問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-07-07mybatis 根據(jù)id批量刪除的實現(xiàn)操作
這篇文章主要介紹了mybatis 根據(jù)id批量刪除的實現(xiàn)操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-08-08繼承WebMvcConfigurationSupport后自動配置不生效及如何配置攔截器
這篇文章主要介紹了繼承WebMvcConfigurationSupport后自動配置不生效及如何配置攔截器,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-11-11零基礎學Java:Java開發(fā)工具 Eclipse 安裝過程創(chuàng)建第一個Java項目及Eclipse的一些基礎使用技巧
這篇文章主要介紹了零基礎學Java:Java開發(fā)工具 Eclipse 安裝過程創(chuàng)建第一個Java項目及Eclipse的一些基礎使用技巧,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-09-09spring boot security自定義認證的代碼示例
這篇文章主要介紹了spring boot security自定義認證,本文通過示例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-07-07