如何利用rabbitMq的死信隊列實現(xiàn)延時消息
前言
使用mq自帶的死信去實現(xiàn)延時消息要注意一個坑點,就是mq只會檢測隊首的消息的過期時間,假設先放入隊列10s過期消息,再放入2s過期。
mq會檢測頭部10s是否過期,10s不過期的情況下,2s就算過去也不會跑到死信.
mq基本的消息模型

mq死信隊列的消息模型
簡單的說就是先弄一個正常隊列,然后不要設置消費者,接著給這個正常隊列綁定一個死信隊列,這個死信隊列設置方式和正常隊列沒啥區(qū)別。
然后監(jiān)聽這個死信隊列的消費.

一般死信隊列由三大核心組件組成:死信交換機+死信路由+TTL(消息過期時間)
通常死信隊列由“面向生產(chǎn)者的基本交換機+基本路由”綁定而成,所以生產(chǎn)者首先是將消息發(fā)送至“基本交換機+基本路由”所綁定而成的消息模型中,即間接性地進入到死信隊列中,當過了TTL,消息將“掛掉”,從而進入下一個中轉站,即“面下那個消費者的死信交換機+死信路由”所綁定而成的消息模型中.
maven依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
配置普通隊列和死信隊列
yml文件
spring:
application:
name: ttl-queue
rabbitmq:
host: 110.40.181.73
port: 35672
username: root
password: 10086
virtual-host: /fchan
connection-timeout: 15000
# 發(fā)送確認
#publisher-confirms: true
# 路由失敗回調
publisher-returns: true
template:
# 必須設置成true 消息路由失敗通知監(jiān)聽者,而不是將消息丟棄
mandatory: true
listener:
simple:
# 每次從RabbitMQ獲取的消息數(shù)量
prefetch: 1
default-requeue-rejected: false
# 每個隊列啟動的消費者數(shù)量
concurrency: 1
# 每個隊列最大的消費者數(shù)量
max-concurrency: 1
# 簽收模式為手動簽收-那么需要在代碼中手動ACK
acknowledge-mode: manual
package com.fchan.mq.mqDelay.dlx;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyRabConfig {
//定義普通任務隊列(其實就是一個普通隊列,只是沒有消費者)
@Bean("delayQue")
public Queue delayQue(){
//普通隊列綁定死信交換機及路由key
//delayQueue延時隊列
return QueueBuilder.durable("delayQueue")
//指定這個延時隊列下的死信交換機
//如果消息過時則會被投遞到當前對應的my-dlx-exchange
.withArgument("x-dead-letter-exchange", "my-dlx-exchange")
//死信交換機綁定的路由key
//如果消息過時,my-dlx-exchange會根據(jù)routing-key-delay投遞消息到對應的隊列
//mq發(fā)送消息的時候一般指定的是exchange交換機+這個routingKey來找到隊列,這個應該是出于有時候需要讓mq的交換機將一個消息發(fā)送到多個隊列所采用的方式
.withArgument("x-dead-letter-routing-key", "routing-key-delay")
.build();
}
//定義普通隊列的交換機
@Bean("delayExchange")
public Exchange delayExchange(){
return ExchangeBuilder.directExchange("delayExchange").build();
}
//綁定普通隊列的交換機
@Bean("delayBinding")
public Binding delayBinding(@Qualifier("delayExchange") Exchange exchange, @Qualifier("delayQue") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("delayBindingRoutingKey").noargs();
}
//定義死信隊列
@Bean("dlxQueue")
public Queue dlxQueue(){
return QueueBuilder.durable("my-dlx-queue").build();
}
//定義死信交換機,這里的死信交換機要和上面普通隊列所綁定的交換機名稱一致
@Bean("dlxExchange")
public Exchange dlxExchange(){
return ExchangeBuilder.directExchange("my-dlx-exchange").build();
}
//綁定交換機與死信隊列
@Bean("dlxBinding")
public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange, @Qualifier("dlxQueue") Queue queue){
//這兒這個routingKey要和上面正常隊列中所綁定的死信routingKey一致
return BindingBuilder.bind(queue).to(exchange).with("routing-key-delay").noargs();
}
}
死信隊列消費者
package com.fchan.mq.mqDelay;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class MyRabbitConsume {
Logger log = LoggerFactory.getLogger(MyRabbitConsume.class);
@RabbitListener(queues = {"my-dlx-queue"})
public void receiveDeadMessage(Map<String,Object> map, Message message, Channel channel) throws Exception {
log.info("收到死信信息:{}",map);
log.info("然后進行一系列邏輯處理 Thanks?(?ω?)?");
//手動確認消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
發(fā)送消息測試
/**
* 發(fā)送消息到延時隊列 delayQueue
* 消息5秒后會被投遞到死信隊列
* @return
*/
@GetMapping("sendMsgToDelay")
public String sendMsgToDelay(String msg){
Map<String,Object> map = new HashMap<>();
map.put("msg", msg);
rabbitTemplate.convertAndSend("delayExchange","delayBindingRoutingKey", map, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//設置消息過期時間,5秒過期
message.getMessageProperties().setExpiration("5000");
return message;
}
});
return msg;
}
測試成功

總結
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
- SpringBoot整合RabbitMQ處理死信隊列和延遲隊列
- SpringBoot+RabbitMQ?實現(xiàn)死信隊列的示例
- 深入分析RabbitMQ中死信隊列與死信交換機
- 關于SpringBoot整合RabbitMQ實現(xiàn)死信隊列
- 關于Rabbitmq死信隊列及延時隊列的實現(xiàn)
- Springboot結合rabbitmq實現(xiàn)的死信隊列
- RabbitMQ之死信隊列深入解析
- springboot中RabbitMQ死信隊列的實現(xiàn)示例
- SpringBoot整合RabbitMQ實現(xiàn)延遲隊列和死信隊列
- springboot整合RabbitMQ中死信隊列的實現(xiàn)
相關文章
SpringBoot+微信小程序實現(xiàn)文件上傳與下載功能詳解
這篇文章主要為大家介紹了SpringBoot整合微信小程序實現(xiàn)文件上傳與下載功能,文中的實現(xiàn)步驟講解詳細,快跟隨小編一起學習一下吧2022-03-03
解決javaBean規(guī)范導致json傳參首字母大寫將永遠獲取不到問題
這篇文章主要介紹了解決javaBean規(guī)范導致json傳參首字母大寫將永遠獲取不到問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07
Spring實戰(zhàn)之FileSystemResource加載資源文件示例
這篇文章主要介紹了Spring實戰(zhàn)之FileSystemResource加載資源文件,結合實例形式分析了spring FileSystemResource加載xml資源文件的具體實現(xiàn)步驟與相關操作技巧,需要的朋友可以參考下2019-12-12
IDEA Ultimate2020.2版本配置Tomcat詳細教程
這篇文章主要介紹了IDEA Ultimate2020.2版本配置Tomcat教程,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-09-09
StackTraceElement獲取方法調用棧信息實例詳解
這篇文章主要介紹了StackTraceElement獲取方法調用棧信息實例詳解,分享了相關代碼示例,小編覺得還是挺不錯的,具有一定借鑒價值,需要的朋友可以參考下2018-02-02
Springboot如何使用mybatis實現(xiàn)攔截SQL分頁
這篇文章主要介紹了Springboot使用mybatis實現(xiàn)攔截SQL分頁,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-06-06

