RabbitMQ中的延遲隊(duì)列機(jī)制詳解
一、延遲隊(duì)列
延時(shí)隊(duì)列內(nèi)部是有序的,最重要的特性就體現(xiàn)在它的延時(shí)屬性上,延時(shí)隊(duì)列中的元素是希望 在指定時(shí)間到了以后或之前取出和處理,簡(jiǎn)單來(lái)說(shuō),延時(shí)隊(duì)列就是用來(lái)存放需要在指定時(shí)間被處理的元素的隊(duì)列。
二、隊(duì)列TTL
創(chuàng)建一個(gè)配置類(lèi),聲明并配置交換機(jī)和隊(duì)列
@Configuration public class TtlQueueConfig { //普通交換機(jī)名稱(chēng) public static final String NORMAL_EXCHANGE = "X"; //死信交換機(jī)名稱(chēng) public static final String DEAD_EXCHANGE = "Y"; //普通隊(duì)列名稱(chēng) public static final String NORMAL_QUEUE_A = "QA"; public static final String NORMAL_QUEUE_B = "QA"; //死信隊(duì)列名稱(chēng) public static final String DEAD_QUEUE = "QD"; //聲明普通交換機(jī) @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(NORMAL_EXCHANGE); } //聲明死信交換機(jī) @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(DEAD_EXCHANGE); } //聲明普通隊(duì)列,TTL為10s @Bean("QA") public Queue qA() { Map<String, Object> arguments = new HashMap<>(); //設(shè)置死信交換機(jī) arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //設(shè)置死信RoutingKey arguments.put("x-dead-letter-routing-key", "YD"); //設(shè)置TTL arguments.put("x-message-ttl", 10000); return QueueBuilder.durable(NORMAL_QUEUE_A).withArguments(arguments).build(); } //聲明普通隊(duì)列,TTL為10s @Bean("QB") public Queue qB() { Map<String, Object> arguments = new HashMap<>(); //設(shè)置死信交換機(jī) arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //設(shè)置死信RoutingKey arguments.put("x-dead-letter-routing-key", "YD"); //設(shè)置TTL arguments.put("x-message-ttl", 40000); return QueueBuilder.durable(NORMAL_QUEUE_B).withArguments(arguments).build(); } //聲明死信隊(duì)列 @Bean("QD") public Queue qD() { return QueueBuilder.durable(DEAD_QUEUE).build(); } //綁定對(duì)應(yīng)的交換機(jī)和隊(duì)列 @Bean public Binding queueABindingX(@Qualifier("QA") Queue QA, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(QA).to(xExchange).with("XA"); } @Bean public Binding queueBBindingX(@Qualifier("QB") Queue QB, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(QB).to(xExchange).with("XB"); } @Bean public Binding queueDBindingY(@Qualifier("QD") Queue QD, @Qualifier("yExchange") DirectExchange yExchange) { return BindingBuilder.bind(QD).to(yExchange).with("YD"); } }
創(chuàng)建一個(gè)生產(chǎn)者
@Slf4j @RestController public class SendMsgController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/ttl/sendMsg/{message}") public void sendMsg(@PathVariable String message) { log.info("當(dāng)前時(shí)間:{}, 發(fā)送一條信息給兩個(gè)隊(duì)列:{}", new Date().toString(), message); rabbitTemplate.convertAndSend("X","XA","消息來(lái)自TTL為10s的隊(duì)列QA:" + message); rabbitTemplate.convertAndSend("X","XB","消息來(lái)自TTL為40s的隊(duì)列QB:" + message); } }
創(chuàng)建一個(gè)消費(fèi)者
@Slf4j @Component public class DeadLetterQueueConsumer { //接收消息 @RabbitListener(queues = "QD") public void receivedQD(Message message, Channel channel) { String msg = new String(message.getBody()); log.info("當(dāng)前時(shí)間:{}, 收到死信隊(duì)列的消息:{}", new Date().toString(), message); } }
瀏覽器發(fā)送消息
消費(fèi)者分別過(guò)了10s和40s接收到了消息
三、延遲隊(duì)列的優(yōu)化
不同的延遲時(shí)間需要設(shè)置不同的 TTL ,可以?xún)?yōu)化聲明一個(gè)通用的 QC 隊(duì)列,具體的延遲時(shí)間有生產(chǎn)者決定
在配置類(lèi) TtlQueueConfig 中配置通用隊(duì)列 QC
//通用隊(duì)列名稱(chēng) public static final String Generic_QUEUE_C = "QC"; //聲明通用隊(duì)列 @Bean("QC") public Queue qC() { Map<String, Object> arguments = new HashMap<>(); //設(shè)置死信交換機(jī) arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //設(shè)置死信RoutingKey arguments.put("x-dead-letter-routing-key", "YD"); //因?yàn)槭峭ㄓ藐?duì)列,所以不設(shè)置TTL,由生產(chǎn)者指定消息的TTL return QueueBuilder.durable(Generic_QUEUE_C).withArguments(arguments).build(); } //綁定通用隊(duì)列和普通交換機(jī) @Bean public Binding queueCBindingX(@Qualifier("QC") Queue QC, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(QC).to(xExchange).with("XC"); } //綁定通用隊(duì)列和死信交換機(jī) @Bean public Binding queueCBindingY(@Qualifier("QC") Queue QC, @Qualifier("yExchange") DirectExchange yExchange) { return BindingBuilder.bind(QC).to(yExchange).with("YD"); }
生產(chǎn)者發(fā)送消息,并指定 TTL 時(shí)長(zhǎng)
//發(fā)送消息,并指定消息的TTL @GetMapping("/ttl/sendExpirationMsg/{message}/{ttlTime}") public void sendExpirationMsg(@PathVariable("message") String message, @PathVariable("ttlTime") String ttlTime) { log.info("當(dāng)前時(shí)間:{}, 發(fā)送一條TTL為{}ms的消息給隊(duì)列QC:{}", new Date().toString(), ttlTime, message); rabbitTemplate.convertAndSend("X", "XC", message, msg -> { //設(shè)置消息的TTL時(shí)長(zhǎng) msg.getMessageProperties().setExpiration(ttlTime); return msg; }); }
發(fā)送兩條消息
消費(fèi)者接收消息
但是,如果連續(xù)發(fā)送兩條消息,如果使用在消息屬性上設(shè)置 TTL 的方式,消息可能并不會(huì)按時(shí)“死亡“,因?yàn)?RabbitMQ 只會(huì)檢查第一個(gè)消息是否過(guò)期,如果過(guò)期則丟到死信隊(duì)列,如果第一個(gè)消息的延時(shí)時(shí)長(zhǎng)很長(zhǎng),而第二個(gè)消息的延時(shí)時(shí)長(zhǎng)很短,第二個(gè)消息并不會(huì)優(yōu)先得到執(zhí)行。結(jié)果會(huì)導(dǎo)致第二條消息消費(fèi)者收到時(shí)間有誤。
四、基于 RabbitMQ 插件實(shí)現(xiàn)延遲隊(duì)列
如果不能實(shí)現(xiàn)在消息粒度上的 TTL ,并使其在設(shè)置的 TTL 時(shí)間及時(shí)死亡,就無(wú)法設(shè)計(jì)成一個(gè)通用的延時(shí)隊(duì)列??梢允褂没?RabbitMQ 插件來(lái)實(shí)現(xiàn)延遲隊(duì)列,從而解決這個(gè)問(wèn)題。
基于 RabbitMQ 插件實(shí)現(xiàn)延遲,是交換機(jī)實(shí)現(xiàn)延遲,而不再是隊(duì)列實(shí)現(xiàn)延遲
創(chuàng)建一個(gè)基于插件的延遲隊(duì)列配置類(lèi) DelayedQueueConfig
@Configuration public class DelayedQueueConfig { //交換機(jī)名稱(chēng) public static final String DELAYED_EXCHANGE_NAME = "delayed_exchange"; //隊(duì)列名稱(chēng) public static final String DELAYED_QUEUE_NAME = "delayed_queue"; //routingKey public static final String DELAYED_ROUTING_KEY = "delayed_routingKey"; //聲明交換機(jī) @Bean public CustomExchange delayedExchange() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-delayed-type", "direct"); //設(shè)置延遲類(lèi)型 return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments); } //聲明隊(duì)列 @Bean public Queue delayedQueue() { return new Queue(DELAYED_QUEUE_NAME); } //綁定隊(duì)列和交換機(jī) @Bean public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange) { return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } }
創(chuàng)建生產(chǎn)者發(fā)送延遲消息
//基于插件發(fā)送消息 @GetMapping("/ttl/sendDelayedMsg/{message}/{delayedTime}") public void sendDelayedMsg(@PathVariable("message") String message, @PathVariable("delayedTime") Integer delayedTime) { log.info("當(dāng)前時(shí)間:{}, 發(fā)送一條時(shí)長(zhǎng)為{}ms的消息給延遲隊(duì)列delayed_queue:{}", new Date().toString(), delayedTime, message); rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> { //設(shè)置消息的延遲時(shí)長(zhǎng) msg.getMessageProperties().setDelay(delayedTime); return msg; }); }
創(chuàng)建消費(fèi)者
@Slf4j @Component public class DelayedQueueConsumer { //監(jiān)聽(tīng)消息 @RabbitListener(queues = {DelayedQueueConfig.DELAYED_QUEUE_NAME}) public void receiveDelayQueue(Message message){ String msg = new String(message.getBody()); log.info("當(dāng)前時(shí)間:{}, 收到延遲隊(duì)列的消息:{}", new Date().toString(), msg); } }
當(dāng)連續(xù)發(fā)送兩條不同延遲時(shí)長(zhǎng)的消息時(shí),消費(fèi)者會(huì)先接收到延遲時(shí)長(zhǎng)短的那條消息,再接收延遲時(shí)長(zhǎng)長(zhǎng)的那條消息。
實(shí)現(xiàn)延遲隊(duì)列,一種是基于死信隊(duì)列的方式,一種是基于RabbitMQ插件的方式。
延時(shí)隊(duì)列在需要延時(shí)處理的場(chǎng)景下非常有用,使用 RabbitMQ 來(lái)實(shí)現(xiàn)延時(shí)隊(duì)列可以很好的利用RabbitMQ 的特性,如消息可靠發(fā)送、消息可靠投遞、死信隊(duì)列來(lái)保障消息至少被消費(fèi)一次以及未被正確處理的消息不會(huì)被丟棄。另外,通過(guò) RabbitMQ 集群的特性,可以很好的解決單點(diǎn)故障問(wèn)題,不會(huì)因?yàn)閱蝹€(gè)節(jié)點(diǎn)掛掉導(dǎo)致延時(shí)隊(duì)列不可用或者消息丟失。
到此這篇關(guān)于RabbitMQ中的延遲隊(duì)列機(jī)制詳解的文章就介紹到這了,更多相關(guān)RabbitMQ延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Java中RabbitMQ延遲隊(duì)列實(shí)現(xiàn)詳解
- 詳解RabbitMQ中延遲隊(duì)列結(jié)合業(yè)務(wù)場(chǎng)景的使用
- 詳解RabbitMQ延遲隊(duì)列的基本使用和優(yōu)化
- SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊(duì)列的示例詳解
- GoLang RabbitMQ TTL與死信隊(duì)列以及延遲隊(duì)列詳細(xì)講解
- springcloud安裝rabbitmq并配置延遲隊(duì)列插件的過(guò)程詳解
- springcloud中RabbitMQ死信隊(duì)列與延遲交換機(jī)實(shí)現(xiàn)方法
相關(guān)文章
jpa多數(shù)據(jù)源時(shí)Hibernate配置自動(dòng)生成表不生效的解決
這篇文章主要介紹了jpa多數(shù)據(jù)源時(shí)Hibernate配置自動(dòng)生成表不生效的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02Java實(shí)現(xiàn)讀取Excel文件功能(EasyExcel初使用)
EasyExcel是一款基于Java語(yǔ)言的開(kāi)源Excel解析工具,可以幫助我們快速、高效地讀取和寫(xiě)入Excel文件,這篇文章主要給大家介紹了關(guān)于Java實(shí)現(xiàn)讀取Excel文件功能的相關(guān)資料,使用的是EasyExcel,需要的朋友可以參考下2024-07-07一文搞懂Java?ScheduledExecutorService的使用
JUC包(java.util.concurrent)中提供了對(duì)定時(shí)任務(wù)的支持,即ScheduledExecutorService接口。本文主要對(duì)ScheduledExecutorService的使用進(jìn)行簡(jiǎn)單的介紹,需要的可以參考一下2022-11-11SpringBoot實(shí)現(xiàn)Tomcat集群的會(huì)話(huà)管理功能
在使用 Tomcat 集群時(shí),由于每個(gè) Tomcat 實(shí)例的 Session 存儲(chǔ)是獨(dú)立的,導(dǎo)致無(wú)法實(shí)現(xiàn) Session 的共享,這可能影響到用戶(hù)跨節(jié)點(diǎn)的訪問(wèn),為了實(shí)現(xiàn)跨 Tomcat 實(shí)例共享 Session,可以使用 Spring Session 配合 Redis 進(jìn)行集中式會(huì)話(huà)管理,需要的朋友可以參考下2024-12-12Java數(shù)據(jù)結(jié)構(gòu)之簡(jiǎn)單的連接點(diǎn)(link)實(shí)現(xiàn)方法示例
這篇文章主要介紹了Java數(shù)據(jù)結(jié)構(gòu)之簡(jiǎn)單的連接點(diǎn)(link)實(shí)現(xiàn)方法,涉及java指針指向節(jié)點(diǎn)的相關(guān)使用技巧,需要的朋友可以參考下2017-10-10