Redis?延時(shí)任務(wù)實(shí)現(xiàn)及與定時(shí)任務(wù)區(qū)別詳解
引言
1. 生成訂單30分鐘未支付,則自動(dòng)取消
2. 30分鐘未回復(fù),則結(jié)束會(huì)話
對(duì)上述的任務(wù),我們給一個(gè)專業(yè)的名字來形容,那就是延時(shí)任務(wù)
一、延時(shí)任務(wù)是什么
延時(shí)任務(wù)
不同于一般的定時(shí)任務(wù),延時(shí)任務(wù)是在某事件觸發(fā)
后的未來某個(gè)時(shí)刻執(zhí)行,沒有重復(fù)的執(zhí)行周期。
二、延時(shí)任務(wù)和定時(shí)任務(wù)的區(qū)別是什么
- 定時(shí)任務(wù)有明確的觸發(fā)時(shí)間,延時(shí)任務(wù)沒有
- 定時(shí)任務(wù)有執(zhí)行周期,而延時(shí)任務(wù)在某事件觸發(fā)后一段時(shí)間內(nèi)執(zhí)行,沒有執(zhí)行周期
定時(shí)任務(wù)一般執(zhí)行的是批處理多個(gè)任務(wù),而延時(shí)任務(wù)一般是單任務(wù)處理
三、技術(shù)對(duì)比
本文主要講解Redis的Zset
實(shí)現(xiàn)延時(shí)任務(wù),其他方案只做介紹
1.數(shù)據(jù)庫輪詢
通過定時(shí)組件
的去掃描數(shù)據(jù)庫,通過時(shí)間來判斷是否有超時(shí)的訂單,然后進(jìn)行update或delete等操作
優(yōu)點(diǎn):
簡單易行
缺點(diǎn):
- 對(duì)服務(wù)器內(nèi)存消耗大
- 時(shí)間間隔小,數(shù)據(jù)庫損耗極大
- 數(shù)據(jù)內(nèi)存態(tài),不可靠
- 如果任務(wù)量過大,對(duì)數(shù)據(jù)庫造成的壓力很大 。頻繁查詢數(shù)據(jù)庫帶來性能影響
2.JDK的延遲隊(duì)列
利用JDK自帶的DelayQueue
來實(shí)現(xiàn),這是一個(gè)無界阻塞隊(duì)列,該隊(duì)列只有在延遲期滿的時(shí)候才能從中獲取元素,放入DelayQueue
中,是必須實(shí)現(xiàn)Delayed接口
的。
優(yōu)點(diǎn):實(shí)現(xiàn)簡單,效率高,任務(wù)觸發(fā)時(shí)間延遲低。
缺點(diǎn):
- 服務(wù)器重啟后,數(shù)據(jù)全部消失,怕宕機(jī)
- 因?yàn)閮?nèi)存條件限制的原因,比如下單未付款的訂單數(shù)太多,那么很容易就出現(xiàn)OOM異常
- 數(shù)據(jù)內(nèi)存態(tài),不可靠
3.時(shí)間輪算法
時(shí)間輪TimingWheel是一種高效、低延遲的調(diào)度數(shù)據(jù)結(jié)構(gòu),底層采用數(shù)組實(shí)現(xiàn)存儲(chǔ)任務(wù)列表的環(huán)形隊(duì)列,示意圖如下:時(shí)間輪
時(shí)間輪算法可以類比于時(shí)鐘,如上圖箭頭(指針)按某一個(gè)方向按固定頻率輪動(dòng),每一次跳動(dòng)稱為一個(gè) tick。這樣可以看出定時(shí)輪由個(gè)3個(gè)重要的屬數(shù),ticksPerWheel(一輪的tick數(shù)),tickDuration(一個(gè)tick的持續(xù)時(shí)間)以及 timeUnit(時(shí)間單位),例如當(dāng)ticksPerWheel=60,tickDuration=1,timeUnit=秒,這就和現(xiàn)實(shí)中的始終的秒針走動(dòng)完全類似了。
如果當(dāng)前指針指在1上面,我有一個(gè)任務(wù)需要4秒以后執(zhí)行,那么這個(gè)執(zhí)行的線程回調(diào)或者消息將會(huì)被放在5上。那如果需要在20秒之后執(zhí)行怎么辦,由于這個(gè)環(huán)形結(jié)構(gòu)槽數(shù)只到8,如果要20秒,指針需要多轉(zhuǎn)2圈。位置是在2圈之后的5上面(20 % 8 + 1)
優(yōu)點(diǎn):效率高,任務(wù)觸發(fā)時(shí)間延遲時(shí)間比delayQueue低
缺點(diǎn):
- 服務(wù)器重啟后,數(shù)據(jù)全部消失,怕宕機(jī)
- 容易就出現(xiàn)OOM異常
- 數(shù)據(jù)內(nèi)存態(tài),不可靠
4.使用消息隊(duì)列
使用RabbitMQ死信隊(duì)列依賴于RabbitMQ的兩個(gè)特性:TTL和DLX。
TTL:Time To Live,消息存活時(shí)間,包括兩個(gè)維度:隊(duì)列消息存活時(shí)間和消息本身的存活時(shí)間。
DLX:Dead Letter Exchange,死信交換器。
優(yōu)點(diǎn):異步交互可以削峰,高效,可以利用rabbitmq的分布式特性輕易的進(jìn)行橫向擴(kuò)展,消息支持持久化增加了可靠性。
缺點(diǎn):
1.本身的易用度要依賴于rabbitMq的運(yùn)維.因?yàn)橐胷abbitMq,所以復(fù)雜度和成本變高
2.RabbitMq是一個(gè)消息中間件;延遲隊(duì)列只是其中一個(gè)小功能,如果團(tuán)隊(duì)技術(shù)棧中本來就是使用RabbitMq那還好,如果不是,那為了使用延遲隊(duì)列而去部署一套R(shí)abbitMq成本有點(diǎn)大;
5.Redis的Zset實(shí)現(xiàn)延時(shí)任務(wù)
為什么采用Redis的ZSet實(shí)現(xiàn)延遲任務(wù)?
zset數(shù)據(jù)類型的去重有序(分?jǐn)?shù)排序)特點(diǎn)進(jìn)行延遲。例如:時(shí)間戳作為score進(jìn)行排序
5.1 思路分析
- 項(xiàng)目啟動(dòng)時(shí)啟用
一條線程
,線程用于間隔一定時(shí)間去查詢r(jià)edis的待執(zhí)行任務(wù)。其任務(wù)jobId為業(yè)務(wù)id,值為要執(zhí)行的時(shí)間。 - 查詢到執(zhí)行的任務(wù)時(shí),將其從redis的信息中進(jìn)行刪除。(
刪除成功才執(zhí)行延時(shí)任務(wù),否則不執(zhí)行,這樣可以避免分布式系統(tǒng)延時(shí)任務(wù)多次執(zhí)行
。) - 刪除redis中的記錄之后,執(zhí)行任務(wù)。將執(zhí)行jobId也就是業(yè)務(wù)id對(duì)應(yīng)的任務(wù)。
實(shí)際場景中,還會(huì)涉及延時(shí)任務(wù)修改,刪除等,這些場景可以指定標(biāo)記,修改標(biāo)識(shí)即可,當(dāng)然也可以在業(yè)務(wù)邏輯中做補(bǔ)充條件的判斷。
5.2 Redis中Zset的簡單介紹及使用
Redis 有序集合是 string 類型元素的集合,且不允許重復(fù)的成員。每個(gè)元素都會(huì)關(guān)聯(lián)一個(gè) double 類型的分?jǐn)?shù)。redis 正是通過分?jǐn)?shù)來為集合中的成員進(jìn)行從小到大的排序。有序集合的成員是唯一的,但分?jǐn)?shù)(score)卻可以重復(fù)。
常用命令
- ZADD命令 : 將一個(gè)或多個(gè)成員元素及其分?jǐn)?shù)值加入到有序集當(dāng)中,或者更新已存在成員的分?jǐn)?shù)
- ZCARD命令 : 獲取有序集合的成員數(shù)
- ZRANGEBYSCORE: 通過分?jǐn)?shù)返回有序集合指定區(qū)間內(nèi)的成員
- ZREM : 移除有序集合中的一個(gè)或多個(gè)成員
java中操作簡單介紹
1.add(K key, V value, double score) 添加元素到變量中同時(shí)指定元素的分值。 redisTemplate.opsForZSet().add("zSetValue","A",1); 2.rangeByScore(K key, double min, double max) 根據(jù)設(shè)置的score獲取區(qū)間值。 zSetValue = redisTemplate.opsForZSet().rangeByScore("zSetValue",1,2); 3.rangeByScore(K key, double min, double max,long offset, long count) 根據(jù)設(shè)置的score獲取區(qū)間值從給定下標(biāo)和給定長度獲取最終值。 zSetValue = redisTemplate.opsForZSet().rangeByScore("zSetValue",1,5,1,3); 4.rangeWithScores(K key, long start, long end) 獲取RedisZSetCommands.Tuples的區(qū)間值。 Set<ZSetOperations.TypedTuple<Object>> typedTupleSet = redisTemplate.opsForZSet().rangeWithScores("typedTupleSet",1,3); Iterator<ZSetOperations.TypedTuple<Object>> iterator = typedTupleSet.iterator(); while (iterator.hasNext()){ ZSetOperations.TypedTuple<Object> typedTuple = iterator.next(); Object value = typedTuple.getValue(); double score = typedTuple.getScore(); } 5.刪除成員 redisTemplate.opsForZSet().remove("myZset","a","b");
以下代碼可以直接使用-基于Spring Boot項(xiàng)目
5.3 延時(shí)隊(duì)列工廠
代碼中注釋有詳細(xì)介紹
/** * 延時(shí)隊(duì)列工廠 * **/ @Slf4j public abstract class AbstractDelayQueueMachineFactory { @Autowired private RedisUtil redisUtil; @Autowired private ThreadPoolTaskExecutor asyncTaskExecutor; /** * 插入任務(wù)id * * @param jobId 任務(wù)id(隊(duì)列內(nèi)唯一) * @param time 延時(shí)時(shí)間(單位 :毫秒) * @return 是否插入成功 */ public boolean addJob(String jobId, Integer time) { Calendar instance = Calendar.getInstance(); //增加延時(shí)時(shí)間,獲取最終觸發(fā)時(shí)間 instance.add(Calendar.MILLISECOND, time); long delayMillisecond = instance.getTimeInMillis(); log.info("延時(shí)隊(duì)列添加問題{}",jobId); return redisUtil.zAdd(setDelayQueueName(), delayMillisecond, jobId); } /** * 刪除任務(wù)id * * @param jobId 任務(wù)id(隊(duì)列內(nèi)唯一) */ public boolean removeJob(String jobId) { Long num = redisUtil.zRemove(setDelayQueueName(), jobId); if (num > 0) return true; return false; } /** * 延時(shí)隊(duì)列機(jī)器開始運(yùn)作 */ private void startDelayQueueMachine() { log.info("延時(shí)隊(duì)列{}開始啟動(dòng)", setDelayQueueName()); // 監(jiān)聽redis隊(duì)列 while (true) { try { // 獲取當(dāng)前時(shí)間前的任務(wù)列表 Set<ZSetOperations.TypedTuple<Object>> tuples = redisUtil.zRangeByScore(setDelayQueueName(), 0, System.currentTimeMillis() ); // 如果任務(wù)不為空 if (!CollectionUtils.isEmpty(tuples)) { log.info("延時(shí)任務(wù)開始執(zhí)行:{}", JSONUtil.toJsonStr(tuples)); Iterator<ZSetOperations.TypedTuple<Object>> iterator = tuples.iterator(); while (iterator.hasNext()){ ZSetOperations.TypedTuple<Object> typedTuple = iterator.next(); String questionId = Convert.toStr(typedTuple.getValue()); // 移除緩存,如果移除成功則表示當(dāng)前線程處理了延時(shí)任務(wù),則執(zhí)行延時(shí)任務(wù) // 刪除成功才執(zhí)行延時(shí)任務(wù),否則不執(zhí)行,這樣可以避免分布式系統(tǒng)延時(shí)任務(wù)多次執(zhí)行 Long num = redisUtil.zRemove(setDelayQueueName(), questionId); // 如果移除成功, 則執(zhí)行 if (num > 0) { asyncTaskExecutor.execute(() -> invoke(questionId)); } } } } catch (Exception e) { log.error("處理延時(shí)任務(wù)發(fā)生異常,異常原因?yàn)閧}", e.getMessage(), e); } finally { // 間隔()分鐘執(zhí)行一次 //根據(jù)業(yè)務(wù)場景設(shè)置對(duì)應(yīng)時(shí)間 try { TimeUnit.MINUTES.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } } } /** * 最終執(zhí)行的任務(wù)方法 * * @param jobId 任務(wù)id */ public abstract void invoke(String jobId); /** * 要實(shí)現(xiàn)延時(shí)隊(duì)列的名字 */ public abstract String setDelayQueueName(); //Spring Boot初始化時(shí)開啟一條線程運(yùn)行 @PostConstruct public void init() { new Thread(this::startDelayQueueMachine).start(); } }
addJob方法是添加任務(wù)id和延時(shí)時(shí)間(單位毫秒)
redisUtil.zRangeByScore ::根據(jù)設(shè)置的score獲取區(qū)間值
@PostConstruct注解:是針對(duì)Bean的初始化完成之后做一些事情,比如注冊一些監(jiān)聽器..(初始化實(shí)現(xiàn)方案有很多可自行選擇)
為什么先刪除后執(zhí)行業(yè)務(wù)邏輯?
刪除成功才執(zhí)行延時(shí)任務(wù),否則不執(zhí)行,這樣可以避免分布式系統(tǒng)延時(shí)任務(wù)多次執(zhí)行
5.4 RedisUtil工具類
@Component @Slf4j public class RedisUtil { @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 向Zset里添加成員 * * @param key key值 * @param score 分?jǐn)?shù),通常用于排序 * @param value 值 * @return 增加狀態(tài) */ public boolean zAdd(String key, long score, String value) { Boolean result = redisTemplate.opsForZSet().add(key, value, score); return result; } /** * 獲取 某key 下 某一分值區(qū)間的隊(duì)列 * * @param key 緩存key * @param from 開始時(shí)間 * @param to 結(jié)束時(shí)間 * @return 數(shù)據(jù) */ public Set<ZSetOperations.TypedTuple<Object>> zRangeByScore(String key, int from, long to) { Set<ZSetOperations.TypedTuple<Object>> set = redisTemplate.opsForZSet().rangeByScoreWithScores(key, from, to); return set; } /** * 移除 Zset隊(duì)列值 * * @param key key值 * @param value 刪除的集合 * @return 刪除數(shù)量 */ public Long zRemove(String key, String... value) { return redisTemplate.opsForZSet().remove(key, value); } }
5.5 測試延時(shí)隊(duì)列
繼承上文中的延時(shí)隊(duì)列工廠重寫invoke(處理業(yè)務(wù))
和setDelayQueueName--延時(shí)隊(duì)列名稱也就是Zset中的key值
/** * 測試延時(shí)隊(duì)列 * */ @Slf4j @Component public class DelayQueue extends AbstractDelayQueueMachineFactory { @Autowired private ZnjExpertConsultQuestionRecordMapper questionRecordMapper; /** * 處理業(yè)務(wù)邏輯 */ @Override public void invoke(String jobId) { Integer questionId = Convert.toInt(jobId); ZnjExpertConsultQuestionRecordEntity questionRecordEntity = questionRecordMapper.selectById(questionId); Boolean flag = znjExpertConsultService.whetherEnd(questionRecordEntity); /** * 延時(shí)隊(duì)列名統(tǒng)一設(shè)定 */ @Override public String setDelayQueueName() { return "expert_consult:delay_queue"; } }
運(yùn)行成功,當(dāng)Redis中有任務(wù)時(shí),則執(zhí)行任務(wù)即可
四、總結(jié)
使用redis zset來實(shí)現(xiàn)延時(shí)任務(wù),總體類說是可行的
- 實(shí)時(shí)性: 允許存在一定時(shí)間內(nèi)的誤差(可以通過時(shí)間設(shè)定)
- 高可用性:支持單機(jī),支持集群
- 消息可靠性: 保證至少被消費(fèi)一次
- 消息持久化: 基于Redis自身的持久化特性,上面的消息可靠性基于Redis的持久化,所以如果redis數(shù)據(jù)丟失,意味著延遲消息的丟失,不過可以做主備和集群保證
以上就是Redis 延時(shí)任務(wù)實(shí)現(xiàn)及與定時(shí)任務(wù)區(qū)別詳解的詳細(xì)內(nèi)容,更多關(guān)于Redis延時(shí)任務(wù)定時(shí)任務(wù)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
window下創(chuàng)建redis出現(xiàn)問題小結(jié)
這篇文章主要介紹了window下創(chuàng)建redis出現(xiàn)問題總結(jié),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-10-10redis反序列化報(bào)錯(cuò)原因分析以及解決方案
這篇文章主要介紹了redis反序列化報(bào)錯(cuò)原因分析以及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-03-03Redis系列之底層數(shù)據(jù)結(jié)構(gòu)SDS詳解
SDS(簡單動(dòng)態(tài)字符串)是Redis使用的核心數(shù)據(jù)結(jié)構(gòu),用于替代C語言的字符串,以解決長度獲取慢、內(nèi)存溢出等問題,SDS通過預(yù)分配與惰性釋放策略優(yōu)化內(nèi)存使用,增強(qiáng)安全性,且能存儲(chǔ)文本與二進(jìn)制數(shù)據(jù),可查看源碼src/sds.h和src/sds.c了解更多2024-11-11基于Redis的List實(shí)現(xiàn)特價(jià)商品列表功能
本文通過場景分析給大家介紹了基于Redis的List實(shí)現(xiàn)特價(jià)商品列表,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2021-08-08redis3.2配置文件redis.conf詳細(xì)說明
redis3.2配置詳解,Redis啟動(dòng)的時(shí)候,可以指定配置文件,詳細(xì)說明請看本文說明2018-03-03