詳解RabbitMQ延遲隊列的基本使用和優(yōu)化
1.延遲隊列基本介紹
一般隊列中的元素總是希望能夠早點被取出來進行處理,但是延遲隊列中的元素則是希望可以在指定時間內(nèi)被取出和處理,延遲隊列中的元素都是帶有時間屬性的。延遲隊列就是用來存放需要在指定時間被處理的元素的隊列

延遲隊列就是想要消息延遲一段時間后被處理,TTL可以讓消息在延遲一段時間后變成死信。變成死信的消息都會被投遞到死信隊列中,這樣的話,只要消費者一直消費死信隊列里面的消息就可以了,因為里面的消息都是希望被馬上處理的消息 生產(chǎn)者生產(chǎn)一條延時消息,根據(jù)需要延時時間的不同,通過不同的routing key把消息路由到不同的延遲隊列,每一個隊列都設(shè)置了不同的TTL屬性,并且綁定在同一個死信交換機中,消息過期了以后,根據(jù)routing key的不同,又會被路由到不同的死信隊列中,消費者只需要監(jiān)聽對應(yīng)的死信隊列進行處理就可以了。注意:不要造成重復消費
2.延遲隊列使用場景
下面的場景需要使用延遲隊列
- 訂單在十分鐘內(nèi)沒有支付就自動取消
- 新創(chuàng)建的店鋪,如果在十天內(nèi)都沒有上傳過商品,則自動發(fā)送消息提醒
- 賬單在一周內(nèi)沒有支付,就會自動結(jié)算
- 用戶注冊成功以后,如果三天內(nèi)沒有登錄就進行短信題提醒
- 用戶發(fā)起退款,如果三天內(nèi)沒有得到處理則通知相關(guān)運營人員。
- 預(yù)定會議以后,需要提前十分鐘通知各個參會人員參加會議。
3.Spring Boot集成RabbitMQ
3.1創(chuàng)建項目,引入依賴

相關(guān)依賴
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>3.2application.properties配置文件
# RabbitMQ/配置 #服務(wù)器地址 spring.rabbitmq.host=服務(wù)器地址 #服務(wù)端口號 spring.rabbitmq.port=5672 #虛擬主機名稱 spring.rabbitmq.virtual-host=/myhost #用戶名 spring.rabbitmq.username=admin #密碼 spring.rabbitmq.password=123456
3.3 隊列TTL-代碼結(jié)構(gòu)圖

3.4MQ配置類
package com.zyh.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author zengyihong
* @create 2022--10--04 16:44
*/
@Configuration
public class TtlQueueConfiguration {
//普通交換機
public static final String X_EXCHANGE = "X";
//普通隊列
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
//死信交換機
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
//死信隊列QD
public static final String QUEUE_D = "QD";
/**
* 聲明普通交換機X
*
* @return
*/
@Bean
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
/**
* 聲明隊列QA
*
* @return
*/
@Bean
public Queue queueA() {
//創(chuàng)建集合保存隊列屬性
Map<String, Object> map = new HashMap<>();
//設(shè)置該隊列綁定的死信交換機名稱
map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//設(shè)置routing key
map.put("x-dead-letter-routing-key", "YD");
//設(shè)置隊列延遲時間 10秒
map.put("x-message-ttl", 10000);
//創(chuàng)建隊列
return QueueBuilder.durable(QUEUE_A).withArguments(map).build();
}
/**
* 把QA隊列和交換機X進行綁定
*
* @return
*/
@Bean
public Binding queueA_BindingX(@Qualifier("queueA") Queue queue, @Qualifier("xExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("XA");
}
/**
* 聲明隊列QB
*
* @return
*/
@Bean
public Queue queueB() {
//創(chuàng)建集合保存隊列屬性
Map<String, Object> map = new HashMap<>();
//設(shè)置該隊列綁定的死信交換機名稱
map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//設(shè)置routing key
map.put("x-dead-letter-routing-key", "YD");
//設(shè)置隊列延遲時間 10秒
map.put("x-message-ttl", 40000);
//創(chuàng)建隊列
return QueueBuilder.durable(QUEUE_A).withArguments(map).build();
}
/**
* 把QB隊列和交換機X進行綁定
*
* @return
*/
@Bean
public Binding queueB_BindingX(@Qualifier("queueB") Queue queue, @Qualifier("xExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("XB");
}
/**
* 聲明死信交換機Y
*
* @return
*/
@Bean
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
/**
* 聲明死信隊列QD
*
* @return
*/
@Bean
public Queue queueD() {
return new Queue(QUEUE_D);
}
/**
* 把死信交換機和死信隊列進行綁定
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding deadLetterBindingQD(@Qualifier("queueD") Queue queue, @Qualifier("yExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("YD");
}
}3.5生產(chǎn)者代碼
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMessageController {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 生產(chǎn)者發(fā)送消息
* @param message
*/
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message){
//記錄日志
log.info("當前時間:{},發(fā)送一條信息給兩個TTL隊列:{}",new Date(),message);
//給QA隊列發(fā)送消息
rabbitTemplate.convertSendAndReceive("X","XA", "消息來自TTL為10秒的隊列:"+message);
rabbitTemplate.convertSendAndReceive("X","XB", "消息來自TTL為40秒的隊列:"+message);
}
}3.6消費者代碼
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = TtlQueueConfiguration.QUEUE_D)
public void receiveQD(Message message, Channel channel){
//獲取消息
String msg=new String(message.getBody());
log.info("當前時間:{},收到死信隊列消息:{}",new Date(),msg);
}
}3.7測試


啟動boot項目,在瀏覽器輸入localhost:8080/ttl/sendMessage/Hello

但是這種方式有一種缺點,現(xiàn)在我們只有TTL為10s和40s的延遲隊列,如果我們需要其他延時時間的隊列的話,那么我們又得新增其他隊列,這樣其實并不方便,我們想要的是能夠動態(tài)設(shè)置TTL,這樣就不需要為每個TTL設(shè)置新的延遲隊列了。
4.延遲隊列優(yōu)化
4.1代碼結(jié)構(gòu)圖

4.2配置類
在之前寫的代碼基礎(chǔ)上新增一個配置類
package com.zyh.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author zengyihong
* @create 2022--10--05 10:44
*/
@Configuration
public class MessageTtlQueueConfiguration {
//死信交換機
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
//普通隊列
public static final String QUEUE_C = "QC";
/**
* 聲明QC隊列
* @return
*/
@Bean
public Queue queueC(){
//創(chuàng)建集合保存隊列屬性
Map<String, Object> map = new HashMap<>();
//設(shè)置該隊列綁定的死信交換機名稱
map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//設(shè)置routing key
map.put("x-dead-letter-routing-key", "YD");
//設(shè)置隊列延遲時間 10秒
map.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_C).withArguments(map).build();
}
/**
* 把QC隊列和正常交換機X進行綁定
*
* @return
*/
@Bean
public Binding queueC_BindingX(@Qualifier("queueC") Queue queue, @Qualifier("xExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("XC");
}
}4.3生產(chǎn)者
package com.zyh.controller;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import javax.annotation.Resources;
import java.util.Date;
/**
* @author zengyihong
* @create 2022--10--04 19:36
*/
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMessageController {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 生產(chǎn)者發(fā)送消息
*
* @param message
*/
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message) {
//記錄日志
log.info("當前時間:{},發(fā)送一條信息給兩個TTL隊列:{}", new Date(), message);
//給QA隊列發(fā)送消息
rabbitTemplate.convertSendAndReceive("X", "XA", "消息來自TTL為10秒的隊列:" + message);
rabbitTemplate.convertSendAndReceive("X", "XB", "消息來自TTL為40秒的隊列:" + message);
}
/**
* 生產(chǎn)者發(fā)送消息(動態(tài)設(shè)置有效期)
*
* @param message
*/
@GetMapping("/sendMessage/{message}/{ttlTime}")
public void sendMessage(@PathVariable String message, @PathVariable String ttlTime) {
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//設(shè)置消息有效期
message.getMessageProperties().setExpiration(ttlTime);
return message;
}
};
//記錄日志
log.info("當前時間:{},發(fā)送一條時長{}毫秒信息給隊列QC:{}", new Date(),ttlTime, message);
//給QC隊列發(fā)送消息
rabbitTemplate.convertAndSend("X", "XC", message, messagePostProcessor);
}
}4.4消費者
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = TtlQueueConfiguration.QUEUE_D)
public void receiveQD(Message message, Channel channel){
//獲取消息
String msg=new String(message.getBody());
log.info("當前時間:{},收到死信隊列消息:{}",new Date(),msg);
}
}4.5測試
啟動boot項目

在瀏覽器輸入
http://localhost:8080/ttl/sendMessage/Hello/20000
http://localhost:8080/ttl/sendMessage/你好/2000

如果在消息屬性上設(shè)置TTL的方式,那么消息可能不會按時死亡,因為RabbitMQ只會檢查第一個消息是否過期,如果過期則丟到死信隊列,如果第一個消息的延時時長很長,而第二個消息的延時時長很短,第二個消息并不會優(yōu)先得到執(zhí)行
到此這篇關(guān)于詳解RabbitMQ延遲隊列的基本使用和優(yōu)化的文章就介紹到這了,更多相關(guān)RabbitMQ延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot整合EasyExcel進行大數(shù)據(jù)處理的方法詳解
EasyExcel是一個基于Java的簡單、省內(nèi)存的讀寫Excel的開源項目。在盡可能節(jié)約內(nèi)存的情況下支持讀寫百M的Excel。本文將在SpringBoot中整合EasyExcel進行大數(shù)據(jù)處理,感興趣的可以了解一下2022-05-05
劍指Offer之Java算法習題精講數(shù)組與列表的查找及字符串轉(zhuǎn)換
跟著思路走,之后從簡單題入手,反復去看,做過之后可能會忘記,之后再做一次,記不住就反復做,反復尋求思路和規(guī)律,慢慢積累就會發(fā)現(xiàn)質(zhì)的變化2022-03-03
詳解mybatis 批量更新數(shù)據(jù)兩種方法效率對比
這篇文章主要介紹了詳解mybatis 批量更新數(shù)據(jù)兩種方法效率對比,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-02-02
Java后臺基于POST獲取JSON格式數(shù)據(jù)
這篇文章主要介紹了Java后臺基于POST獲取JSON格式數(shù)據(jù),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-03-03
了解java架構(gòu)之微服務(wù)架構(gòu)—雪崩效應(yīng)
這篇文章主要介紹了了解java架構(gòu)之微服務(wù)架構(gòu)—雪崩效應(yīng),微服務(wù)化產(chǎn)品線,每一個服務(wù)專心于自己的業(yè)務(wù)邏輯,并對外提供相應(yīng)的接口,看上去似乎很明了,其實還有很多的東西需要考慮,,需要的朋友可以參考下2019-06-06

