redis和rabbitmq實(shí)現(xiàn)延時(shí)隊(duì)列的示例代碼
延遲隊(duì)列使用場(chǎng)景
1. 訂單超時(shí)處理:延遲隊(duì)列可以用于處理訂單超時(shí)問題。當(dāng)用戶下單后,將訂單信息放入延遲隊(duì)列,并設(shè)置一定的超時(shí)時(shí)間。如果在超時(shí)時(shí)間內(nèi)用戶未支付訂單,消費(fèi)者會(huì)從延遲隊(duì)列中獲取到該訂單,并執(zhí)行相應(yīng)的處理操作,如取消訂單、釋放庫存等。
2. 優(yōu)惠券過期提醒:延遲隊(duì)列可以用于優(yōu)惠券的過期提醒功能。將即將過期的優(yōu)惠券信息放入延遲隊(duì)列,并設(shè)置合適的延遲時(shí)間。當(dāng)延遲時(shí)間到達(dá)時(shí),消費(fèi)者將提醒用戶優(yōu)惠券即將過期,引導(dǎo)用戶盡快使用。
3. 異步通知與提醒:延遲隊(duì)列可以用于異步通知和提醒功能。例如,當(dāng)用戶完成某個(gè)操作后,系統(tǒng)可以將相關(guān)通知消息放入延遲隊(duì)列,并設(shè)置一定的延遲時(shí)間,以便在合適的時(shí)機(jī)發(fā)送通知給用戶。
Redis中zset實(shí)現(xiàn)延時(shí)隊(duì)列
1. 創(chuàng)建延遲隊(duì)列服務(wù)類
- 創(chuàng)建一個(gè)延遲隊(duì)列的服務(wù)類,例如DelayQueueService,用于操作Redis中的ZSet。這個(gè)服務(wù)類需要完成以下功能:
- 將消息放入延遲隊(duì)列:將消息作為元素添加到ZSet中,設(shè)置對(duì)應(yīng)的延遲時(shí)間作為分?jǐn)?shù)。輪詢并處理已到期的消息:定時(shí)任務(wù)或者消息消費(fèi)者輪詢檢查ZSet中的元素,獲取到達(dá)指定時(shí)間的消息進(jìn)行處理。刪除已處理的消息:處理完消息后,從ZSet中將其刪除。
@Service
public class DelayQueueService {
private static final String DELAY_QUEUE_KEY = "delay_queue";
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void addToDelayQueue(String message,long delayTime){
redisTemplate.opsForZSet().add(DELAY_QUEUE_KEY,message,System.currentTimeMillis()+delayTime);
}
public void processDelayedMessage(){
//reverseRangeByScore 從高到低
//rangeByScore 從低到高
Set<String> messages = redisTemplate.opsForZSet().rangeByScore(DELAY_QUEUE_KEY, 0, System.currentTimeMillis());
for(String message:messages){
//處理消息
System.out.println(message);
redisTemplate.opsForZSet().remove(DELAY_QUEUE_KEY,message);
}
}
}
2. 配置定時(shí)任務(wù)或消息消費(fèi)者
使用Spring Boot的定時(shí)任務(wù)或消息隊(duì)列框架,定時(shí)調(diào)用延遲隊(duì)列服務(wù)類的輪詢方法或監(jiān)聽指定的消息隊(duì)列,可以將輪訓(xùn)粒度放到1s一次。
@Component
public class DelayQueueSchedule {
@Autowired
private DelayQueueService delayQueueService;
// 每隔一段時(shí)間進(jìn)行輪詢并處理延遲消息
@Scheduled(fixedDelay = 1000)
public void pollAndProcessDelayedMessages() {
delayQueueService.pollAndProcessDelayedMessages();
}
}
然后在啟動(dòng)類上通過@EnableScheduling注解開啟任務(wù)調(diào)度能力。
缺點(diǎn):使用ZSET(有序集合,Sorted Set)來實(shí)現(xiàn)延遲任務(wù)調(diào)度(如訂單超時(shí)取消)是一種有效的方法,但它也有一些缺點(diǎn)和限制:
- 內(nèi)存消耗:ZSET 在Redis中是一個(gè)有序集合,它需要占用一定的內(nèi)存來存儲(chǔ)成員和分?jǐn)?shù)。如果你需要存儲(chǔ)大量的延遲任務(wù),可能會(huì)導(dǎo)致內(nèi)存消耗較大。這可能會(huì)對(duì)Redis服務(wù)器的性能和成本產(chǎn)生影響,特別是在大規(guī)模應(yīng)用中。
- 不適用于大規(guī)模延遲任務(wù):ZSET 可以處理相對(duì)較小數(shù)量的延遲任務(wù),但當(dāng)需要管理大規(guī)模延遲任務(wù)隊(duì)列時(shí),可能會(huì)導(dǎo)致性能下降。在這種情況下,需要考慮更高效的延遲隊(duì)列解決方案,例如使用分布式消息隊(duì)列。
- 無法動(dòng)態(tài)修改延遲時(shí)間: 一旦將任務(wù)添加到ZSET中,你不能輕松地修改任務(wù)的延遲時(shí)間。如果需要在任務(wù)已經(jīng)添加后更改延遲時(shí)間,可能需要復(fù)雜的操作。
- 沒有重試機(jī)制:ZSET 只能用于一次性延遲任務(wù),無法自動(dòng)處理任務(wù)失敗后的重試。如果任務(wù)在執(zhí)行時(shí)失敗,你需要自己實(shí)現(xiàn)重試邏輯。
- 沒有持久化: Redis是內(nèi)存數(shù)據(jù)庫,如果Redis服務(wù)器重啟或發(fā)生故障,已添加的延遲任務(wù)數(shù)據(jù)將丟失。雖然可以通過Redis持久化機(jī)制來部分解決這個(gè)問題,但仍然存在一定風(fēng)險(xiǎn)。
- 復(fù)雜性增加: 使用ZSET來管理延遲任務(wù)隊(duì)列需要編寫復(fù)雜的代碼來處理任務(wù)的添加、檢索和刪除。這可能增加應(yīng)用程序的復(fù)雜性。
Rabbitmq實(shí)現(xiàn)延遲隊(duì)列
死信,顧名思義就是無法被消費(fèi)的消息。一般來說,producer 將消息投遞到 broker 或者直接到queue 里了,consumer 從 queue 取出消息進(jìn)行消費(fèi),但某些時(shí)候由于特定的原因?qū)е聁ueu 中的某些消息無法被消費(fèi),這樣的消息如果沒有后續(xù)的處理,就變成了死信,有死信自然就有了死信隊(duì)列。
列出2種實(shí)現(xiàn)方式。
(1)使用Time To Live(TTL) + Dead Letter Exchanges(DLX)死信隊(duì)列組合實(shí)現(xiàn)延遲隊(duì)列的效果。
(2)使用RabbitMQ官方延遲插件rabbitmq_delayed_message_exchange,實(shí)現(xiàn)延時(shí)隊(duì)列效果。
由于TTL(生存時(shí)間)過期導(dǎo)致的死信,就是我們實(shí)現(xiàn)延遲隊(duì)列的的方式。
我們需要聲明如下形式的交互機(jī)和隊(duì)列,以及對(duì)應(yīng)的routing key,并進(jìn)行綁定:

上圖綁定的代碼如下所示
@Configuration
public class DeadQueueConfig {
//普通交換機(jī)及隊(duì)列
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
//死信交換機(jī)及隊(duì)列
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE = "QD";
//通用隊(duì)列
public static final String QUEUE_C = "QC";
// 聲明 xExchange
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
//聲明隊(duì)列 A ttl 為 10s 并綁定到對(duì)應(yīng)的死信交換機(jī)
@Bean("queueA")
public Queue queueA() {
Map<String, Object> args = new HashMap<>(3);
//聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//聲明當(dāng)前隊(duì)列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//聲明隊(duì)列的 TTL
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
//聲明隊(duì)列A綁定X交換機(jī) 路由為XA
@Bean
public Binding queueABingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
//聲明隊(duì)列 B ttl 為 40s 并綁定到對(duì)應(yīng)的死信交換機(jī)
@Bean("queueB")
public Queue queueB() {
Map<String, Object> args = new HashMap<>(3);
//聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//聲明當(dāng)前隊(duì)列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//聲明隊(duì)列的 TTL
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
//聲明隊(duì)列 B 綁定 X 交換機(jī)
@Bean
public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
}
//聲明通用隊(duì)列C 不設(shè)ttl,由消費(fèi)者決定ttl
@Bean("queueC")
public Queue queueC() {
Map<String, Object> args = new HashMap<>(3);
//聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//聲明當(dāng)前隊(duì)列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}
// 聲明隊(duì)列 C 綁定 X 交換機(jī)
@Bean
public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
// 聲明 死信隊(duì)列交換機(jī)
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
//聲明死信隊(duì)列 QD
@Bean("queueD")
public Queue queueD() {
return new Queue(DEAD_LETTER_QUEUE,true);
}
//聲明死信隊(duì)列 QD 綁定關(guān)系
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
其中,QD為死信隊(duì)列。當(dāng)QA和QB隊(duì)列中的消息,達(dá)到設(shè)定的TTL(10s和40s)后,將進(jìn)入指定的死信隊(duì)列QD。該方法缺點(diǎn)就是一個(gè)TTL對(duì)應(yīng)一個(gè)隊(duì)列
其中的QC作為通用的隊(duì)列,即在消費(fèi)者處指定消息對(duì)應(yīng)的TTL,TTL過期后轉(zhuǎn)入死信隊(duì)列。使用該通用隊(duì)列可以避免每增加一個(gè)新的時(shí)間需求,就要新增一個(gè)隊(duì)列的問題。但該方法由于隊(duì)列先進(jìn)先出的性質(zhì),會(huì)導(dǎo)致一定的問題:
即先發(fā)出一個(gè)TTL為10s的消息a,進(jìn)入隊(duì)列;再馬上發(fā)出一個(gè)TTL為2s的消息b,進(jìn)入隊(duì)列。由于隊(duì)列的性質(zhì),會(huì)在消息a的TTL結(jié)束后,a進(jìn)入死信隊(duì)列后,b才會(huì)進(jìn)入死信隊(duì)列。而不是根據(jù)TTL的時(shí)間,b比a先進(jìn)入死信隊(duì)列。
聲明交換機(jī)、隊(duì)列,并綁定成功后,編寫死信隊(duì)列消費(fèi)者代碼;
@Component
@Slf4j
public class DeadQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("當(dāng)前時(shí)間:{},收到死信隊(duì)列信息:{}", new Date().toString(), msg);
}
}
在controller中編寫生產(chǎn)者代碼,進(jìn)行測(cè)試:
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public String sendMsg(@PathVariable String message){
log.info("當(dāng)前時(shí)間:{},發(fā)送一條信息給兩個(gè) TTL 隊(duì)列:{}", new Date(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息來自 ttl 為 10S 的隊(duì)列: " + message);
rabbitTemplate.convertAndSend("X", "XB", "消息來自 ttl 為 40S 的隊(duì)列: " + message);
return "finish";
}
結(jié)果如圖:

測(cè)試通用隊(duì)列QC的效果:
@GetMapping("/send/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;
});
log.info("當(dāng)前時(shí)間:{},發(fā)送一條時(shí)長{}毫秒 TTL 信息給隊(duì)列 C:{}", new Date(), ttlTime, message);
}
結(jié)果如下圖

可以看到, 兩條消息幾乎同時(shí)到達(dá)死信隊(duì)列,因?yàn)門TL為2s的消息由于被堵在TTL為10s的消息后導(dǎo)致。
到此這篇關(guān)于redis和rabbitmq實(shí)現(xiàn)延時(shí)隊(duì)列的示例代碼的文章就介紹到這了,更多相關(guān)redis rabbitmq 延時(shí)隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis配置外網(wǎng)可訪問(redis遠(yuǎn)程連接不上)的方法
默認(rèn)情況下,當(dāng)我們?cè)诓渴鹆藃edis服務(wù)之后,redis本身默認(rèn)只允許本地訪問。Redis服務(wù)端只允許它所在服務(wù)器上的客戶端訪問,如果Redis服務(wù)端和Redis客戶端不在同一個(gè)機(jī)器上,就要進(jìn)行配置。2022-12-12
詳解Redis BoundValueOperations使用及實(shí)現(xiàn)
BoundValueOperations是Spring Data Redis中一個(gè)非常實(shí)用的工具,本文主要介紹了Redis BoundValueOperations使用,具有一定的參考價(jià)值,感興趣的可以了解一下2025-09-09
Redis結(jié)合 Docker 搭建集群并整合SpringBoot的詳細(xì)過程
這篇文章主要介紹了Redis結(jié)合Docker搭建集群并整合SpringBoot的詳細(xì)過程,本文給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2024-06-06
Redis實(shí)現(xiàn)分布式Session管理的機(jī)制詳解
這篇文章主要介紹了Redis實(shí)現(xiàn)分布式Session管理的機(jī)制詳解,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-01-01
通過redis的腳本lua如何實(shí)現(xiàn)搶紅包功能
這篇文章主要給大家介紹了關(guān)于通過redis的腳本lua如何實(shí)現(xiàn)搶紅包功能的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05
Redis可視化工具Redis?Desktop?Manager的具體使用
本文主要介紹了Redis可視化工具Redis?Desktop?Manager的具體使用,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-12-12
Redis 中的熱點(diǎn)鍵和數(shù)據(jù)傾斜示例詳解
熱點(diǎn)鍵是指在 Redis 中被頻繁訪問的特定鍵,這些鍵由于其高訪問頻率,可能導(dǎo)致 Redis 服務(wù)器的性能問題,尤其是在高并發(fā)場(chǎng)景下,本文給大家介紹Redis 中的熱點(diǎn)鍵和數(shù)據(jù)傾斜,感興趣的朋友一起看看吧2025-03-03
Redis Subscribe timeout 報(bào)錯(cuò)的問題解決
最近系統(tǒng)偶爾報(bào)出org.redisson.client.RedisTimeoutException: Subscribe timeout: (7500ms)的錯(cuò)誤,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2025-08-08

