Spring Boot 項(xiàng)目集成 Redisson 實(shí)現(xiàn)延遲隊(duì)列的詳細(xì)過(guò)程
延遲隊(duì)列應(yīng)用場(chǎng)景
- 訂單支付超時(shí):用戶(hù)下單后30分鐘未支付,自動(dòng)取消訂單。
- 訂單評(píng)價(jià)超時(shí):訂單簽收后7天未評(píng)價(jià),系統(tǒng)默認(rèn)好評(píng)。
- 商家接單超時(shí):下單成功后商家5分鐘未接單,訂單取消。
- 配送超時(shí)提醒:配送超時(shí),推送短信提醒。
技術(shù)選型分析
針對(duì)延遲任務(wù)處理機(jī)制,主要可選方案有以下四種:定時(shí)輪詢(xún)、Redisson 延遲隊(duì)列、消息中間件、Redis 過(guò)期監(jiān)聽(tīng)。
1. 定時(shí)任務(wù)輪詢(xún)
機(jī)制: 通過(guò)定時(shí)任務(wù)如 @Scheduled
或 Quartz),以固定頻率輪詢(xún)數(shù)據(jù)庫(kù)或 Redis,查找已到期的任務(wù)并處理。
優(yōu)點(diǎn):由于springboot原生支持,實(shí)現(xiàn)成本低,并且可以任務(wù)統(tǒng)一管理
缺點(diǎn):
- 處理存在延遲非實(shí)時(shí),精度取決于輪詢(xún)間隔。
- 容易空輪詢(xún),浪費(fèi) CPU 或數(shù)據(jù)庫(kù)資源。
- 不適合高并發(fā)或?qū)r(shí)效要求高的業(yè)務(wù)場(chǎng)景。
適用場(chǎng)景:小型系統(tǒng)、任務(wù)量小、業(yè)務(wù)容忍較大延遲的場(chǎng)景。
2. Redisson 延遲隊(duì)列(推薦)
機(jī)制說(shuō)明:
基于 Redis ZSet 和 List 封裝的延遲隊(duì)列,由 Redisson 實(shí)現(xiàn),支持回調(diào)消費(fèi)。
優(yōu)點(diǎn):
- 接入簡(jiǎn)單,Redisson 封裝完備。
- 實(shí)時(shí)性較好,精度可達(dá)秒級(jí),滿(mǎn)足大多數(shù)業(yè)務(wù)需求。
- 可注冊(cè)不同處理器,業(yè)務(wù)擴(kuò)展方便。
- 支持分布式部署,天然適配 Redis 集群環(huán)境。
缺點(diǎn):實(shí)現(xiàn)依賴(lài) Redisson,同時(shí)需要保證Redis 崩潰等情況設(shè)計(jì)補(bǔ)償保護(hù)機(jī)制
適用場(chǎng)景:中大型系統(tǒng)、微服務(wù)架構(gòu)下的延遲任務(wù)處理。
3. 消息中間件延遲隊(duì)列( RabbitMQ、Kafka等)
機(jī)制:通過(guò)消息中間件的 TTL(消息生存時(shí)間)和死信隊(duì)列機(jī)制實(shí)現(xiàn)延遲任務(wù),例如 RabbitMQ 的 DLX(死信交換機(jī))或 Kafka 的延遲消費(fèi)。
優(yōu)點(diǎn):
- 毫秒級(jí)精度,適合高并發(fā)和對(duì)時(shí)效性要求高的業(yè)務(wù)。
- 高可靠性,天然支持異步解耦與分布式處理。
- 支持大規(guī)模任務(wù)并發(fā)調(diào)度。
缺點(diǎn):
- 配置復(fù)雜,需要配置 TTL、DLX 等。
- 引入 MQ 系統(tǒng),提升系統(tǒng)復(fù)雜度與維護(hù)成本。
- 對(duì)運(yùn)維能力有一定要求。
適用場(chǎng)景:高并發(fā)、高可用要求的核心業(yè)務(wù),如訂單超時(shí)關(guān)閉、促銷(xiāo)活動(dòng)控制等。
4. Redis Key 過(guò)期監(jiān)聽(tīng)(不推薦)
最初是通過(guò)redis的思路來(lái)實(shí)現(xiàn)延遲隊(duì)列功能,但是通過(guò)查詢(xún)資料和官方文檔發(fā)現(xiàn),redis并不適合此種場(chǎng)景
機(jī)制:
通過(guò)啟用 Redis 的 Keyspace Notification 功能,監(jiān)聽(tīng)鍵過(guò)期事件(需設(shè)置 notify-keyspace-events
配置項(xiàng))。
官方文檔說(shuō)明:
Redis 中 Key 的過(guò)期事件
expired
有兩種觸發(fā)方式:
- 在訪問(wèn) Key 時(shí)發(fā)現(xiàn)其已過(guò)期
- 后臺(tái)線程定期掃描并刪除過(guò)期 Key
因此,并不能保證在 TTL 恰好歸零時(shí)立即觸發(fā)過(guò)期事件,也不保證事件一定會(huì)觸發(fā)。
缺點(diǎn):
- 不可靠,事件觸發(fā)不精確,且可能丟失。
- 無(wú)法支持分布式監(jiān)聽(tīng),Redis 集群環(huán)境下存在局限。
- 對(duì)核心業(yè)務(wù)流程不具備可控性。
適用場(chǎng)景:臨時(shí)性、非強(qiáng)一致性場(chǎng)景,如驗(yàn)證碼、狀態(tài)標(biāo)記清理等。
參考:
選型建議總結(jié)
方案 | 實(shí)現(xiàn)難度 | 實(shí)時(shí)性 | 可靠性 | 分布式支持 | 推薦場(chǎng)景 |
---|---|---|---|---|---|
定時(shí)任務(wù)輪詢(xún) | 低 | 低 | 中 | 有限 | 簡(jiǎn)單、低頻業(yè)務(wù) |
Redisson 延遲隊(duì)列 | 中 | 中 | 高 | 好 | 分布式、業(yè)務(wù)量中等、場(chǎng)景標(biāo)準(zhǔn) |
MQ 延遲隊(duì)列 | 高 | 高 | 高 | 極好 | 高并發(fā)、大量異步、核心任務(wù)場(chǎng)景 |
Redis 過(guò)期監(jiān)聽(tīng) | 低 | 不確定 | 低 | 差 | 非核心場(chǎng)景,緩存狀態(tài)變更類(lèi)任務(wù) |
Redisson 延遲隊(duì)列實(shí)現(xiàn)
項(xiàng)目結(jié)構(gòu)
├── config │ └── RedissonConfig.java # 配置 Redisson 客戶(hù)端,創(chuàng)建 RedissonClient Bean ├── controller │ └── DeliveryController.java # 提供REST 接口模擬訂單創(chuàng)建和收貨操作,觸發(fā)延遲任務(wù) ├── enums │ └── DelayQueueEnum.java # 定義延遲隊(duì)列的業(yè)務(wù)枚舉及其關(guān)聯(lián)的處理類(lèi) ├── hander │ ├── DelayQueueHandler.java # 延遲隊(duì)列處理器接口,定義 execute 方法 │ ├── EvaluationTimeoutHandler.java # 處理評(píng)價(jià)超時(shí)邏輯的具體實(shí)現(xiàn)類(lèi) │ └── OrderPaymentTimeoutHandler.java # 處理訂單支付超時(shí)邏輯的具體實(shí)現(xiàn)類(lèi) ├── runner │ └── RedisDelayQueueRunner.java # 啟動(dòng)后監(jiān)聽(tīng)并執(zhí)行延遲隊(duì)列任務(wù),使用線程池并發(fā)處理 ├── utils │ ├── RedisDelayQueueUtil.java # 封裝 Redis 延遲隊(duì)列的操作方法(添加/獲取元素) │ └── SpringUtils.java # 工具類(lèi),用于在非 Spring 管理類(lèi)中獲取 Bean └── SpringbootApplication.java # Spring Boot 主類(lèi),包含程序入口 main 方法
Redis延遲隊(duì)列工具類(lèi)
package com.zhou.demo.utils; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingDeque; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.concurrent.TimeUnit; /** * redis延遲隊(duì)列工具 */ @Slf4j @Component public class RedisDelayQueueUtil { @Resource private RedissonClient redissonClient; /** * 將元素添加到延遲隊(duì)列中 * * @param queueCode 隊(duì)列鍵(用于標(biāo)識(shí)不同的隊(duì)列) * @param value 要添加到隊(duì)列中的值(泛型類(lèi)型) * @param delay 延遲時(shí)間(指定元素在隊(duì)列中延遲被消費(fèi)的時(shí)間) * @param timeUnit 時(shí)間單位(與延遲時(shí)間配合使用,如秒、毫秒等) */ public <T> void addDelayQueue(String queueCode, T value, long delay, TimeUnit timeUnit) { try { RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode); RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque); delayedQueue.offer(value, delay, timeUnit); log.info("(添加延時(shí)隊(duì)列成功) 隊(duì)列鍵:{},隊(duì)列值:{},延遲時(shí)間:{}秒", queueCode, value, timeUnit.toSeconds(delay)); } catch (Exception e) { log.error("(添加延時(shí)隊(duì)列失敗) {}", e.getMessage(), e); throw new RuntimeException("(添加延時(shí)隊(duì)列失敗)", e); } } /** * 獲取延遲隊(duì)列中的元素 * * @param queueCode 隊(duì)列鍵 * @param <T> 元素類(lèi)型 * @return 隊(duì)列中的元素 */ public <T> T getDelayQueue(String queueCode) throws InterruptedException { RBlockingDeque<T> blockingDeque = redissonClient.getBlockingDeque(queueCode); return blockingDeque.take(); } }
Redis延遲隊(duì)列運(yùn)行器
用于在Spring Boot啟動(dòng)后監(jiān)聽(tīng)各個(gè)延遲隊(duì)列,并在線程池中執(zhí)行對(duì)應(yīng)的業(yè)務(wù)邏輯。
package com.zhou.demo.runner; import com.zhou.demo.enums.DelayQueueEnum; import com.zhou.demo.hander.DelayQueueHandler; import com.zhou.demo.utils.RedisDelayQueueUtil; import com.zhou.demo.utils.SpringUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import javax.annotation.Resource; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Redis延遲隊(duì)列運(yùn)行器 * 用于在Spring Boot啟動(dòng)后監(jiān)聽(tīng)各個(gè)延遲隊(duì)列,并在線程池中執(zhí)行對(duì)應(yīng)的業(yè)務(wù)邏輯。 * * @author zhouquan */ @Slf4j @Component public class RedisDelayQueueRunner implements CommandLineRunner { @Resource private RedisDelayQueueUtil redisDelayQueueUtil; /** * 線程池,用于并發(fā)監(jiān)聽(tīng)不同的延遲隊(duì)列 */ private final ExecutorService executorService = Executors.newCachedThreadPool(); /** * 運(yùn)行狀態(tài)標(biāo)志,控制線程是否持續(xù)監(jiān)聽(tīng)隊(duì)列 */ private volatile boolean running = true; /** * Spring Boot 啟動(dòng)完成后自動(dòng)運(yùn)行的方法 * 遍歷所有延遲隊(duì)列枚舉,為每個(gè)隊(duì)列創(chuàng)建一個(gè)監(jiān)聽(tīng)線程 * * @param args 命令行參數(shù) */ @Override public void run(String... args) { for (DelayQueueEnum queueEnum : DelayQueueEnum.values()) { executorService.execute(() -> { log.info("啟動(dòng)延遲隊(duì)列監(jiān)聽(tīng)線程:{}", queueEnum.getCode()); while (running) { try { Object value = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode()); DelayQueueHandler handler = SpringUtils.getBean(queueEnum.getBeanClass()); handler.execute(value); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("線程中斷:{}", queueEnum.getCode()); } catch (Exception ex) { log.error("延遲隊(duì)列 [{}] 處理異常:{}", queueEnum.getCode(), ex.getMessage(), ex); } } }); } log.info("所有 Redis 延遲隊(duì)列監(jiān)聽(tīng)啟動(dòng)完成"); } /** * 在 Bean 銷(xiāo)毀前關(guān)閉線程池,釋放資源 */ @PreDestroy public void shutdown() { log.info("準(zhǔn)備關(guān)閉 Redis 延遲隊(duì)列監(jiān)聽(tīng)線程池"); running = false; executorService.shutdownNow(); } }
業(yè)務(wù)枚舉類(lèi)
package com.zhou.demo.enums; import com.zhou.demo.hander.EvaluationTimeoutHandler; import com.zhou.demo.hander.OrderPaymentTimeoutHandler; import com.zhou.demo.hander.DelayQueueHandler; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; /** * 延遲隊(duì)列業(yè)務(wù)枚舉 * * @author 18324 */ @Getter @NoArgsConstructor @AllArgsConstructor public enum DelayQueueEnum { /** * 訂單超時(shí) */ ORDER_PAYMENT_TIMEOUT("order_payment_timeout", "訂單支付超時(shí)", OrderPaymentTimeoutHandler.class), /** * 評(píng)價(jià)超時(shí) */ EVALUATION_TIMEOUT("evaluation_timeout", "評(píng)價(jià)超時(shí)", EvaluationTimeoutHandler.class); /** * 延遲隊(duì)列 Redis Key */ private String code; /** * 中文描述 */ private String name; /** * 延遲隊(duì)列具體業(yè)務(wù)實(shí)現(xiàn)的 Bean * 可通過(guò) Spring 的上下文獲取 */ private Class<? extends DelayQueueHandler<Long>> beanClass; }
測(cè)試接口類(lèi)
package com.zhou.demo.enums; import com.zhou.demo.hander.EvaluationTimeoutHandler; import com.zhou.demo.hander.OrderPaymentTimeoutHandler; import com.zhou.demo.hander.DelayQueueHandler; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; /** * 延遲隊(duì)列業(yè)務(wù)枚舉 * * @author 18324 */ @Getter @NoArgsConstructor @AllArgsConstructor public enum DelayQueueEnum { /** * 訂單超時(shí) */ ORDER_PAYMENT_TIMEOUT("order_payment_timeout", "訂單支付超時(shí)", OrderPaymentTimeoutHandler.class), /** * 評(píng)價(jià)超時(shí) */ EVALUATION_TIMEOUT("evaluation_timeout", "評(píng)價(jià)超時(shí)", EvaluationTimeoutHandler.class); /** * 延遲隊(duì)列 Redis Key */ private String code; /** * 中文描述 */ private String name; /** * 延遲隊(duì)列具體業(yè)務(wù)實(shí)現(xiàn)的 Bean * 可通過(guò) Spring 的上下文獲取 */ private Class<? extends DelayQueueHandler<Long>> beanClass; }
延遲隊(duì)列任務(wù)測(cè)試
源碼地址
https://gitee.com/zhouquanstudy/springboot-redisson-delayqueue
參考
[1]. SpringBoot集成Redisson實(shí)現(xiàn)延遲隊(duì)列_redisson delayedqueue
[2]. 請(qǐng)勿過(guò)度依賴(lài)Redis的過(guò)期監(jiān)聽(tīng)業(yè)務(wù)
到此這篇關(guān)于Spring Boot 項(xiàng)目集成 Redisson 實(shí)現(xiàn)延遲隊(duì)列的文章就介紹到這了,更多相關(guān)Spring Boot Redisson延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot中Redisson延遲隊(duì)列的示例
- SpringBoot集成Redisson實(shí)現(xiàn)延遲隊(duì)列的場(chǎng)景分析
- Springboot使用redisson?+?自定義注解實(shí)現(xiàn)消息的發(fā)布訂閱(解決方案)
- SpringBoot整合Redisson的兩種方式
- SpringBoot集成Redisson操作Redis的實(shí)現(xiàn)方法
- springboot集成redisson的三種方式
- Spring Boot使用Redisson實(shí)現(xiàn)滑動(dòng)窗口限流的項(xiàng)目實(shí)踐
- SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊(duì)列的示例詳解
- Spring Boot與RabbitMQ結(jié)合實(shí)現(xiàn)延遲隊(duì)列的示例
相關(guān)文章
Java大數(shù)據(jù)開(kāi)發(fā)Hadoop?MapReduce
MapReduce的思想核心是“分而治之”,適用于大量復(fù)雜的任務(wù)處理場(chǎng)景(大規(guī)模數(shù)據(jù)處理場(chǎng)景)Map負(fù)責(zé)“分”,即把復(fù)雜的任務(wù)分解為若干個(gè)“簡(jiǎn)單的任務(wù)”來(lái)并行處理??梢赃M(jìn)行拆分的前提是這些小任務(wù)可以并行計(jì)算,彼此間幾乎沒(méi)有依賴(lài)關(guān)系2023-03-03springBoo3.0集成knife4j4.1.0的詳細(xì)教程(swagger3)
這篇文章主要介紹了springBoo3.0集成knife4j4.1.0的詳細(xì)教程(swagger3),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-07-07java開(kāi)發(fā)MVC三層架構(gòu)上再加一層Manager層原理詳解
這篇文章主要為大家介紹了MVC三層架構(gòu)中再加一層Manager層原理的示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2021-10-10Java工廠模式之簡(jiǎn)單工廠,工廠方法,抽象工廠模式詳解
這篇文章主要為大家詳細(xì)介紹了Java工廠模式之簡(jiǎn)單工廠、工廠方法、抽象工廠模式,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助2022-02-02Spring Boot集成springfox-swagger2構(gòu)建restful API的方法教程
這篇文章主要給大家介紹了關(guān)于Spring Boot集成springfox-swagger2構(gòu)建restful API的相關(guān)資料,文中介紹的非常詳細(xì),對(duì)大家具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面跟著小編一起來(lái)學(xué)習(xí)學(xué)習(xí)吧。2017-06-06mybatis條件構(gòu)造器(EntityWrapper)的使用方式
這篇文章主要介紹了mybatis條件構(gòu)造器(EntityWrapper)的使用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03MyBatis-Plus中MetaObjectHandler沒(méi)生效完美解決
在進(jìn)行測(cè)試時(shí)發(fā)現(xiàn)配置的MyMetaObjectHandler并沒(méi)有生效,本文主要介紹了MyBatis-Plus中MetaObjectHandler沒(méi)生效完美解決,具有一定的參考價(jià)值,感興趣的可以了解一下2023-11-11Java Web實(shí)現(xiàn)自動(dòng)登陸功能
這篇文章主要為大家詳細(xì)介紹了Java Web實(shí)現(xiàn)自動(dòng)登陸功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08