Redis簡易延時(shí)隊(duì)列的實(shí)現(xiàn)示例
一、背景
在實(shí)際的業(yè)務(wù)場景中,經(jīng)常會遇到需要延時(shí)處理的業(yè)務(wù),比如訂單超時(shí)未支付,需要取消訂單,或者是用戶注冊后,需要在一段時(shí)間內(nèi)激活賬號,否則賬號失效等等。這些業(yè)務(wù)場景都可以通過延時(shí)隊(duì)列來實(shí)現(xiàn)。
最近在實(shí)際業(yè)務(wù)當(dāng)中就遇到了這樣的一個(gè)場景,需要實(shí)現(xiàn)一個(gè)延時(shí)隊(duì)列,用來處理訂單超時(shí)未支付的業(yè)務(wù)。在網(wǎng)上找了一些資料,發(fā)現(xiàn)大部分都是使用了mq來實(shí)現(xiàn),比如rabbitmq,rocketmq等等,但是這些mq都是需要安裝的,而且還需要配置,對于此項(xiàng)目來說不想增加額外的依賴,所以就想到了使用redis來實(shí)現(xiàn)一個(gè)簡易的延時(shí)隊(duì)列。
二、實(shí)現(xiàn)思路
1. 業(yè)務(wù)場景
訂單超時(shí)未支付,需要取消訂單,這個(gè)業(yè)務(wù)場景可以分為兩個(gè)步驟來實(shí)現(xiàn):
- 用戶下單后,將訂單信息存入數(shù)據(jù)庫,并將訂單信息存入延時(shí)隊(duì)列中,設(shè)置延時(shí)時(shí)間為30分鐘。
- 30分鐘后,從延時(shí)隊(duì)列中取出訂單信息,判斷訂單是否已支付,如果未支付,則取消訂單。
- 如果用戶在30分鐘內(nèi)支付了訂單,則將訂單從延時(shí)隊(duì)列中刪除。
2. 實(shí)現(xiàn)思路
- 使用redis的zset來實(shí)現(xiàn)延時(shí)隊(duì)列,zset的score用來存儲訂單的超時(shí)時(shí)間,value用來存儲訂單信息。
- 使用redis的set來存儲已支付的訂單,set中的value為訂單id。
三、實(shí)現(xiàn)代碼
1. 使用了兩個(gè)注解類分別標(biāo)記生產(chǎn)者類、生產(chǎn)者方法,消費(fèi)者方法
/** * @program: * @description: redis延時(shí)隊(duì)列生產(chǎn)者類注解,標(biāo)記生產(chǎn)者類,用來掃描生產(chǎn)者類中的生產(chǎn)者方法,將生產(chǎn)者方法注冊到redis延時(shí)隊(duì)列中 * @author: jiangchengxuan * @created: 2023/12/09 10:32 */ @Component @Documented @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface RedisMessageQueue {}
/** * @program: * @description: * 帶有此注解的方法,方法的入?yún)⑹紫葧晦D(zhuǎn)換為json字符串,然后存入redis的zset中,score為當(dāng)前時(shí)間+延時(shí)時(shí)間,value為json字符串 * 當(dāng)延時(shí)時(shí)間到達(dá)后,會從redis的zset中取出value,然后將value轉(zhuǎn)換為入?yún)㈩愋?,調(diào)用此方法,執(zhí)行業(yè)務(wù)邏輯 * 此注解只能標(biāo)記在方法上,且方法必須為public,且只能有一個(gè)參數(shù) * 此注解標(biāo)記的方法,必須在redis延時(shí)隊(duì)列生產(chǎn)者類中,否則不會生效 * @author: jiangchengxuan * @created: 2023/12/09 10:37 */ @Documented @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface RedisMessageQueueMethod { String threadName() default "redis消息隊(duì)列默認(rèn)線程"; String queueKey(); // 隊(duì)列key值 int threadNum() default 1; //默認(rèn)線程數(shù)量 int threadSleepTime() default 500; //默認(rèn)線程休眠時(shí)間默認(rèn)500ms }
2. 生產(chǎn)者類具體實(shí)現(xiàn)
/** * @program: * @description: 生產(chǎn)者類具體實(shí)現(xiàn) * @author: jiangchengxuan * @created: 2023/12/09 10:44 */ @Slf4j @Component public class DelayQueueWorkerConfig implements InitializingBean { private volatile boolean monitorStarted = false; private volatile boolean monitorShutDowned = false; private ExecutorService executorService; // 需要監(jiān)控的延時(shí)隊(duì)列 @Autowired protected IDelayQueue<String> monitorQueue; @Autowired private ApplicationContext applicationContext; @Override public void afterPropertiesSet(){ //spring工具類,可以獲取指定注解的類 Map<String, Object> allNeedClass = applicationContext.getBeansWithAnnotation(RedisMessageQueue.class); for (Map.Entry<String, Object> entry : allNeedClass.entrySet()) { Object bean = entry.getValue(); Method[] methods = bean.getClass().getMethods(); for (Method method : methods) { Annotation[] annotations = method.getDeclaredAnnotations(); for (Annotation annotation : annotations) { if (annotation instanceof RedisMessageQueueMethod) { RedisMessageQueueMethod queueMethod = (RedisMessageQueueMethod) annotation; //找的需要使用消息隊(duì)列的方法后, initExecuteQueue(queueMethod, method, bean); } } } } } /** * 初始化執(zhí)行造作 * @param queueAnnotations 注解 * @param method 方法 * @param bean 對象 */ void initExecuteQueue(RedisMessageQueueMethod queueAnnotations ,Method method,Object bean) { String threadName = queueAnnotations.threadName(); int threadNum = queueAnnotations.threadNum(); int threadSheepTime = queueAnnotations.threadSleepTime(); String queueKey = queueAnnotations.queueKey(); //獲取所有消息隊(duì)列名稱 executorService = Executors.newFixedThreadPool(threadNum); for (int i = 0; i < threadNum; i++) { final int num = i; executorService.execute(() -> { Thread.currentThread().setName(threadName + "[" + num + "]"); //如果沒有設(shè)置隊(duì)列queuekey或者已經(jīng)暫停則不執(zhí)行 while (!monitorShutDowned) { String value = null; try { value = monitorQueue.get(queueKey); // 獲取數(shù)據(jù)時(shí)進(jìn)行刪除操作,刪除成功,則進(jìn)行處理,業(yè)務(wù)邏輯處理失敗則繼續(xù)添加回隊(duì)列但是時(shí)間設(shè)置最大以達(dá)到保存現(xiàn)場的目的,防止并發(fā)獲取重復(fù)數(shù)據(jù) if (StringUtils.isNotEmpty(value)) { if (log.isDebugEnabled()) { log.debug("Monitor Thread[" + Thread.currentThread().getName() + "], get from queue,value = {}", value); } boolean success = (Boolean) method.invoke(bean, value); // 失敗重試 if (!success) { success = (Boolean) method.invoke(bean, value);; if (!success) { log.warn("Monitor Thread[" + Thread.currentThread().getName() + "] execute Failed,value = {}", value); monitorQueue.add(TimeUnit.DAYS,365, value, queueKey); } } else { if (log.isDebugEnabled()) { log.debug("Monitor Thread[" + Thread.currentThread().getName() + "]:execute successfully!values = {}", value); } } } else { if (log.isDebugEnabled()) { log.debug("Monitor Thread[" + Thread.currentThread().getName() + "]:monitorThreadRunning = {}", monitorStarted); } Thread.sleep(threadSheepTime); } } catch (Exception e) { log.error("Monitor Thread[" + Thread.currentThread().getName() + "] execute Failed,value = " + value, e); } } log.info("Monitor Thread[" + Thread.currentThread().getName() + "] Completed..."); }); } log.info("thread pool is started..."); } }
/** * @program: * @description: * 延時(shí)隊(duì)列接口實(shí)現(xiàn)類, * 使用redis的zset實(shí)現(xiàn)延時(shí)隊(duì)列, * @author: jiangchengxuan * @created: 2023/12/09 23:34 */ public interface IDelayQueue <E> { /** * 向延時(shí)隊(duì)列中添加數(shù)據(jù) * * @param score 分?jǐn)?shù) * @param data 數(shù)據(jù) * @return true 成功 false 失敗 */ boolean add(long score, E data,String queueKey); /** * 向延時(shí)隊(duì)列中添加數(shù)據(jù) * * @param timeUnit 時(shí)間單位 * @param time 延后時(shí)間 * @param data 數(shù)據(jù) * @param queueKey * @return true 成功 false 失敗 */ boolean add(TimeUnit timeUnit, long time, E data, String queueKey); /** * 從延時(shí)隊(duì)列中獲取數(shù)據(jù) * @param queueKey 隊(duì)列key * @return 數(shù)據(jù) */ String get(String queueKey); /** * 刪除數(shù)據(jù) * * @param key * @param data 數(shù)據(jù) * @return */ public<T> boolean rem(String key, T data) ; }
/** * @program: * @description: redis操作類,封裝了redis的操作方法,使用時(shí)直接注入即可使用,不需要關(guān)心redis的操作細(xì)節(jié),使用時(shí)只需要關(guān)心業(yè)務(wù)邏輯即可 * @author: jiangchengxuan * @created: 2023/12/09 23:35 */ @Service public class RedisDelayQueue implements IDelayQueue<String> { @Autowired private RedisService redisService; @Override public boolean add(long score, String data,String queueKey) { return redisService.opsForZSet(Constant.DEFAULT_REDIS_QUEUE_KEY_PREFIX+queueKey, data, score); } @Override public boolean add(TimeUnit timeUnit, long time, String data, String queueKey) { switch (timeUnit) { case SECONDS: return add(LocalDateTime.now().plusSeconds(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data, queueKey); case MINUTES: return add(LocalDateTime.now().plusMinutes(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data,queueKey); case HOURS: return add(LocalDateTime.now().plusHours(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data,queueKey); case DAYS: return add(LocalDateTime.now().plusDays(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data,queueKey); default: return false; } } @Override public String get(String queueKey) { long now = System.currentTimeMillis(); long min = Long.MIN_VALUE; Set<String> res = redisService.rangeByScoreZSet(Constant.DEFAULT_REDIS_QUEUE_KEY_PREFIX+queueKey, min, now, 0, 10); if (!CollectionUtils.isEmpty(res)) { for (String data : res){ // 刪除成功,則進(jìn)行處理,防止并發(fā)獲取重復(fù)數(shù)據(jù) if (rem(queueKey, data)){ return data; } } } return null; } @Override public<T> boolean rem(String key, T data) { return redisService.remZSet(Constant.DEFAULT_REDIS_QUEUE_KEY_PREFIX+key, data); } }
- 使用
@RedisMessageQueue public class SomethingClass { @Autowired private IDelayQueue<String> messageQueue; /** * 生產(chǎn)者,向隊(duì)列中添加數(shù)據(jù),30秒后消費(fèi)者進(jìn)行消費(fèi) */ public void test(){ messageQueue.add(TimeUnit.SECONDS,30L,"這是參數(shù)數(shù)據(jù)","new_queue"); } /** * 消費(fèi)者,如果按此配置的話,會啟動一個(gè)線程,線程名稱為:測試線程名稱,線程數(shù)量為1,線程休眠時(shí)間為10毫秒 * 注意:queueKey需要與生產(chǎn)者中的queueKey保持一致才能進(jìn)行消費(fèi) * @param data */ @Override @RedisMessageQueueMethod(threadName = "測試線程名稱",queueKey = "new_queue",threadNum = 1,threadSleepTime = 10) public void testMethod(String data) { //do something } }
到此這篇關(guān)于Redis簡易延時(shí)隊(duì)列的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)Redis 延時(shí)隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- redis實(shí)現(xiàn)延時(shí)隊(duì)列的兩種方式(小結(jié))
- 基于Redis實(shí)現(xiàn)延時(shí)隊(duì)列的優(yōu)化方案小結(jié)
- 生產(chǎn)redisson延時(shí)隊(duì)列不消費(fèi)問題排查解決
- Redisson 分布式延時(shí)隊(duì)列 RedissonDelayedQueue 運(yùn)行流程
- redis使用zset實(shí)現(xiàn)延時(shí)隊(duì)列的示例代碼
- redis實(shí)現(xiàn)分布式延時(shí)隊(duì)列的示例代碼
- Redis消息隊(duì)列、阻塞隊(duì)列、延時(shí)隊(duì)列的實(shí)現(xiàn)
- Redisson延時(shí)隊(duì)列RedissonDelayed的具體使用
- redis和rabbitmq實(shí)現(xiàn)延時(shí)隊(duì)列的示例代碼
相關(guān)文章
Redisson分布式限流的實(shí)現(xiàn)原理解析
這篇文章主要為大家介紹了Redisson分布式限流的實(shí)現(xiàn)原理解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02Redis數(shù)據(jù)過期策略的實(shí)現(xiàn)詳解
最近項(xiàng)目當(dāng)中遇到一個(gè)需求場景,需要清空一些存放在Redis的數(shù)據(jù),本文對Redis的過期機(jī)制簡單的講解一下,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-09-09redis延時(shí)隊(duì)列的項(xiàng)目實(shí)踐
本文主要介紹了redis延時(shí)隊(duì)列的項(xiàng)目實(shí)踐,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-11-11Redis實(shí)現(xiàn)限量優(yōu)惠券的秒殺功能
文章詳細(xì)分析了避免超賣問題的方法,包括確保一人一單的業(yè)務(wù)邏輯,并提供了代碼實(shí)現(xiàn)步驟和代碼示例,感興趣的朋友跟隨小編一起看看吧2024-12-12基于Redis的List實(shí)現(xiàn)特價(jià)商品列表功能
本文通過場景分析給大家介紹了基于Redis的List實(shí)現(xiàn)特價(jià)商品列表,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2021-08-08利用redisson快速實(shí)現(xiàn)自定義限流注解(接口防刷)
利用redis的有序集合即Sorted?Set數(shù)據(jù)結(jié)構(gòu),構(gòu)造一個(gè)令牌桶來實(shí)施限流,而redisson已經(jīng)幫我們封裝成了RRateLimiter,通過redisson,即可快速實(shí)現(xiàn)我們的目標(biāo),這篇文章主要介紹了利用redisson快速實(shí)現(xiàn)自定義限流注解,需要的朋友可以參考下2024-07-07