redis和rabbitmq實現(xiàn)延時隊列的示例代碼
延遲隊列使用場景
1. 訂單超時處理:延遲隊列可以用于處理訂單超時問題。當(dāng)用戶下單后,將訂單信息放入延遲隊列,并設(shè)置一定的超時時間。如果在超時時間內(nèi)用戶未支付訂單,消費者會從延遲隊列中獲取到該訂單,并執(zhí)行相應(yīng)的處理操作,如取消訂單、釋放庫存等。
2. 優(yōu)惠券過期提醒:延遲隊列可以用于優(yōu)惠券的過期提醒功能。將即將過期的優(yōu)惠券信息放入延遲隊列,并設(shè)置合適的延遲時間。當(dāng)延遲時間到達時,消費者將提醒用戶優(yōu)惠券即將過期,引導(dǎo)用戶盡快使用。
3. 異步通知與提醒:延遲隊列可以用于異步通知和提醒功能。例如,當(dāng)用戶完成某個操作后,系統(tǒng)可以將相關(guān)通知消息放入延遲隊列,并設(shè)置一定的延遲時間,以便在合適的時機發(fā)送通知給用戶。
Redis中zset實現(xiàn)延時隊列
1. 創(chuàng)建延遲隊列服務(wù)類
- 創(chuàng)建一個延遲隊列的服務(wù)類,例如DelayQueueService,用于操作Redis中的ZSet。這個服務(wù)類需要完成以下功能:
- 將消息放入延遲隊列:將消息作為元素添加到ZSet中,設(shè)置對應(yīng)的延遲時間作為分數(shù)。輪詢并處理已到期的消息:定時任務(wù)或者消息消費者輪詢檢查ZSet中的元素,獲取到達指定時間的消息進行處理。刪除已處理的消息:處理完消息后,從ZSet中將其刪除。
@Service public class DelayQueueService { private static final String DELAY_QUEUE_KEY = "delay_queue"; @Autowired private RedisTemplate<String, String> redisTemplate; public void addToDelayQueue(String message,long delayTime){ redisTemplate.opsForZSet().add(DELAY_QUEUE_KEY,message,System.currentTimeMillis()+delayTime); } public void processDelayedMessage(){ //reverseRangeByScore 從高到低 //rangeByScore 從低到高 Set<String> messages = redisTemplate.opsForZSet().rangeByScore(DELAY_QUEUE_KEY, 0, System.currentTimeMillis()); for(String message:messages){ //處理消息 System.out.println(message); redisTemplate.opsForZSet().remove(DELAY_QUEUE_KEY,message); } } }
2. 配置定時任務(wù)或消息消費者
使用Spring Boot的定時任務(wù)或消息隊列框架,定時調(diào)用延遲隊列服務(wù)類的輪詢方法或監(jiān)聽指定的消息隊列,可以將輪訓(xùn)粒度放到1s一次。
@Component public class DelayQueueSchedule { @Autowired private DelayQueueService delayQueueService; // 每隔一段時間進行輪詢并處理延遲消息 @Scheduled(fixedDelay = 1000) public void pollAndProcessDelayedMessages() { delayQueueService.pollAndProcessDelayedMessages(); } }
然后在啟動類上通過@EnableScheduling注解開啟任務(wù)調(diào)度能力。
缺點:使用ZSET(有序集合,Sorted Set)來實現(xiàn)延遲任務(wù)調(diào)度(如訂單超時取消)是一種有效的方法,但它也有一些缺點和限制:
- 內(nèi)存消耗:ZSET 在Redis中是一個有序集合,它需要占用一定的內(nèi)存來存儲成員和分數(shù)。如果你需要存儲大量的延遲任務(wù),可能會導(dǎo)致內(nèi)存消耗較大。這可能會對Redis服務(wù)器的性能和成本產(chǎn)生影響,特別是在大規(guī)模應(yīng)用中。
- 不適用于大規(guī)模延遲任務(wù):ZSET 可以處理相對較小數(shù)量的延遲任務(wù),但當(dāng)需要管理大規(guī)模延遲任務(wù)隊列時,可能會導(dǎo)致性能下降。在這種情況下,需要考慮更高效的延遲隊列解決方案,例如使用分布式消息隊列。
- 無法動態(tài)修改延遲時間: 一旦將任務(wù)添加到ZSET中,你不能輕松地修改任務(wù)的延遲時間。如果需要在任務(wù)已經(jīng)添加后更改延遲時間,可能需要復(fù)雜的操作。
- 沒有重試機制:ZSET 只能用于一次性延遲任務(wù),無法自動處理任務(wù)失敗后的重試。如果任務(wù)在執(zhí)行時失敗,你需要自己實現(xiàn)重試邏輯。
- 沒有持久化: Redis是內(nèi)存數(shù)據(jù)庫,如果Redis服務(wù)器重啟或發(fā)生故障,已添加的延遲任務(wù)數(shù)據(jù)將丟失。雖然可以通過Redis持久化機制來部分解決這個問題,但仍然存在一定風(fēng)險。
- 復(fù)雜性增加: 使用ZSET來管理延遲任務(wù)隊列需要編寫復(fù)雜的代碼來處理任務(wù)的添加、檢索和刪除。這可能增加應(yīng)用程序的復(fù)雜性。
Rabbitmq實現(xiàn)延遲隊列
死信,顧名思義就是無法被消費的消息。一般來說,producer 將消息投遞到 broker 或者直接到queue 里了,consumer 從 queue 取出消息進行消費,但某些時候由于特定的原因?qū)е聁ueu 中的某些消息無法被消費,這樣的消息如果沒有后續(xù)的處理,就變成了死信,有死信自然就有了死信隊列。
列出2種實現(xiàn)方式。
(1)使用Time To Live(TTL) + Dead Letter Exchanges(DLX)死信隊列組合實現(xiàn)延遲隊列的效果。
(2)使用RabbitMQ官方延遲插件rabbitmq_delayed_message_exchange,實現(xiàn)延時隊列效果。
由于TTL(生存時間)過期導(dǎo)致的死信,就是我們實現(xiàn)延遲隊列的的方式。
我們需要聲明如下形式的交互機和隊列,以及對應(yīng)的routing key,并進行綁定:
上圖綁定的代碼如下所示
@Configuration public class DeadQueueConfig { //普通交換機及隊列 public static final String X_EXCHANGE = "X"; public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; //死信交換機及隊列 public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String DEAD_LETTER_QUEUE = "QD"; //通用隊列 public static final String QUEUE_C = "QC"; // 聲明 xExchange @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } //聲明隊列 A ttl 為 10s 并綁定到對應(yīng)的死信交換機 @Bean("queueA") public Queue queueA() { Map<String, Object> args = new HashMap<>(3); //聲明當(dāng)前隊列綁定的死信交換機 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //聲明當(dāng)前隊列的死信路由 key args.put("x-dead-letter-routing-key", "YD"); //聲明隊列的 TTL args.put("x-message-ttl", 10000); return QueueBuilder.durable(QUEUE_A).withArguments(args).build(); } //聲明隊列A綁定X交換機 路由為XA @Bean public Binding queueABingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } //聲明隊列 B ttl 為 40s 并綁定到對應(yīng)的死信交換機 @Bean("queueB") public Queue queueB() { Map<String, Object> args = new HashMap<>(3); //聲明當(dāng)前隊列綁定的死信交換機 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //聲明當(dāng)前隊列的死信路由 key args.put("x-dead-letter-routing-key", "YD"); //聲明隊列的 TTL args.put("x-message-ttl", 40000); return QueueBuilder.durable(QUEUE_B).withArguments(args).build(); } //聲明隊列 B 綁定 X 交換機 @Bean public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queue1B).to(xExchange).with("XB"); } //聲明通用隊列C 不設(shè)ttl,由消費者決定ttl @Bean("queueC") public Queue queueC() { Map<String, Object> args = new HashMap<>(3); //聲明當(dāng)前隊列綁定的死信交換機 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //聲明當(dāng)前隊列的死信路由 key args.put("x-dead-letter-routing-key", "YD"); return QueueBuilder.durable(QUEUE_C).withArguments(args).build(); } // 聲明隊列 C 綁定 X 交換機 @Bean public Binding queuecBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueC).to(xExchange).with("XC"); } // 聲明 死信隊列交換機 @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } //聲明死信隊列 QD @Bean("queueD") public Queue queueD() { return new Queue(DEAD_LETTER_QUEUE,true); } //聲明死信隊列 QD 綁定關(guān)系 @Bean public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) { return BindingBuilder.bind(queueD).to(yExchange).with("YD"); } }
其中,QD為死信隊列。當(dāng)QA和QB隊列中的消息,達到設(shè)定的TTL(10s和40s)后,將進入指定的死信隊列QD。該方法缺點就是一個TTL對應(yīng)一個隊列
其中的QC作為通用的隊列,即在消費者處指定消息對應(yīng)的TTL,TTL過期后轉(zhuǎn)入死信隊列。使用該通用隊列可以避免每增加一個新的時間需求,就要新增一個隊列的問題。但該方法由于隊列先進先出的性質(zhì),會導(dǎo)致一定的問題:
即先發(fā)出一個TTL為10s的消息a,進入隊列;再馬上發(fā)出一個TTL為2s的消息b,進入隊列。由于隊列的性質(zhì),會在消息a的TTL結(jié)束后,a進入死信隊列后,b才會進入死信隊列。而不是根據(jù)TTL的時間,b比a先進入死信隊列。
聲明交換機、隊列,并綁定成功后,編寫死信隊列消費者代碼;
@Component @Slf4j public class DeadQueueConsumer { @RabbitListener(queues = "QD") public void receiveD(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("當(dāng)前時間:{},收到死信隊列信息:{}", new Date().toString(), msg); } }
在controller中編寫生產(chǎn)者代碼,進行測試:
@Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{message}") public String sendMsg(@PathVariable String message){ log.info("當(dāng)前時間:{},發(fā)送一條信息給兩個 TTL 隊列:{}", new Date(), message); rabbitTemplate.convertAndSend("X", "XA", "消息來自 ttl 為 10S 的隊列: " + message); rabbitTemplate.convertAndSend("X", "XB", "消息來自 ttl 為 40S 的隊列: " + message); return "finish"; }
結(jié)果如圖:
測試通用隊列QC的效果:
@GetMapping("/send/{message}/{ttlTime}") public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) { rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> { correlationData.getMessageProperties().setExpiration(ttlTime); return correlationData; }); log.info("當(dāng)前時間:{},發(fā)送一條時長{}毫秒 TTL 信息給隊列 C:{}", new Date(), ttlTime, message); }
結(jié)果如下圖
可以看到, 兩條消息幾乎同時到達死信隊列,因為TTL為2s的消息由于被堵在TTL為10s的消息后導(dǎo)致。
到此這篇關(guān)于redis和rabbitmq實現(xiàn)延時隊列的示例代碼的文章就介紹到這了,更多相關(guān)redis rabbitmq 延時隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis遍歷海量數(shù)據(jù)的實現(xiàn)示例
本文主要介紹了 Redis遍歷海量數(shù)據(jù)的實現(xiàn)示例,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2025-04-04Redis出現(xiàn)(error)NOAUTH?Authentication?required.報錯的解決辦法(秒懂!)
這篇文章主要給大家介紹了關(guān)于Redis出現(xiàn)(error)NOAUTH?Authentication?required.報錯的解決辦法,對于 這個錯誤這通常是因為Redis服務(wù)器需要密碼進行身份驗證,但客戶端沒有提供正確的身份驗證信息導(dǎo)致的,需要的朋友可以參考下2024-03-03利用Redis實現(xiàn)訪問次數(shù)限流的方法詳解
這篇文章主要給大家介紹了關(guān)于如何利用Redis實現(xiàn)訪問次數(shù)限流的相關(guān)資料,文中通過實例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2022-02-02