Java中RabbitMQ延遲隊列實現(xiàn)詳解
一、RabbitMQ延遲隊列實現(xiàn)
1.1、RabbitMQ延遲隊列實現(xiàn)流程

- 生產(chǎn)者生產(chǎn)一條延遲消息,根據(jù)延遲時間的不同,利用不同的routing-key將消息路由到不同的延遲隊列,每個隊列都設置了不同的 TTL 屬性 ( TTL ( Time To Live ) 生存時間 ),并綁定到同一個死信交換機中。
- 消息過期后,根據(jù)routing-key的不同,又會被死信交換機路由到不同的死信隊列中,消費者只需要監(jiān)聽對應的死信隊列進行消費即可。
1.2、配置RabbitMQ連接
#[ RabbitMQ相關配置 ] #rabbitmq服務器IP spring.rabbitmq.host=安裝RabbitMQ的服務器IP #rabbitmq服務器端口(默認為5672) spring.rabbitmq.port=5672 #用戶名 spring.rabbitmq.username=guest #用戶密碼 spring.rabbitmq.password=guest #虛擬主機(一個RabbitMQ服務可以配置多個虛擬主機,每一個虛擬機主機之間是相互隔離,相互獨立的,授權(quán)用戶到指定的virtual-host就可以發(fā)送消息到指定隊列) #vhost虛擬主機地址( 默認為/ ) spring.rabbitmq.virtual-host=/
1.3、創(chuàng)建配置類
配置兩個交換機、四個隊列、以及根據(jù)路由鍵配置交換機和隊列的綁定關系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfiguration {
//延遲交換機
public static final String DELAY_EXCHANGE = "delay_exchange";
//延遲隊列A
public static final String DELAY_QUEUE_A = "delay_queue_a";
//延遲隊列B
public static final String DELAY_QUEUE_B = "delay_queue_b";
//延遲路由鍵10S
public static final String DELAY_QUEUE_10S_ROUTING_KEY = "delay_queue_10s_routing_key";
//延遲路由鍵60S
public static final String DELAY_QUEUE_60S_ROUTING_KEY = "delay_queue_60s_routing_key";
//死信交換機
public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
//死信隊列A
public static final String DEAD_LETTER_QUEUE_A = "dead_letter_queue_a";
//死信隊列B
public static final String DEAD_LETTER_QUEUE_B = "dead_letter_queue_b";
//死信路由鍵10S
public static final String DEAD_LETTER_QUEUE_10S_ROUTING_KEY = "dead_letter_queue_10s_routing_key";
//死信路由鍵60S
public static final String DEAD_LETTER_QUEUE_60S_ROUTING_KEY = "dead_letter_queue_60s_routing_key";
//延遲交換機
@Bean("delayExchange")
public DirectExchange delayExchange(){
return new DirectExchange(DELAY_EXCHANGE, true, false);
}
//延遲隊列A
@Bean("delayQueueA")
public Queue delayQueueA(){
Map<String, Object> args = new HashMap<>();
//設置延遲隊列綁定的死信交換機
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//設置延遲隊列綁定的死信路由鍵
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_10S_ROUTING_KEY);
//設置延遲隊列的 TTL 消息存活時間
args.put("x-message-ttl", 10*1000);
return new Queue(DELAY_QUEUE_A, true, false, false, args);
}
//延遲隊列B
@Bean("delayQueueB")
public Queue delayQueueB(){
Map<String, Object> args = new HashMap<>();
//設置延遲隊列綁定的死信交換機
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//設置延遲隊列綁定的死信路由鍵
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_60S_ROUTING_KEY);
//設置延遲隊列的 TTL 消息存活時間
args.put("x-message-ttl", 60*1000);
return new Queue(DELAY_QUEUE_B, true, false, false, args);
}
//延遲隊列A的綁定關系
@Bean("delayBindingA")
public Binding delayBindingA(@Qualifier("delayQueueA")Queue queue,
@Qualifier("delayExchange")DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_10S_ROUTING_KEY);
}
//延遲隊列B的綁定關系
@Bean("delayBindingB")
public Binding delayBindingB(@Qualifier("delayQueueB")Queue queue,
@Qualifier("delayExchange")DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_60S_ROUTING_KEY);
}
//死信交換機
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false);
}
//死信隊列A
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA(){
return new Queue(DEAD_LETTER_QUEUE_A, true);
}
//死信隊列B
@Bean("deadLetterQueueB")
public Queue deadLetterQueueB(){
return new Queue(DEAD_LETTER_QUEUE_B, true);
}
//死信隊列A的綁定關系
@Bean("deadLetterBindingA")
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA")Queue queue,
@Qualifier("deadLetterExchange")DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_10S_ROUTING_KEY);
}
//死信隊列B的綁定關系
@Bean("deadLetterBindingB")
public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB")Queue queue,
@Qualifier("deadLetterExchange")DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_60S_ROUTING_KEY);
}
}1.4、創(chuàng)建一個枚舉類來配置延遲類型
@Getter
@AllArgsConstructor
public enum DelayTypeEnum {
//10s
DELAY_10s(1),
//60s
DELAY_60s(2);
private Integer type;
/**
* 延遲類型
* @param type
* @return 延遲類型
*/
public static DelayTypeEnum getDelayTypeEnum(Integer type){
if(Objects.equals(type, DELAY_10s.type)){
return DELAY_10s;
}
if(Objects.equals(type, DELAY_60s.type)){
return DELAY_60s;
}
return null;
}
}1.5、創(chuàng)建生產(chǎn)者類發(fā)送消息
import com.cd.springbootrabbitmq.enums.DelayTypeEnum;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_EXCHANGE;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_QUEUE_10S_ROUTING_KEY;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_QUEUE_60S_ROUTING_KEY;
/**
* 延遲消息生產(chǎn)者
*/
@Component
public class DelayMessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 發(fā)送延遲消息
* @param message 要發(fā)送的消息
* @param type 延遲類型(延時10s的延遲隊列 或 延時60s的延遲隊列)
*/
public void sendDelayMessage(String message, DelayTypeEnum type){
switch (type){
case DELAY_10s:
rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_10S_ROUTING_KEY, message);
break;
case DELAY_60s:
rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_60S_ROUTING_KEY, message);
break;
default:
break;
}
}
}1.6、創(chuàng)建消費者類消費消息
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_A;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_B;
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 監(jiān)聽死信隊列A
* @param message 接收的信息
*/
//@RabbitListener(queues = "dead_letter_queue_a")
@RabbitListener(queues = DEAD_LETTER_QUEUE_A)
public void receiveA(Message message) {
String msg = new String(message.getBody());
// 記錄日志
log.info("當前時間:{},死信隊列A收到的消息:{}", LocalDateTime.now(), msg);
}
/**
* 監(jiān)聽死信隊列B
* @param message 接收的信息
*/
//@RabbitListener(queues = "dead_letter_queue_b")
@RabbitListener(queues = DEAD_LETTER_QUEUE_B)
public void receiveB(Message message){
String msg = new String(message.getBody());
// 記錄日志
log.info("當前時間:{},死信隊列B收到的消息:{}", LocalDateTime.now(), msg);
}
}1.7、創(chuàng)建控制類
import com.cd.springbootrabbitmq.enums.DelayTypeEnum;
import com.cd.springbootrabbitmq.producer.DelayMessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.util.Objects;
@Slf4j
@RestController
@RequestMapping("/rabbitmq")
public class RabbitMQController {
@Autowired
private DelayMessageProducer producer;
@RequestMapping("/send")
public void send(String message, Integer delayType){
// 記錄日志
log.info("當前時間:{},消息:{},延遲類型:{}", LocalDateTime.now(), message, delayType);
// 發(fā)送延遲消息
producer.sendDelayMessage(message, Objects.requireNonNull(DelayTypeEnum.getDelayTypeEnum(delayType)));
}
}1.8、測試
在瀏覽器中先后提交下面兩個請求:
1)localhost:8080/rabbitmq/send?message=測試自定義延遲處理60s&delayType=2
2)localhost:8080/rabbitmq/send?message=測試自定義延遲處理10s&delayType=1
查看idea控制臺:

到此這篇關于Java中RabbitMQ延遲隊列實現(xiàn)詳解的文章就介紹到這了,更多相關RabbitMQ延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Java數(shù)據(jù)結(jié)構(gòu) 遞歸之迷宮回溯案例講解
這篇文章主要介紹了Java數(shù)據(jù)結(jié)構(gòu)遞歸之迷宮回溯案例講解,本篇文章通過簡要的案例,講解了該項技術(shù)的了解與使用,以下就是詳細內(nèi)容,需要的朋友可以參考下2021-08-08
使用Java獲取系統(tǒng)信息的常用代碼整理總結(jié)
這篇文章主要介紹了使用Java獲取系統(tǒng)信息的常用代碼整理總結(jié),在服務器端一般經(jīng)常能夠用到,歡迎收藏,需要的朋友可以參考下2015-11-11
spring?boot?3使用?elasticsearch?提供搜索建議的實例詳解
這篇文章主要介紹了spring?boot3使用elasticsearch提供搜索建議,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-08-08
springboot+vue+elementsUI實現(xiàn)分角色注冊登錄界面功能
這篇文章主要給大家介紹了關于springboot+vue+elementsUI實現(xiàn)分角色注冊登錄界面功能的相關資料,Spring?Boot和Vue.js是兩個非常流行的開源框架,可以用來構(gòu)建Web應用程序,需要的朋友可以參考下2023-07-07
解決Lombok使用@Builder無法build父類屬性的問題
這篇文章主要介紹了解決Lombok使用@Builder無法build父類屬性的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-09-09

