Redis中4種延時(shí)隊(duì)列實(shí)現(xiàn)方式小結(jié)
延時(shí)隊(duì)列是一種特殊的消息隊(duì)列,它允許消息在指定的時(shí)間后被消費(fèi)。在微服務(wù)架構(gòu)、電商系統(tǒng)和任務(wù)調(diào)度場景中,延時(shí)隊(duì)列扮演著關(guān)鍵角色。例如,訂單超時(shí)自動(dòng)取消、定時(shí)提醒、延時(shí)支付等都依賴延時(shí)隊(duì)列實(shí)現(xiàn)。
Redis作為高性能的內(nèi)存數(shù)據(jù)庫,具備原子操作、數(shù)據(jù)結(jié)構(gòu)豐富和簡單易用的特性,本文將介紹基于Redis實(shí)現(xiàn)分布式延時(shí)隊(duì)列的四種方式。
1. 基于Sorted Set的延時(shí)隊(duì)列
原理
利用Redis的Sorted Set(有序集合),將消息ID作為member,執(zhí)行時(shí)間戳作為score進(jìn)行存儲(chǔ)。通過ZRANGEBYSCORE
命令可以獲取到達(dá)執(zhí)行時(shí)間的任務(wù)。
代碼實(shí)現(xiàn)
public class RedisZSetDelayQueue { private final StringRedisTemplate redisTemplate; private final String queueKey = "delay_queue:tasks"; public RedisZSetDelayQueue(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } /** * 添加延時(shí)任務(wù) * @param taskId 任務(wù)ID * @param taskInfo 任務(wù)信息(JSON字符串) * @param delayTime 延遲時(shí)間(秒) */ public void addTask(String taskId, String taskInfo, long delayTime) { // 計(jì)算執(zhí)行時(shí)間 long executeTime = System.currentTimeMillis() + delayTime * 1000; // 存儲(chǔ)任務(wù)詳情 redisTemplate.opsForHash().put("delay_queue:details", taskId, taskInfo); // 添加到延時(shí)隊(duì)列 redisTemplate.opsForZSet().add(queueKey, taskId, executeTime); System.out.println("Task added: " + taskId + ", will execute at: " + executeTime); } /** * 輪詢獲取到期任務(wù) */ public List<String> pollTasks() { long now = System.currentTimeMillis(); // 獲取當(dāng)前時(shí)間之前的任務(wù) Set<String> taskIds = redisTemplate.opsForZSet() .rangeByScore(queueKey, 0, now); if (taskIds == null || taskIds.isEmpty()) { return Collections.emptyList(); } // 獲取任務(wù)詳情 List<String> tasks = new ArrayList<>(); for (String taskId : taskIds) { String taskInfo = (String) redisTemplate.opsForHash() .get("delay_queue:details", taskId); if (taskInfo != null) { tasks.add(taskInfo); // 從集合和詳情中移除任務(wù) redisTemplate.opsForZSet().remove(queueKey, taskId); redisTemplate.opsForHash().delete("delay_queue:details", taskId); } } return tasks; } // 定時(shí)任務(wù)示例 public void startTaskProcessor() { ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(() -> { try { List<String> tasks = pollTasks(); for (String task : tasks) { processTask(task); } } catch (Exception e) { e.printStackTrace(); } }, 0, 1, TimeUnit.SECONDS); } private void processTask(String taskInfo) { System.out.println("Processing task: " + taskInfo); // 實(shí)際任務(wù)處理邏輯 } }
優(yōu)缺點(diǎn)
優(yōu)點(diǎn)
- 實(shí)現(xiàn)簡單,易于理解
- 任務(wù)按執(zhí)行時(shí)間自動(dòng)排序
- 支持精確的時(shí)間控制
缺點(diǎn)
- 需要輪詢獲取到期任務(wù),消耗CPU資源
- 大量任務(wù)情況下,
ZRANGEBYSCORE
操作可能影響性能 - 沒有消費(fèi)確認(rèn)機(jī)制,需要額外實(shí)現(xiàn)
2. 基于List + 定時(shí)輪詢的延時(shí)隊(duì)列
原理
這種方式使用多個(gè)List作為存儲(chǔ)容器,按延遲時(shí)間的不同將任務(wù)分配到不同的隊(duì)列中。通過定時(shí)輪詢各個(gè)隊(duì)列,將到期任務(wù)移動(dòng)到一個(gè)立即執(zhí)行隊(duì)列。
代碼實(shí)現(xiàn)
public class RedisListDelayQueue { private final StringRedisTemplate redisTemplate; private final String readyQueueKey = "delay_queue:ready"; // 待處理隊(duì)列 private final Map<Integer, String> delayQueueKeys; // 延遲隊(duì)列,按延時(shí)時(shí)間分級(jí) public RedisListDelayQueue(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; // 初始化不同延遲級(jí)別的隊(duì)列 delayQueueKeys = new HashMap<>(); delayQueueKeys.put(5, "delay_queue:delay_5s"); // 5秒 delayQueueKeys.put(60, "delay_queue:delay_1m"); // 1分鐘 delayQueueKeys.put(300, "delay_queue:delay_5m"); // 5分鐘 delayQueueKeys.put(1800, "delay_queue:delay_30m"); // 30分鐘 } /** * 添加延時(shí)任務(wù) */ public void addTask(String taskInfo, int delaySeconds) { // 選擇合適的延遲隊(duì)列 String queueKey = selectDelayQueue(delaySeconds); // 任務(wù)元數(shù)據(jù),包含任務(wù)信息和執(zhí)行時(shí)間 long executeTime = System.currentTimeMillis() + delaySeconds * 1000; String taskData = executeTime + ":" + taskInfo; // 添加到延遲隊(duì)列 redisTemplate.opsForList().rightPush(queueKey, taskData); System.out.println("Task added to " + queueKey + ": " + taskData); } /** * 選擇合適的延遲隊(duì)列 */ private String selectDelayQueue(int delaySeconds) { // 找到最接近的延遲級(jí)別 int closestDelay = delayQueueKeys.keySet().stream() .filter(delay -> delay >= delaySeconds) .min(Integer::compareTo) .orElse(Collections.max(delayQueueKeys.keySet())); return delayQueueKeys.get(closestDelay); } /** * 移動(dòng)到期任務(wù)到待處理隊(duì)列 */ public void moveTasksToReadyQueue() { long now = System.currentTimeMillis(); // 遍歷所有延遲隊(duì)列 for (String queueKey : delayQueueKeys.values()) { boolean hasMoreTasks = true; while (hasMoreTasks) { // 查看隊(duì)列頭部任務(wù) String taskData = redisTemplate.opsForList().index(queueKey, 0); if (taskData == null) { hasMoreTasks = false; continue; } // 解析任務(wù)執(zhí)行時(shí)間 long executeTime = Long.parseLong(taskData.split(":", 2)[0]); // 檢查是否到期 if (executeTime <= now) { // 通過LPOP原子性地移除隊(duì)列頭部任務(wù) String task = redisTemplate.opsForList().leftPop(queueKey); // 任務(wù)可能被其他進(jìn)程處理,再次檢查 if (task != null) { // 提取任務(wù)信息并添加到待處理隊(duì)列 String taskInfo = task.split(":", 2)[1]; redisTemplate.opsForList().rightPush(readyQueueKey, taskInfo); System.out.println("Task moved to ready queue: " + taskInfo); } } else { // 隊(duì)列頭部任務(wù)未到期,無需檢查后面的任務(wù) hasMoreTasks = false; } } } } /** * 獲取待處理任務(wù) */ public String getReadyTask() { return redisTemplate.opsForList().leftPop(readyQueueKey); } /** * 啟動(dòng)任務(wù)處理器 */ public void startTaskProcessors() { // 定時(shí)移動(dòng)到期任務(wù) ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); // 移動(dòng)任務(wù)線程 scheduler.scheduleAtFixedRate(this::moveTasksToReadyQueue, 0, 1, TimeUnit.SECONDS); // 處理任務(wù)線程 scheduler.scheduleAtFixedRate(() -> { String task = getReadyTask(); if (task != null) { processTask(task); } }, 0, 100, TimeUnit.MILLISECONDS); } private void processTask(String taskInfo) { System.out.println("Processing task: " + taskInfo); // 實(shí)際任務(wù)處理邏輯 } }
優(yōu)缺點(diǎn)
優(yōu)點(diǎn)
- 分級(jí)隊(duì)列設(shè)計(jì),降低單隊(duì)列壓力
- 相比Sorted Set占用內(nèi)存少
- 支持隊(duì)列監(jiān)控和任務(wù)優(yōu)先級(jí)
缺點(diǎn)
- 延遲時(shí)間精度受輪詢頻率影響
- 實(shí)現(xiàn)復(fù)雜度高
- 需要維護(hù)多個(gè)隊(duì)列
- 時(shí)間判斷和隊(duì)列操作非原子性,需特別處理并發(fā)問題
3. 基于發(fā)布/訂閱(Pub/Sub)的延時(shí)隊(duì)列
原理
結(jié)合Redis發(fā)布/訂閱功能與本地時(shí)間輪算法,實(shí)現(xiàn)延遲任務(wù)的分發(fā)和處理。任務(wù)信息存儲(chǔ)在Redis中,而時(shí)間輪負(fù)責(zé)任務(wù)的調(diào)度和發(fā)布。
代碼實(shí)現(xiàn)
public class RedisPubSubDelayQueue { private final StringRedisTemplate redisTemplate; private final String TASK_TOPIC = "delay_queue:task_channel"; private final String TASK_HASH = "delay_queue:tasks"; private final HashedWheelTimer timer; public RedisPubSubDelayQueue(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; // 初始化時(shí)間輪,刻度100ms,輪子大小512 this.timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512); // 啟動(dòng)消息訂閱 subscribeTaskChannel(); } /** * 添加延時(shí)任務(wù) */ public void addTask(String taskId, String taskInfo, long delaySeconds) { // 存儲(chǔ)任務(wù)信息到Redis redisTemplate.opsForHash().put(TASK_HASH, taskId, taskInfo); // 添加到時(shí)間輪 timer.newTimeout(timeout -> { // 發(fā)布任務(wù)就緒消息 redisTemplate.convertAndSend(TASK_TOPIC, taskId); }, delaySeconds, TimeUnit.SECONDS); System.out.println("Task scheduled: " + taskId + ", delay: " + delaySeconds + "s"); } /** * 訂閱任務(wù)通道 */ private void subscribeTaskChannel() { redisTemplate.getConnectionFactory().getConnection().subscribe( (message, pattern) -> { String taskId = new String(message.getBody()); // 獲取任務(wù)信息 String taskInfo = (String) redisTemplate.opsForHash().get(TASK_HASH, taskId); if (taskInfo != null) { // 處理任務(wù) processTask(taskId, taskInfo); // 刪除任務(wù) redisTemplate.opsForHash().delete(TASK_HASH, taskId); } }, TASK_TOPIC.getBytes() ); } private void processTask(String taskId, String taskInfo) { System.out.println("Processing task: " + taskId + " - " + taskInfo); // 實(shí)際任務(wù)處理邏輯 } // 模擬HashedWheelTimer類 public static class HashedWheelTimer { private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private final long tickDuration; private final TimeUnit unit; public HashedWheelTimer(long tickDuration, TimeUnit unit, int wheelSize) { this.tickDuration = tickDuration; this.unit = unit; } public void newTimeout(TimerTask task, long delay, TimeUnit timeUnit) { long delayMillis = timeUnit.toMillis(delay); scheduler.schedule( () -> task.run(null), delayMillis, TimeUnit.MILLISECONDS ); } public interface TimerTask { void run(Timeout timeout); } public interface Timeout { } } }
優(yōu)缺點(diǎn)
優(yōu)點(diǎn):
- 即時(shí)觸發(fā),無需輪詢
- 高效的時(shí)間輪算法
- 可以跨應(yīng)用訂閱任務(wù)
- 分離任務(wù)調(diào)度和執(zhí)行,降低耦合
缺點(diǎn):
- 依賴本地時(shí)間輪,非純Redis實(shí)現(xiàn)
- Pub/Sub模式無消息持久化,可能丟失消息
- 服務(wù)重啟時(shí)需要重建時(shí)間輪
- 訂閱者需要保持連接
4. 基于Redis Stream的延時(shí)隊(duì)列
原理
Redis 5.0引入的Stream是一個(gè)強(qiáng)大的數(shù)據(jù)結(jié)構(gòu),專為消息隊(duì)列設(shè)計(jì)。結(jié)合Stream的消費(fèi)組和確認(rèn)機(jī)制,可以構(gòu)建可靠的延時(shí)隊(duì)列。
代碼實(shí)現(xiàn)
public class RedisStreamDelayQueue { private final StringRedisTemplate redisTemplate; private final String delayQueueKey = "delay_queue:stream"; private final String consumerGroup = "delay_queue_consumers"; private final String consumerId = UUID.randomUUID().toString(); public RedisStreamDelayQueue(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; // 創(chuàng)建消費(fèi)者組 try { redisTemplate.execute((RedisCallback<String>) connection -> { connection.streamCommands().xGroupCreate( delayQueueKey.getBytes(), consumerGroup, ReadOffset.from("0"), true ); return "OK"; }); } catch (Exception e) { // 消費(fèi)者組可能已存在 System.out.println("Consumer group may already exist: " + e.getMessage()); } } /** * 添加延時(shí)任務(wù) */ public void addTask(String taskInfo, long delaySeconds) { long executeTime = System.currentTimeMillis() + delaySeconds * 1000; Map<String, Object> task = new HashMap<>(); task.put("executeTime", String.valueOf(executeTime)); task.put("taskInfo", taskInfo); redisTemplate.opsForStream().add(delayQueueKey, task); System.out.println("Task added: " + taskInfo + ", execute at: " + executeTime); } /** * 獲取待執(zhí)行的任務(wù) */ public List<String> pollTasks() { long now = System.currentTimeMillis(); List<String> readyTasks = new ArrayList<>(); // 讀取尚未處理的消息 List<MapRecord<String, Object, Object>> records = redisTemplate.execute( (RedisCallback<List<MapRecord<String, Object, Object>>>) connection -> { return connection.streamCommands().xReadGroup( consumerGroup.getBytes(), consumerId.getBytes(), StreamReadOptions.empty().count(10), StreamOffset.create(delayQueueKey.getBytes(), ReadOffset.from(">")) ); } ); if (records != null) { for (MapRecord<String, Object, Object> record : records) { String messageId = record.getId().getValue(); Map<Object, Object> value = record.getValue(); long executeTime = Long.parseLong((String) value.get("executeTime")); String taskInfo = (String) value.get("taskInfo"); // 檢查任務(wù)是否到期 if (executeTime <= now) { readyTasks.add(taskInfo); // 確認(rèn)消息已處理 redisTemplate.execute((RedisCallback<String>) connection -> { connection.streamCommands().xAck( delayQueueKey.getBytes(), consumerGroup.getBytes(), messageId.getBytes() ); return "OK"; }); // 可選:從流中刪除消息 redisTemplate.opsForStream().delete(delayQueueKey, messageId); } else { // 任務(wù)未到期,放回隊(duì)列 redisTemplate.execute((RedisCallback<String>) connection -> { connection.streamCommands().xAck( delayQueueKey.getBytes(), consumerGroup.getBytes(), messageId.getBytes() ); return "OK"; }); // 重新添加任務(wù)(可選:使用延遲重新入隊(duì)策略) Map<String, Object> newTask = new HashMap<>(); newTask.put("executeTime", String.valueOf(executeTime)); newTask.put("taskInfo", taskInfo); redisTemplate.opsForStream().add(delayQueueKey, newTask); } } } return readyTasks; } /** * 啟動(dòng)任務(wù)處理器 */ public void startTaskProcessor() { ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(() -> { try { List<String> tasks = pollTasks(); for (String task : tasks) { processTask(task); } } catch (Exception e) { e.printStackTrace(); } }, 0, 1, TimeUnit.SECONDS); } private void processTask(String taskInfo) { System.out.println("Processing task: " + taskInfo); // 實(shí)際任務(wù)處理邏輯 } }
優(yōu)缺點(diǎn)
優(yōu)點(diǎn):
- 支持消費(fèi)者組和消息確認(rèn),提供可靠的消息處理
- 內(nèi)置消息持久化機(jī)制
- 支持多消費(fèi)者并行處理
- 消息ID包含時(shí)間戳,便于排序
缺點(diǎn):
- 要求Redis 5.0+版本
- 實(shí)現(xiàn)相對(duì)復(fù)雜
- 仍需輪詢獲取到期任務(wù)
- 對(duì)未到期任務(wù)的處理相對(duì)繁瑣
性能對(duì)比與選型建議
實(shí)現(xiàn)方式 | 性能 | 可靠性 | 實(shí)現(xiàn)復(fù)雜度 | 內(nèi)存占用 | 適用場景 |
---|---|---|---|---|---|
Sorted Set | ★★★★☆ | ★★★☆☆ | 低 | 中 | 任務(wù)量適中,需要精確調(diào)度 |
List + 輪詢 | ★★★★★ | ★★★☆☆ | 中 | 低 | 高并發(fā),延時(shí)精度要求不高 |
Pub/Sub + 時(shí)間輪 | ★★★★★ | ★★☆☆☆ | 高 | 低 | 實(shí)時(shí)性要求高,可容忍服務(wù)重啟丟失 |
Stream | ★★★☆☆ | ★★★★★ | 高 | 中 | 可靠性要求高,需要消息確認(rèn) |
總結(jié)
在實(shí)際應(yīng)用中,可根據(jù)系統(tǒng)規(guī)模、性能需求、可靠性要求和實(shí)現(xiàn)復(fù)雜度等因素進(jìn)行選擇,也可以組合多種方式打造更符合業(yè)務(wù)需求的延時(shí)隊(duì)列解決方案。無論選擇哪種實(shí)現(xiàn),都應(yīng)關(guān)注可靠性、性能和監(jiān)控等方面,確保延時(shí)隊(duì)列在生產(chǎn)環(huán)境中穩(wěn)定運(yùn)行。
到此這篇關(guān)于Redis中4種延時(shí)隊(duì)列實(shí)現(xiàn)方式小結(jié)的文章就介紹到這了,更多相關(guān)Redis延時(shí)隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- redis延時(shí)隊(duì)列zset實(shí)現(xiàn)的示例
- redis和rabbitmq實(shí)現(xiàn)延時(shí)隊(duì)列的示例代碼
- Redis簡易延時(shí)隊(duì)列的實(shí)現(xiàn)示例
- Redis消息隊(duì)列、阻塞隊(duì)列、延時(shí)隊(duì)列的實(shí)現(xiàn)
- redis使用zset實(shí)現(xiàn)延時(shí)隊(duì)列的示例代碼
- 生產(chǎn)redisson延時(shí)隊(duì)列不消費(fèi)問題排查解決
- redis實(shí)現(xiàn)延時(shí)隊(duì)列的兩種方式(小結(jié))
相關(guān)文章
Spring Boot如何使用Undertow代替Tomcat
這篇文章主要介紹了Spring Boot如何使用Undertow代替Tomcat,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-09-09Java 獲取當(dāng)前類名和方法名的實(shí)現(xiàn)方法
這篇文章主要介紹了 Java 獲取當(dāng)前類名和方法名的實(shí)現(xiàn)方法的相關(guān)資料,這里不僅提供了實(shí)現(xiàn)方法并比較幾種方法的效率,需要的朋友可以參考下2017-07-07java讀取文件顯示進(jìn)度條的實(shí)現(xiàn)方法
當(dāng)讀取一個(gè)大文件時(shí),一時(shí)半會(huì)兒無法看到讀取結(jié)果,就需要顯示一個(gè)進(jìn)度條,是程序員明白已經(jīng)讀了多少文件,可以估算讀取還需要多少時(shí)間,下面的代碼可以實(shí)現(xiàn)這個(gè)功能2014-01-01SQL Server 2000 Driver for JDBC Service Pack 3 安裝測試方法
這篇文章主要介紹了數(shù)據(jù)庫連接測試程序(SQL Server 2000 Driver for JDBC Service Pack 3 安裝測試),需要的朋友可以參考下2014-10-10Java synchronized重量級(jí)鎖實(shí)現(xiàn)過程淺析
這篇文章主要介紹了Java synchronized重量級(jí)鎖實(shí)現(xiàn)過程,synchronized是Java里的一個(gè)關(guān)鍵字,起到的一個(gè)效果是"監(jiān)視器鎖",它的功能就是保證操作的原子性,同時(shí)禁止指令重排序和保證內(nèi)存的可見性2023-02-02java狀態(tài)機(jī)方案解決訂單狀態(tài)扭轉(zhuǎn)示例詳解
這篇文章主要為大家介紹了java狀態(tài)機(jī)方案解決訂單狀態(tài)扭轉(zhuǎn)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-03-03Java中鎖的實(shí)現(xiàn)和內(nèi)存語義淺析
這篇文章主要給大家介紹了關(guān)于Java中鎖的實(shí)現(xiàn)和內(nèi)存語義的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2018-11-11Hibernate基于ThreadLocal管理Session過程解析
這篇文章主要介紹了Hibernate基于ThreadLocal管理Session過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-10-10