Redis模擬延時(shí)隊(duì)列實(shí)現(xiàn)日程提醒的方法
使用Redis模擬延時(shí)隊(duì)列
實(shí)際上通過MQ實(shí)現(xiàn)延時(shí)隊(duì)列更加方便,只是在實(shí)際業(yè)務(wù)中種種原因?qū)е伦罱K選擇使用redis作為該業(yè)務(wù)實(shí)現(xiàn)的中間件,順便記錄一下。
該業(yè)務(wù)是用于日程短信提醒,用戶添加日程后,就會被放入redis隊(duì)列中等待被執(zhí)行發(fā)送短信提醒業(yè)務(wù)。
本文介紹如何使用Redis來實(shí)現(xiàn)一個簡單的延時(shí)任務(wù)隊(duì)列,通過這個示例,可以幫助你理解如何利用Redis的有序集合特性來管理和執(zhí)行延時(shí)任務(wù)。
設(shè)計(jì)思路
Redis有序集合(Sorted Set)可以很好地用來實(shí)現(xiàn)延時(shí)隊(duì)列的功能。通過將任務(wù)的執(zhí)行時(shí)間作為分?jǐn)?shù)(score)來存入有序集合中,并定期檢查集合中小于等于當(dāng)前時(shí)間的任務(wù)來觸發(fā)執(zhí)行。
代碼實(shí)現(xiàn)
JedisCluster連接初始化
首先,我們需要初始化JedisCluster連接來與Redis集群進(jìn)行交互。
private static final String ZSET_KEY = "sms_delayed_tasks"; private JedisCluster jedisCluster; public void RedisClusterScheduler() { Set<HostAndPort> nodes = new HashSet<>(); //從配置文件中讀取redis集群配置 for (String node : AcpCore.getProp("spring.redis.cluster.nodes").split(",")) { String[] hostPort = node.split(":"); nodes.add(new HostAndPort(hostPort[0], Integer.parseInt(hostPort[1]))); } GenericObjectPoolConfig<Jedis> poolConfig = new GenericObjectPoolConfig<>(); poolConfig.setMaxTotal(128); poolConfig.setMaxIdle(128); poolConfig.setMinIdle(16); jedisCluster = new JedisCluster(nodes, 2000, 2000, 5, AcpCore.getProp("spring.redis.password"), poolConfig); if (!isCalled) { isCalled = true; startTaskChecker(); } }
添加延時(shí)任務(wù)
我們可以通過指定任務(wù)和執(zhí)行時(shí)間來添加延時(shí)任務(wù)。該方法將執(zhí)行時(shí)間轉(zhuǎn)換為時(shí)間戳,并將任務(wù)存儲在Redis有序集合中。
public void addDelayedTask(String task, String time) { long executeTime = convertToTimestamp(time); if (executeTime > System.currentTimeMillis() / 1000) { jedisCluster.zadd(ZSET_KEY, executeTime, task); log.info("添加任務(wù): " + task + " 執(zhí)行時(shí)間: " + executeTime); } else { log.info("任務(wù)時(shí)間必須在當(dāng)前時(shí)間之后: " + task); } } private long convertToTimestamp(String time) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); try { return sdf.parse(time).getTime() / 1000; } catch (ParseException e) { e.printStackTrace(); return System.currentTimeMillis() / 1000; } }
檢查和執(zhí)行任務(wù)
通過一個定時(shí)任務(wù)不斷檢查當(dāng)前時(shí)間之前的任務(wù)并執(zhí)行。
private void startTaskChecker() { executorService.submit(() -> { while (!Thread.currentThread().isInterrupted()) { try { checkAndExecuteTasks(); Thread.sleep(1000); } catch (Exception e) { log.info(new Date() + "發(fā)生異常但不中斷,異常是:" + e); } } }); } private void checkAndExecuteTasks() { long currentTime = System.currentTimeMillis() / 1000; Set<String> tasks = jedisCluster.zrangeByScore(ZSET_KEY, 0, currentTime); for (String task : tasks) { jedisCluster.zrem(ZSET_KEY, task); executeTask(task); } }
執(zhí)行任務(wù)的邏輯
假設(shè)任務(wù)內(nèi)容是一個JSON對象,執(zhí)行邏輯在這里可以是任何操作,比如調(diào)用外部服務(wù)、發(fā)送消息等。
private void executeTask(String taskJson) { JSONObject task = JSONObject.parseObject(taskJson); // 在此處添加具體的業(yè)務(wù)邏輯 log.info("執(zhí)行任務(wù): " + task); }
總結(jié)
通過Redis的有序集合和簡單的定時(shí)器,能夠?qū)崿F(xiàn)一個簡潔有效的延時(shí)任務(wù)隊(duì)列。
當(dāng)然,這個示例是一個簡化的模型,在生產(chǎn)環(huán)境中,你需要考慮任務(wù)的冪等性、系統(tǒng)崩潰后的恢復(fù)策略、任務(wù)的優(yōu)先級等問題。希望本文能為你提供實(shí)現(xiàn)延時(shí)隊(duì)列的思路和參考。
到此這篇關(guān)于Redis模擬延時(shí)隊(duì)列 實(shí)現(xiàn)日程提醒的文章就介紹到這了,更多相關(guān)Redis延時(shí)隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
基于Redis?zSet實(shí)現(xiàn)滑動窗口對短信進(jìn)行防刷限流的問題
這篇文章主要介紹了基于Redis?zSet實(shí)現(xiàn)滑動窗口對短信進(jìn)行防刷限流,主要針對目前線上短信被腳本惡意盜刷的情況,用Redis實(shí)現(xiàn)滑動窗口限流,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友參考下吧2022-02-02redis通過lua腳本,獲取滿足key pattern的所有值方式
這篇文章主要介紹了redis通過lua腳本,獲取滿足key pattern的所有值方式,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-03-03