詳解RabbitMQ延遲隊(duì)列的基本使用和優(yōu)化
1.延遲隊(duì)列基本介紹
一般隊(duì)列中的元素總是希望能夠早點(diǎn)被取出來(lái)進(jìn)行處理,但是延遲隊(duì)列中的元素則是希望可以在指定時(shí)間內(nèi)被取出和處理,延遲隊(duì)列中的元素都是帶有時(shí)間屬性的。延遲隊(duì)列就是用來(lái)存放需要在指定時(shí)間被處理的元素的隊(duì)列
延遲隊(duì)列就是想要消息延遲一段時(shí)間后被處理,TTL可以讓消息在延遲一段時(shí)間后變成死信。變成死信的消息都會(huì)被投遞到死信隊(duì)列中,這樣的話(huà),只要消費(fèi)者一直消費(fèi)死信隊(duì)列里面的消息就可以了,因?yàn)槔锩娴南⒍际窍M获R上處理的消息 生產(chǎn)者生產(chǎn)一條延時(shí)消息,根據(jù)需要延時(shí)時(shí)間的不同,通過(guò)不同的routing key把消息路由到不同的延遲隊(duì)列,每一個(gè)隊(duì)列都設(shè)置了不同的TTL屬性,并且綁定在同一個(gè)死信交換機(jī)中,消息過(guò)期了以后,根據(jù)routing key的不同,又會(huì)被路由到不同的死信隊(duì)列中,消費(fèi)者只需要監(jiān)聽(tīng)對(duì)應(yīng)的死信隊(duì)列進(jìn)行處理就可以了。注意:不要造成重復(fù)消費(fèi)
2.延遲隊(duì)列使用場(chǎng)景
下面的場(chǎng)景需要使用延遲隊(duì)列
- 訂單在十分鐘內(nèi)沒(méi)有支付就自動(dòng)取消
- 新創(chuàng)建的店鋪,如果在十天內(nèi)都沒(méi)有上傳過(guò)商品,則自動(dòng)發(fā)送消息提醒
- 賬單在一周內(nèi)沒(méi)有支付,就會(huì)自動(dòng)結(jié)算
- 用戶(hù)注冊(cè)成功以后,如果三天內(nèi)沒(méi)有登錄就進(jìn)行短信題提醒
- 用戶(hù)發(fā)起退款,如果三天內(nèi)沒(méi)有得到處理則通知相關(guān)運(yùn)營(yíng)人員。
- 預(yù)定會(huì)議以后,需要提前十分鐘通知各個(gè)參會(huì)人員參加會(huì)議。
3.Spring Boot集成RabbitMQ
3.1創(chuàng)建項(xiàng)目,引入依賴(lài)
相關(guān)依賴(lài)
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies>
3.2application.properties配置文件
# RabbitMQ/配置 #服務(wù)器地址 spring.rabbitmq.host=服務(wù)器地址 #服務(wù)端口號(hào) spring.rabbitmq.port=5672 #虛擬主機(jī)名稱(chēng) spring.rabbitmq.virtual-host=/myhost #用戶(hù)名 spring.rabbitmq.username=admin #密碼 spring.rabbitmq.password=123456
3.3 隊(duì)列TTL-代碼結(jié)構(gòu)圖
3.4MQ配置類(lèi)
package com.zyh.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @author zengyihong * @create 2022--10--04 16:44 */ @Configuration public class TtlQueueConfiguration { //普通交換機(jī) public static final String X_EXCHANGE = "X"; //普通隊(duì)列 public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; //死信交換機(jī) public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; //死信隊(duì)列QD public static final String QUEUE_D = "QD"; /** * 聲明普通交換機(jī)X * * @return */ @Bean public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } /** * 聲明隊(duì)列QA * * @return */ @Bean public Queue queueA() { //創(chuàng)建集合保存隊(duì)列屬性 Map<String, Object> map = new HashMap<>(); //設(shè)置該隊(duì)列綁定的死信交換機(jī)名稱(chēng) map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //設(shè)置routing key map.put("x-dead-letter-routing-key", "YD"); //設(shè)置隊(duì)列延遲時(shí)間 10秒 map.put("x-message-ttl", 10000); //創(chuàng)建隊(duì)列 return QueueBuilder.durable(QUEUE_A).withArguments(map).build(); } /** * 把QA隊(duì)列和交換機(jī)X進(jìn)行綁定 * * @return */ @Bean public Binding queueA_BindingX(@Qualifier("queueA") Queue queue, @Qualifier("xExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XA"); } /** * 聲明隊(duì)列QB * * @return */ @Bean public Queue queueB() { //創(chuàng)建集合保存隊(duì)列屬性 Map<String, Object> map = new HashMap<>(); //設(shè)置該隊(duì)列綁定的死信交換機(jī)名稱(chēng) map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //設(shè)置routing key map.put("x-dead-letter-routing-key", "YD"); //設(shè)置隊(duì)列延遲時(shí)間 10秒 map.put("x-message-ttl", 40000); //創(chuàng)建隊(duì)列 return QueueBuilder.durable(QUEUE_A).withArguments(map).build(); } /** * 把QB隊(duì)列和交換機(jī)X進(jìn)行綁定 * * @return */ @Bean public Binding queueB_BindingX(@Qualifier("queueB") Queue queue, @Qualifier("xExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XB"); } /** * 聲明死信交換機(jī)Y * * @return */ @Bean public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } /** * 聲明死信隊(duì)列QD * * @return */ @Bean public Queue queueD() { return new Queue(QUEUE_D); } /** * 把死信交換機(jī)和死信隊(duì)列進(jìn)行綁定 * @param queue * @param exchange * @return */ @Bean public Binding deadLetterBindingQD(@Qualifier("queueD") Queue queue, @Qualifier("yExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("YD"); } }
3.5生產(chǎn)者代碼
@Slf4j @RestController @RequestMapping("/ttl") public class SendMessageController { @Resource private RabbitTemplate rabbitTemplate; /** * 生產(chǎn)者發(fā)送消息 * @param message */ @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message){ //記錄日志 log.info("當(dāng)前時(shí)間:{},發(fā)送一條信息給兩個(gè)TTL隊(duì)列:{}",new Date(),message); //給QA隊(duì)列發(fā)送消息 rabbitTemplate.convertSendAndReceive("X","XA", "消息來(lái)自TTL為10秒的隊(duì)列:"+message); rabbitTemplate.convertSendAndReceive("X","XB", "消息來(lái)自TTL為40秒的隊(duì)列:"+message); } }
3.6消費(fèi)者代碼
@Slf4j @Component public class DeadLetterQueueConsumer { @RabbitListener(queues = TtlQueueConfiguration.QUEUE_D) public void receiveQD(Message message, Channel channel){ //獲取消息 String msg=new String(message.getBody()); log.info("當(dāng)前時(shí)間:{},收到死信隊(duì)列消息:{}",new Date(),msg); } }
3.7測(cè)試
啟動(dòng)boot項(xiàng)目,在瀏覽器輸入localhost:8080/ttl/sendMessage/Hello
但是這種方式有一種缺點(diǎn),現(xiàn)在我們只有TTL為10s和40s的延遲隊(duì)列,如果我們需要其他延時(shí)時(shí)間的隊(duì)列的話(huà),那么我們又得新增其他隊(duì)列,這樣其實(shí)并不方便,我們想要的是能夠動(dòng)態(tài)設(shè)置TTL,這樣就不需要為每個(gè)TTL設(shè)置新的延遲隊(duì)列了。
4.延遲隊(duì)列優(yōu)化
4.1代碼結(jié)構(gòu)圖
4.2配置類(lèi)
在之前寫(xiě)的代碼基礎(chǔ)上新增一個(gè)配置類(lèi)
package com.zyh.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @author zengyihong * @create 2022--10--05 10:44 */ @Configuration public class MessageTtlQueueConfiguration { //死信交換機(jī) public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; //普通隊(duì)列 public static final String QUEUE_C = "QC"; /** * 聲明QC隊(duì)列 * @return */ @Bean public Queue queueC(){ //創(chuàng)建集合保存隊(duì)列屬性 Map<String, Object> map = new HashMap<>(); //設(shè)置該隊(duì)列綁定的死信交換機(jī)名稱(chēng) map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //設(shè)置routing key map.put("x-dead-letter-routing-key", "YD"); //設(shè)置隊(duì)列延遲時(shí)間 10秒 map.put("x-message-ttl", 10000); return QueueBuilder.durable(QUEUE_C).withArguments(map).build(); } /** * 把QC隊(duì)列和正常交換機(jī)X進(jìn)行綁定 * * @return */ @Bean public Binding queueC_BindingX(@Qualifier("queueC") Queue queue, @Qualifier("xExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XC"); } }
4.3生產(chǎn)者
package com.zyh.controller; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import javax.annotation.Resources; import java.util.Date; /** * @author zengyihong * @create 2022--10--04 19:36 */ @Slf4j @RestController @RequestMapping("/ttl") public class SendMessageController { @Resource private RabbitTemplate rabbitTemplate; /** * 生產(chǎn)者發(fā)送消息 * * @param message */ @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message) { //記錄日志 log.info("當(dāng)前時(shí)間:{},發(fā)送一條信息給兩個(gè)TTL隊(duì)列:{}", new Date(), message); //給QA隊(duì)列發(fā)送消息 rabbitTemplate.convertSendAndReceive("X", "XA", "消息來(lái)自TTL為10秒的隊(duì)列:" + message); rabbitTemplate.convertSendAndReceive("X", "XB", "消息來(lái)自TTL為40秒的隊(duì)列:" + message); } /** * 生產(chǎn)者發(fā)送消息(動(dòng)態(tài)設(shè)置有效期) * * @param message */ @GetMapping("/sendMessage/{message}/{ttlTime}") public void sendMessage(@PathVariable String message, @PathVariable String ttlTime) { MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //設(shè)置消息有效期 message.getMessageProperties().setExpiration(ttlTime); return message; } }; //記錄日志 log.info("當(dāng)前時(shí)間:{},發(fā)送一條時(shí)長(zhǎng){}毫秒信息給隊(duì)列QC:{}", new Date(),ttlTime, message); //給QC隊(duì)列發(fā)送消息 rabbitTemplate.convertAndSend("X", "XC", message, messagePostProcessor); } }
4.4消費(fèi)者
@Slf4j @Component public class DeadLetterQueueConsumer { @RabbitListener(queues = TtlQueueConfiguration.QUEUE_D) public void receiveQD(Message message, Channel channel){ //獲取消息 String msg=new String(message.getBody()); log.info("當(dāng)前時(shí)間:{},收到死信隊(duì)列消息:{}",new Date(),msg); } }
4.5測(cè)試
啟動(dòng)boot項(xiàng)目
在瀏覽器輸入
http://localhost:8080/ttl/sendMessage/Hello/20000
http://localhost:8080/ttl/sendMessage/你好/2000
如果在消息屬性上設(shè)置TTL的方式,那么消息可能不會(huì)按時(shí)死亡,因?yàn)镽abbitMQ只會(huì)檢查第一個(gè)消息是否過(guò)期,如果過(guò)期則丟到死信隊(duì)列,如果第一個(gè)消息的延時(shí)時(shí)長(zhǎng)很長(zhǎng),而第二個(gè)消息的延時(shí)時(shí)長(zhǎng)很短,第二個(gè)消息并不會(huì)優(yōu)先得到執(zhí)行
到此這篇關(guān)于詳解RabbitMQ延遲隊(duì)列的基本使用和優(yōu)化的文章就介紹到這了,更多相關(guān)RabbitMQ延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊(duì)列的示例詳解
- SpringBoot整合RabbitMQ處理死信隊(duì)列和延遲隊(duì)列
- 詳解RabbitMQ中死信隊(duì)列和延遲隊(duì)列的使用詳解
- RabbitMQ?延遲隊(duì)列實(shí)現(xiàn)訂單支付結(jié)果異步階梯性通知(實(shí)例代碼)
- RabbitMQ 實(shí)現(xiàn)延遲隊(duì)列的兩種方式詳解
- RabbitMQ死信機(jī)制實(shí)現(xiàn)延遲隊(duì)列的實(shí)戰(zhàn)
- 手把手帶你掌握SpringBoot RabbitMQ延遲隊(duì)列
- 詳解RabbitMQ中延遲隊(duì)列結(jié)合業(yè)務(wù)場(chǎng)景的使用
相關(guān)文章
SpringBoot整合EasyExcel進(jìn)行大數(shù)據(jù)處理的方法詳解
EasyExcel是一個(gè)基于Java的簡(jiǎn)單、省內(nèi)存的讀寫(xiě)Excel的開(kāi)源項(xiàng)目。在盡可能節(jié)約內(nèi)存的情況下支持讀寫(xiě)百M(fèi)的Excel。本文將在SpringBoot中整合EasyExcel進(jìn)行大數(shù)據(jù)處理,感興趣的可以了解一下2022-05-05劍指Offer之Java算法習(xí)題精講數(shù)組與列表的查找及字符串轉(zhuǎn)換
跟著思路走,之后從簡(jiǎn)單題入手,反復(fù)去看,做過(guò)之后可能會(huì)忘記,之后再做一次,記不住就反復(fù)做,反復(fù)尋求思路和規(guī)律,慢慢積累就會(huì)發(fā)現(xiàn)質(zhì)的變化2022-03-03詳解mybatis 批量更新數(shù)據(jù)兩種方法效率對(duì)比
這篇文章主要介紹了詳解mybatis 批量更新數(shù)據(jù)兩種方法效率對(duì)比,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-02-02Java后臺(tái)基于POST獲取JSON格式數(shù)據(jù)
這篇文章主要介紹了Java后臺(tái)基于POST獲取JSON格式數(shù)據(jù),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03了解java架構(gòu)之微服務(wù)架構(gòu)—雪崩效應(yīng)
這篇文章主要介紹了了解java架構(gòu)之微服務(wù)架構(gòu)—雪崩效應(yīng),微服務(wù)化產(chǎn)品線(xiàn),每一個(gè)服務(wù)專(zhuān)心于自己的業(yè)務(wù)邏輯,并對(duì)外提供相應(yīng)的接口,看上去似乎很明了,其實(shí)還有很多的東西需要考慮,,需要的朋友可以參考下2019-06-06