關于SpringBoot整合RabbitMQ實現(xiàn)死信隊列
概念介紹
什么是死信
死信可以理解成沒有被正常消費的消息,在RabbitMQ中以下幾種情況會被認定為死信:
- 消費者使用basic.reject或basic.nack(重新排隊參數(shù)設置為false)對消息進行否定確認。
- 消息到達生存時間還未被消費。
- 隊列超過長度限制,消息被丟棄。
這些消息會被發(fā)送到死信交換機并路由到死信隊列中(在RabbitMQ中死信交換機和死信隊列就是普通的交換機和隊列)。其流轉(zhuǎn)過程如下圖
死信隊列應用
- 作為消息可靠性的一個擴展。比如,在隊列已滿的情況下也不會丟失消息。
- 可以實現(xiàn)延遲消費功能。比如,訂單15分鐘內(nèi)未支付。
注意事項:基于死信隊列實現(xiàn)的延遲消費不適合時間過于復雜的場景。比如,一個隊列中第一條消息TTL為10s,第二條消息TTL為5s,由于RabbitMQ只會監(jiān)聽第一條消息,所以本應第二條消息先達到TTL會在第一條消息的TTL之后。對于該現(xiàn)象有兩種解決方案:
- 維護多個隊列,每個隊列維護一個TTL時間。
- 使用延遲交換機。這種方式需要下載插件支持
工程搭建
環(huán)境說明
- RabbitMQ環(huán)境
- Java版本:JDK1.8
- Maven版本:apache-maven-3.6.3
- 開發(fā)工具:IntelliJ IDEA
搭建步驟
1.創(chuàng)建SpringBoot項目。
2.pom.xml文件導入RabbitMQ依賴。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
3.application.yml文件添加RabbitMQ配置。
spring: # rabbitmq配置信息 RabbitProperties類 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / # 開啟confirm機制 publisher-confirm-type: correlated # 開啟return機制 publisher-returns: true #全局配置,局部配置存在就以局部為準 listener: simple: acknowledge-mode: manual # 手動ACK
實現(xiàn)死信
準備Exchange&Queue
@Configuration public class RabbitMQConfig { /** * 正常隊列 */ public static final String EXCHANGE = "boot-exchange"; public static final String QUEUE = "boot-queue"; public static final String ROUTING_KEY = "boot-rout"; /** * 死信隊列 */ public static final String DEAD_EXCHANGE = "dead-exchange"; public static final String DEAD_QUEUE = "dead-queue"; public static final String DEAD_ROUTING_KEY = "dead-rout"; /** * 聲明死信交換機 * * @return */ @Bean public Exchange deadExchange() { return ExchangeBuilder.directExchange(DEAD_EXCHANGE).build(); } /** * 聲明死信隊列 * * @return */ @Bean public Queue deadQueue() { return QueueBuilder.durable(DEAD_QUEUE).build(); } /** * 綁定死信的隊列和交換機 * * @param deadExchange * @param deadQueue * @return */ @Bean public Binding deadBind(Exchange deadExchange, Queue deadQueue) { return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs(); } /** * 聲明交換機,同channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT); * * @return */ @Bean public Exchange bootExchange() { return ExchangeBuilder.directExchange(EXCHANGE).build(); } /** * 聲明隊列,同channel.queueDeclare(QUEUE, true, false, false, null); * 綁定死信交換機及路由key * * @return */ @Bean public Queue bootQueue() { return QueueBuilder.durable(QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey(DEAD_ROUTING_KEY) //聲明隊列屬性有更改時需要刪除隊列 //給隊列設置消息時長 //.ttl(10000) //隊列最大長度 .maxLength(1) .build(); } /** * 綁定隊列和交換機,同 channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY); * * @param bootExchange * @param bootQueue * @return */ @Bean public Binding bootBind(Exchange bootExchange, Queue bootQueue) { return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs(); } }
監(jiān)聽死信隊列
@RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE) public void listener_dead(String msg, Channel channel, Message message) throws IOException { System.out.println("死信接收到消息" + msg); System.out.println("唯一標識:" + message.getMessageProperties().getCorrelationId()); System.out.println("messageID:" + message.getMessageProperties().getMessageId()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
方式一、消費者拒絕&否認
- 拒絕消息
@RabbitListener(queues = RabbitMQConfig.QUEUE) public void listener(String msg, Channel channel, Message message) throws IOException { System.out.println("接收到消息" + msg); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false) }
- 否認消息
@RabbitListener(queues = RabbitMQConfig.QUEUE) public void listener(String msg, Channel channel, Message message) throws IOException { System.out.println("接收到消息" + msg); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); }
方式二、超過消息TTL 發(fā)送消息時設置TTL
@SpringBootTest public class Publisher { @Autowired private RabbitTemplate template; /** * 5秒未被消費會路由到死信隊列 */ @Test public void publish_expir() { template.convertAndSend(RabbitMQConfig.EXCHANGE, RabbitMQConfig.ROUTING_KEY, "hello expir dead", message -> { message.getMessageProperties().setExpiration("5000"); return message; }); } }
- 設置隊列所有消息的TTL
更新RabbitMQConfig類中bootQueue() ,更新后需要刪除隊列,因為隊列屬性有更改。
@Bean public Queue bootQueue() { return QueueBuilder.durable(QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey(DEAD_ROUTING_KEY) //聲明隊列屬性有更改時需要刪除隊列 //給隊列設置消息時長 .ttl(10000) .build(); }
方式三、超過隊列長度限制
設置隊列長度限制,當隊列長度超過設置的閾值,消息便會路由到死信隊列。
@Bean public Queue bootQueue() { return QueueBuilder.durable(QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey(DEAD_ROUTING_KEY) //聲明隊列屬性有更改時需要刪除隊列 .maxLength(1) .build(); }
代碼倉庫 點我
到此這篇關于關于SpringBoot整合RabbitMQ實現(xiàn)死信隊列的文章就介紹到這了,更多相關RabbitMQ實現(xiàn)死信隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
- SpringBoot整合RabbitMQ處理死信隊列和延遲隊列
- SpringBoot+RabbitMQ?實現(xiàn)死信隊列的示例
- 如何利用rabbitMq的死信隊列實現(xiàn)延時消息
- 深入分析RabbitMQ中死信隊列與死信交換機
- 關于Rabbitmq死信隊列及延時隊列的實現(xiàn)
- Springboot結(jié)合rabbitmq實現(xiàn)的死信隊列
- RabbitMQ之死信隊列深入解析
- springboot中RabbitMQ死信隊列的實現(xiàn)示例
- SpringBoot整合RabbitMQ實現(xiàn)延遲隊列和死信隊列
- springboot整合RabbitMQ中死信隊列的實現(xiàn)
相關文章
Mybatis的mapper標簽 namespace屬性用法說明
這篇文章主要介紹了Mybatis的mapper標簽 namespace屬性用法說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09如何解決redis的NOAUTH Authentication required異常
這篇文章主要介紹了Jedis異常解決:NOAUTH Authentication required,,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值2019-07-07詳解springboot項目帶Tomcat和不帶Tomcat的兩種打包方式
這篇文章主要介紹了詳解springboot項目帶Tomcat和不帶Tomcat的兩種打包方式,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-09-09Springboot MultipartFile文件上傳與下載的實現(xiàn)示例
在Spring Boot項目中,可以使用MultipartFile類來處理文件上傳和下載操作,本文就詳細介紹了如何使用,具有一定的參考價值,感興趣的可以了解一下2023-08-08SpringCloud配置服務端的ConfigServer設置安全認證
這篇文章主要為大家介紹了SpringCloud配置服務端的ConfigServer設置安全認證,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-08-08