RabbitMQ中的延遲隊(duì)列機(jī)制詳解
一、延遲隊(duì)列
延時(shí)隊(duì)列內(nèi)部是有序的,最重要的特性就體現(xiàn)在它的延時(shí)屬性上,延時(shí)隊(duì)列中的元素是希望 在指定時(shí)間到了以后或之前取出和處理,簡單來說,延時(shí)隊(duì)列就是用來存放需要在指定時(shí)間被處理的元素的隊(duì)列。
二、隊(duì)列TTL

創(chuàng)建一個(gè)配置類,聲明并配置交換機(jī)和隊(duì)列
@Configuration
public class TtlQueueConfig {
//普通交換機(jī)名稱
public static final String NORMAL_EXCHANGE = "X";
//死信交換機(jī)名稱
public static final String DEAD_EXCHANGE = "Y";
//普通隊(duì)列名稱
public static final String NORMAL_QUEUE_A = "QA";
public static final String NORMAL_QUEUE_B = "QA";
//死信隊(duì)列名稱
public static final String DEAD_QUEUE = "QD";
//聲明普通交換機(jī)
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(NORMAL_EXCHANGE);
}
//聲明死信交換機(jī)
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(DEAD_EXCHANGE);
}
//聲明普通隊(duì)列,TTL為10s
@Bean("QA")
public Queue qA() {
Map<String, Object> arguments = new HashMap<>();
//設(shè)置死信交換機(jī)
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//設(shè)置死信RoutingKey
arguments.put("x-dead-letter-routing-key", "YD");
//設(shè)置TTL
arguments.put("x-message-ttl", 10000);
return QueueBuilder.durable(NORMAL_QUEUE_A).withArguments(arguments).build();
}
//聲明普通隊(duì)列,TTL為10s
@Bean("QB")
public Queue qB() {
Map<String, Object> arguments = new HashMap<>();
//設(shè)置死信交換機(jī)
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//設(shè)置死信RoutingKey
arguments.put("x-dead-letter-routing-key", "YD");
//設(shè)置TTL
arguments.put("x-message-ttl", 40000);
return QueueBuilder.durable(NORMAL_QUEUE_B).withArguments(arguments).build();
}
//聲明死信隊(duì)列
@Bean("QD")
public Queue qD() {
return QueueBuilder.durable(DEAD_QUEUE).build();
}
//綁定對(duì)應(yīng)的交換機(jī)和隊(duì)列
@Bean
public Binding queueABindingX(@Qualifier("QA") Queue QA,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(QA).to(xExchange).with("XA");
}
@Bean
public Binding queueBBindingX(@Qualifier("QB") Queue QB,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(QB).to(xExchange).with("XB");
}
@Bean
public Binding queueDBindingY(@Qualifier("QD") Queue QD,
@Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(QD).to(yExchange).with("YD");
}
}創(chuàng)建一個(gè)生產(chǎn)者
@Slf4j
@RestController
public class SendMsgController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/ttl/sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
log.info("當(dāng)前時(shí)間:{}, 發(fā)送一條信息給兩個(gè)隊(duì)列:{}", new Date().toString(), message);
rabbitTemplate.convertAndSend("X","XA","消息來自TTL為10s的隊(duì)列QA:" + message);
rabbitTemplate.convertAndSend("X","XB","消息來自TTL為40s的隊(duì)列QB:" + message);
}
}創(chuàng)建一個(gè)消費(fèi)者
@Slf4j
@Component
public class DeadLetterQueueConsumer {
//接收消息
@RabbitListener(queues = "QD")
public void receivedQD(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("當(dāng)前時(shí)間:{}, 收到死信隊(duì)列的消息:{}", new Date().toString(), message);
}
}瀏覽器發(fā)送消息

消費(fèi)者分別過了10s和40s接收到了消息

三、延遲隊(duì)列的優(yōu)化
不同的延遲時(shí)間需要設(shè)置不同的 TTL ,可以優(yōu)化聲明一個(gè)通用的 QC 隊(duì)列,具體的延遲時(shí)間有生產(chǎn)者決定

在配置類 TtlQueueConfig 中配置通用隊(duì)列 QC
//通用隊(duì)列名稱
public static final String Generic_QUEUE_C = "QC";
//聲明通用隊(duì)列
@Bean("QC")
public Queue qC() {
Map<String, Object> arguments = new HashMap<>();
//設(shè)置死信交換機(jī)
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//設(shè)置死信RoutingKey
arguments.put("x-dead-letter-routing-key", "YD");
//因?yàn)槭峭ㄓ藐?duì)列,所以不設(shè)置TTL,由生產(chǎn)者指定消息的TTL
return QueueBuilder.durable(Generic_QUEUE_C).withArguments(arguments).build();
}
//綁定通用隊(duì)列和普通交換機(jī)
@Bean
public Binding queueCBindingX(@Qualifier("QC") Queue QC,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(QC).to(xExchange).with("XC");
}
//綁定通用隊(duì)列和死信交換機(jī)
@Bean
public Binding queueCBindingY(@Qualifier("QC") Queue QC,
@Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(QC).to(yExchange).with("YD");
}生產(chǎn)者發(fā)送消息,并指定 TTL 時(shí)長
//發(fā)送消息,并指定消息的TTL
@GetMapping("/ttl/sendExpirationMsg/{message}/{ttlTime}")
public void sendExpirationMsg(@PathVariable("message") String message, @PathVariable("ttlTime") String ttlTime) {
log.info("當(dāng)前時(shí)間:{}, 發(fā)送一條TTL為{}ms的消息給隊(duì)列QC:{}", new Date().toString(), ttlTime, message);
rabbitTemplate.convertAndSend("X", "XC", message, msg -> {
//設(shè)置消息的TTL時(shí)長
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
});
}發(fā)送兩條消息


消費(fèi)者接收消息

但是,如果連續(xù)發(fā)送兩條消息,如果使用在消息屬性上設(shè)置 TTL 的方式,消息可能并不會(huì)按時(shí)“死亡“,因?yàn)?RabbitMQ 只會(huì)檢查第一個(gè)消息是否過期,如果過期則丟到死信隊(duì)列,如果第一個(gè)消息的延時(shí)時(shí)長很長,而第二個(gè)消息的延時(shí)時(shí)長很短,第二個(gè)消息并不會(huì)優(yōu)先得到執(zhí)行。結(jié)果會(huì)導(dǎo)致第二條消息消費(fèi)者收到時(shí)間有誤。

四、基于 RabbitMQ 插件實(shí)現(xiàn)延遲隊(duì)列
如果不能實(shí)現(xiàn)在消息粒度上的 TTL ,并使其在設(shè)置的 TTL 時(shí)間及時(shí)死亡,就無法設(shè)計(jì)成一個(gè)通用的延時(shí)隊(duì)列??梢允褂没?RabbitMQ 插件來實(shí)現(xiàn)延遲隊(duì)列,從而解決這個(gè)問題。
基于 RabbitMQ 插件實(shí)現(xiàn)延遲,是交換機(jī)實(shí)現(xiàn)延遲,而不再是隊(duì)列實(shí)現(xiàn)延遲


創(chuàng)建一個(gè)基于插件的延遲隊(duì)列配置類 DelayedQueueConfig
@Configuration
public class DelayedQueueConfig {
//交換機(jī)名稱
public static final String DELAYED_EXCHANGE_NAME = "delayed_exchange";
//隊(duì)列名稱
public static final String DELAYED_QUEUE_NAME = "delayed_queue";
//routingKey
public static final String DELAYED_ROUTING_KEY = "delayed_routingKey";
//聲明交換機(jī)
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct"); //設(shè)置延遲類型
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
}
//聲明隊(duì)列
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
//綁定隊(duì)列和交換機(jī)
@Bean
public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
@Qualifier("delayedExchange") CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}創(chuàng)建生產(chǎn)者發(fā)送延遲消息
//基于插件發(fā)送消息
@GetMapping("/ttl/sendDelayedMsg/{message}/{delayedTime}")
public void sendDelayedMsg(@PathVariable("message") String message, @PathVariable("delayedTime") Integer delayedTime) {
log.info("當(dāng)前時(shí)間:{}, 發(fā)送一條時(shí)長為{}ms的消息給延遲隊(duì)列delayed_queue:{}", new Date().toString(), delayedTime, message);
rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {
//設(shè)置消息的延遲時(shí)長
msg.getMessageProperties().setDelay(delayedTime);
return msg;
});
}創(chuàng)建消費(fèi)者
@Slf4j
@Component
public class DelayedQueueConsumer {
//監(jiān)聽消息
@RabbitListener(queues = {DelayedQueueConfig.DELAYED_QUEUE_NAME})
public void receiveDelayQueue(Message message){
String msg = new String(message.getBody());
log.info("當(dāng)前時(shí)間:{}, 收到延遲隊(duì)列的消息:{}", new Date().toString(), msg);
}
}當(dāng)連續(xù)發(fā)送兩條不同延遲時(shí)長的消息時(shí),消費(fèi)者會(huì)先接收到延遲時(shí)長短的那條消息,再接收延遲時(shí)長長的那條消息。

實(shí)現(xiàn)延遲隊(duì)列,一種是基于死信隊(duì)列的方式,一種是基于RabbitMQ插件的方式。
延時(shí)隊(duì)列在需要延時(shí)處理的場景下非常有用,使用 RabbitMQ 來實(shí)現(xiàn)延時(shí)隊(duì)列可以很好的利用RabbitMQ 的特性,如消息可靠發(fā)送、消息可靠投遞、死信隊(duì)列來保障消息至少被消費(fèi)一次以及未被正確處理的消息不會(huì)被丟棄。另外,通過 RabbitMQ 集群的特性,可以很好的解決單點(diǎn)故障問題,不會(huì)因?yàn)閱蝹€(gè)節(jié)點(diǎn)掛掉導(dǎo)致延時(shí)隊(duì)列不可用或者消息丟失。
到此這篇關(guān)于RabbitMQ中的延遲隊(duì)列機(jī)制詳解的文章就介紹到這了,更多相關(guān)RabbitMQ延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Java中RabbitMQ延遲隊(duì)列實(shí)現(xiàn)詳解
- 詳解RabbitMQ中延遲隊(duì)列結(jié)合業(yè)務(wù)場景的使用
- 詳解RabbitMQ延遲隊(duì)列的基本使用和優(yōu)化
- SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊(duì)列的示例詳解
- GoLang RabbitMQ TTL與死信隊(duì)列以及延遲隊(duì)列詳細(xì)講解
- springcloud安裝rabbitmq并配置延遲隊(duì)列插件的過程詳解
- springcloud中RabbitMQ死信隊(duì)列與延遲交換機(jī)實(shí)現(xiàn)方法
相關(guān)文章
jpa多數(shù)據(jù)源時(shí)Hibernate配置自動(dòng)生成表不生效的解決
這篇文章主要介紹了jpa多數(shù)據(jù)源時(shí)Hibernate配置自動(dòng)生成表不生效的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02
Java實(shí)現(xiàn)讀取Excel文件功能(EasyExcel初使用)
EasyExcel是一款基于Java語言的開源Excel解析工具,可以幫助我們快速、高效地讀取和寫入Excel文件,這篇文章主要給大家介紹了關(guān)于Java實(shí)現(xiàn)讀取Excel文件功能的相關(guān)資料,使用的是EasyExcel,需要的朋友可以參考下2024-07-07
一文搞懂Java?ScheduledExecutorService的使用
JUC包(java.util.concurrent)中提供了對(duì)定時(shí)任務(wù)的支持,即ScheduledExecutorService接口。本文主要對(duì)ScheduledExecutorService的使用進(jìn)行簡單的介紹,需要的可以參考一下2022-11-11
SpringBoot實(shí)現(xiàn)Tomcat集群的會(huì)話管理功能
在使用 Tomcat 集群時(shí),由于每個(gè) Tomcat 實(shí)例的 Session 存儲(chǔ)是獨(dú)立的,導(dǎo)致無法實(shí)現(xiàn) Session 的共享,這可能影響到用戶跨節(jié)點(diǎn)的訪問,為了實(shí)現(xiàn)跨 Tomcat 實(shí)例共享 Session,可以使用 Spring Session 配合 Redis 進(jìn)行集中式會(huì)話管理,需要的朋友可以參考下2024-12-12
Java數(shù)據(jù)結(jié)構(gòu)之簡單的連接點(diǎn)(link)實(shí)現(xiàn)方法示例
這篇文章主要介紹了Java數(shù)據(jù)結(jié)構(gòu)之簡單的連接點(diǎn)(link)實(shí)現(xiàn)方法,涉及java指針指向節(jié)點(diǎn)的相關(guān)使用技巧,需要的朋友可以參考下2017-10-10

