Redis中4種延時隊列實現(xiàn)方式小結(jié)
延時隊列是一種特殊的消息隊列,它允許消息在指定的時間后被消費。在微服務(wù)架構(gòu)、電商系統(tǒng)和任務(wù)調(diào)度場景中,延時隊列扮演著關(guān)鍵角色。例如,訂單超時自動取消、定時提醒、延時支付等都依賴延時隊列實現(xiàn)。
Redis作為高性能的內(nèi)存數(shù)據(jù)庫,具備原子操作、數(shù)據(jù)結(jié)構(gòu)豐富和簡單易用的特性,本文將介紹基于Redis實現(xiàn)分布式延時隊列的四種方式。
1. 基于Sorted Set的延時隊列
原理
利用Redis的Sorted Set(有序集合),將消息ID作為member,執(zhí)行時間戳作為score進(jìn)行存儲。通過ZRANGEBYSCORE命令可以獲取到達(dá)執(zhí)行時間的任務(wù)。
代碼實現(xiàn)
public class RedisZSetDelayQueue {
private final StringRedisTemplate redisTemplate;
private final String queueKey = "delay_queue:tasks";
public RedisZSetDelayQueue(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
/**
* 添加延時任務(wù)
* @param taskId 任務(wù)ID
* @param taskInfo 任務(wù)信息(JSON字符串)
* @param delayTime 延遲時間(秒)
*/
public void addTask(String taskId, String taskInfo, long delayTime) {
// 計算執(zhí)行時間
long executeTime = System.currentTimeMillis() + delayTime * 1000;
// 存儲任務(wù)詳情
redisTemplate.opsForHash().put("delay_queue:details", taskId, taskInfo);
// 添加到延時隊列
redisTemplate.opsForZSet().add(queueKey, taskId, executeTime);
System.out.println("Task added: " + taskId + ", will execute at: " + executeTime);
}
/**
* 輪詢獲取到期任務(wù)
*/
public List<String> pollTasks() {
long now = System.currentTimeMillis();
// 獲取當(dāng)前時間之前的任務(wù)
Set<String> taskIds = redisTemplate.opsForZSet()
.rangeByScore(queueKey, 0, now);
if (taskIds == null || taskIds.isEmpty()) {
return Collections.emptyList();
}
// 獲取任務(wù)詳情
List<String> tasks = new ArrayList<>();
for (String taskId : taskIds) {
String taskInfo = (String) redisTemplate.opsForHash()
.get("delay_queue:details", taskId);
if (taskInfo != null) {
tasks.add(taskInfo);
// 從集合和詳情中移除任務(wù)
redisTemplate.opsForZSet().remove(queueKey, taskId);
redisTemplate.opsForHash().delete("delay_queue:details", taskId);
}
}
return tasks;
}
// 定時任務(wù)示例
public void startTaskProcessor() {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
try {
List<String> tasks = pollTasks();
for (String task : tasks) {
processTask(task);
}
} catch (Exception e) {
e.printStackTrace();
}
}, 0, 1, TimeUnit.SECONDS);
}
private void processTask(String taskInfo) {
System.out.println("Processing task: " + taskInfo);
// 實際任務(wù)處理邏輯
}
}
優(yōu)缺點
優(yōu)點
- 實現(xiàn)簡單,易于理解
- 任務(wù)按執(zhí)行時間自動排序
- 支持精確的時間控制
缺點
- 需要輪詢獲取到期任務(wù),消耗CPU資源
- 大量任務(wù)情況下,
ZRANGEBYSCORE操作可能影響性能 - 沒有消費確認(rèn)機(jī)制,需要額外實現(xiàn)
2. 基于List + 定時輪詢的延時隊列
原理
這種方式使用多個List作為存儲容器,按延遲時間的不同將任務(wù)分配到不同的隊列中。通過定時輪詢各個隊列,將到期任務(wù)移動到一個立即執(zhí)行隊列。
代碼實現(xiàn)
public class RedisListDelayQueue {
private final StringRedisTemplate redisTemplate;
private final String readyQueueKey = "delay_queue:ready"; // 待處理隊列
private final Map<Integer, String> delayQueueKeys; // 延遲隊列,按延時時間分級
public RedisListDelayQueue(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
// 初始化不同延遲級別的隊列
delayQueueKeys = new HashMap<>();
delayQueueKeys.put(5, "delay_queue:delay_5s"); // 5秒
delayQueueKeys.put(60, "delay_queue:delay_1m"); // 1分鐘
delayQueueKeys.put(300, "delay_queue:delay_5m"); // 5分鐘
delayQueueKeys.put(1800, "delay_queue:delay_30m"); // 30分鐘
}
/**
* 添加延時任務(wù)
*/
public void addTask(String taskInfo, int delaySeconds) {
// 選擇合適的延遲隊列
String queueKey = selectDelayQueue(delaySeconds);
// 任務(wù)元數(shù)據(jù),包含任務(wù)信息和執(zhí)行時間
long executeTime = System.currentTimeMillis() + delaySeconds * 1000;
String taskData = executeTime + ":" + taskInfo;
// 添加到延遲隊列
redisTemplate.opsForList().rightPush(queueKey, taskData);
System.out.println("Task added to " + queueKey + ": " + taskData);
}
/**
* 選擇合適的延遲隊列
*/
private String selectDelayQueue(int delaySeconds) {
// 找到最接近的延遲級別
int closestDelay = delayQueueKeys.keySet().stream()
.filter(delay -> delay >= delaySeconds)
.min(Integer::compareTo)
.orElse(Collections.max(delayQueueKeys.keySet()));
return delayQueueKeys.get(closestDelay);
}
/**
* 移動到期任務(wù)到待處理隊列
*/
public void moveTasksToReadyQueue() {
long now = System.currentTimeMillis();
// 遍歷所有延遲隊列
for (String queueKey : delayQueueKeys.values()) {
boolean hasMoreTasks = true;
while (hasMoreTasks) {
// 查看隊列頭部任務(wù)
String taskData = redisTemplate.opsForList().index(queueKey, 0);
if (taskData == null) {
hasMoreTasks = false;
continue;
}
// 解析任務(wù)執(zhí)行時間
long executeTime = Long.parseLong(taskData.split(":", 2)[0]);
// 檢查是否到期
if (executeTime <= now) {
// 通過LPOP原子性地移除隊列頭部任務(wù)
String task = redisTemplate.opsForList().leftPop(queueKey);
// 任務(wù)可能被其他進(jìn)程處理,再次檢查
if (task != null) {
// 提取任務(wù)信息并添加到待處理隊列
String taskInfo = task.split(":", 2)[1];
redisTemplate.opsForList().rightPush(readyQueueKey, taskInfo);
System.out.println("Task moved to ready queue: " + taskInfo);
}
} else {
// 隊列頭部任務(wù)未到期,無需檢查后面的任務(wù)
hasMoreTasks = false;
}
}
}
}
/**
* 獲取待處理任務(wù)
*/
public String getReadyTask() {
return redisTemplate.opsForList().leftPop(readyQueueKey);
}
/**
* 啟動任務(wù)處理器
*/
public void startTaskProcessors() {
// 定時移動到期任務(wù)
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
// 移動任務(wù)線程
scheduler.scheduleAtFixedRate(this::moveTasksToReadyQueue, 0, 1, TimeUnit.SECONDS);
// 處理任務(wù)線程
scheduler.scheduleAtFixedRate(() -> {
String task = getReadyTask();
if (task != null) {
processTask(task);
}
}, 0, 100, TimeUnit.MILLISECONDS);
}
private void processTask(String taskInfo) {
System.out.println("Processing task: " + taskInfo);
// 實際任務(wù)處理邏輯
}
}
優(yōu)缺點
優(yōu)點
- 分級隊列設(shè)計,降低單隊列壓力
- 相比Sorted Set占用內(nèi)存少
- 支持隊列監(jiān)控和任務(wù)優(yōu)先級
缺點
- 延遲時間精度受輪詢頻率影響
- 實現(xiàn)復(fù)雜度高
- 需要維護(hù)多個隊列
- 時間判斷和隊列操作非原子性,需特別處理并發(fā)問題
3. 基于發(fā)布/訂閱(Pub/Sub)的延時隊列
原理
結(jié)合Redis發(fā)布/訂閱功能與本地時間輪算法,實現(xiàn)延遲任務(wù)的分發(fā)和處理。任務(wù)信息存儲在Redis中,而時間輪負(fù)責(zé)任務(wù)的調(diào)度和發(fā)布。
代碼實現(xiàn)
public class RedisPubSubDelayQueue {
private final StringRedisTemplate redisTemplate;
private final String TASK_TOPIC = "delay_queue:task_channel";
private final String TASK_HASH = "delay_queue:tasks";
private final HashedWheelTimer timer;
public RedisPubSubDelayQueue(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
// 初始化時間輪,刻度100ms,輪子大小512
this.timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);
// 啟動消息訂閱
subscribeTaskChannel();
}
/**
* 添加延時任務(wù)
*/
public void addTask(String taskId, String taskInfo, long delaySeconds) {
// 存儲任務(wù)信息到Redis
redisTemplate.opsForHash().put(TASK_HASH, taskId, taskInfo);
// 添加到時間輪
timer.newTimeout(timeout -> {
// 發(fā)布任務(wù)就緒消息
redisTemplate.convertAndSend(TASK_TOPIC, taskId);
}, delaySeconds, TimeUnit.SECONDS);
System.out.println("Task scheduled: " + taskId + ", delay: " + delaySeconds + "s");
}
/**
* 訂閱任務(wù)通道
*/
private void subscribeTaskChannel() {
redisTemplate.getConnectionFactory().getConnection().subscribe(
(message, pattern) -> {
String taskId = new String(message.getBody());
// 獲取任務(wù)信息
String taskInfo = (String) redisTemplate.opsForHash().get(TASK_HASH, taskId);
if (taskInfo != null) {
// 處理任務(wù)
processTask(taskId, taskInfo);
// 刪除任務(wù)
redisTemplate.opsForHash().delete(TASK_HASH, taskId);
}
},
TASK_TOPIC.getBytes()
);
}
private void processTask(String taskId, String taskInfo) {
System.out.println("Processing task: " + taskId + " - " + taskInfo);
// 實際任務(wù)處理邏輯
}
// 模擬HashedWheelTimer類
public static class HashedWheelTimer {
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private final long tickDuration;
private final TimeUnit unit;
public HashedWheelTimer(long tickDuration, TimeUnit unit, int wheelSize) {
this.tickDuration = tickDuration;
this.unit = unit;
}
public void newTimeout(TimerTask task, long delay, TimeUnit timeUnit) {
long delayMillis = timeUnit.toMillis(delay);
scheduler.schedule(
() -> task.run(null),
delayMillis,
TimeUnit.MILLISECONDS
);
}
public interface TimerTask {
void run(Timeout timeout);
}
public interface Timeout {
}
}
}
優(yōu)缺點
優(yōu)點:
- 即時觸發(fā),無需輪詢
- 高效的時間輪算法
- 可以跨應(yīng)用訂閱任務(wù)
- 分離任務(wù)調(diào)度和執(zhí)行,降低耦合
缺點:
- 依賴本地時間輪,非純Redis實現(xiàn)
- Pub/Sub模式無消息持久化,可能丟失消息
- 服務(wù)重啟時需要重建時間輪
- 訂閱者需要保持連接
4. 基于Redis Stream的延時隊列
原理
Redis 5.0引入的Stream是一個強(qiáng)大的數(shù)據(jù)結(jié)構(gòu),專為消息隊列設(shè)計。結(jié)合Stream的消費組和確認(rèn)機(jī)制,可以構(gòu)建可靠的延時隊列。
代碼實現(xiàn)
public class RedisStreamDelayQueue {
private final StringRedisTemplate redisTemplate;
private final String delayQueueKey = "delay_queue:stream";
private final String consumerGroup = "delay_queue_consumers";
private final String consumerId = UUID.randomUUID().toString();
public RedisStreamDelayQueue(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
// 創(chuàng)建消費者組
try {
redisTemplate.execute((RedisCallback<String>) connection -> {
connection.streamCommands().xGroupCreate(
delayQueueKey.getBytes(),
consumerGroup,
ReadOffset.from("0"),
true
);
return "OK";
});
} catch (Exception e) {
// 消費者組可能已存在
System.out.println("Consumer group may already exist: " + e.getMessage());
}
}
/**
* 添加延時任務(wù)
*/
public void addTask(String taskInfo, long delaySeconds) {
long executeTime = System.currentTimeMillis() + delaySeconds * 1000;
Map<String, Object> task = new HashMap<>();
task.put("executeTime", String.valueOf(executeTime));
task.put("taskInfo", taskInfo);
redisTemplate.opsForStream().add(delayQueueKey, task);
System.out.println("Task added: " + taskInfo + ", execute at: " + executeTime);
}
/**
* 獲取待執(zhí)行的任務(wù)
*/
public List<String> pollTasks() {
long now = System.currentTimeMillis();
List<String> readyTasks = new ArrayList<>();
// 讀取尚未處理的消息
List<MapRecord<String, Object, Object>> records = redisTemplate.execute(
(RedisCallback<List<MapRecord<String, Object, Object>>>) connection -> {
return connection.streamCommands().xReadGroup(
consumerGroup.getBytes(),
consumerId.getBytes(),
StreamReadOptions.empty().count(10),
StreamOffset.create(delayQueueKey.getBytes(), ReadOffset.from(">"))
);
}
);
if (records != null) {
for (MapRecord<String, Object, Object> record : records) {
String messageId = record.getId().getValue();
Map<Object, Object> value = record.getValue();
long executeTime = Long.parseLong((String) value.get("executeTime"));
String taskInfo = (String) value.get("taskInfo");
// 檢查任務(wù)是否到期
if (executeTime <= now) {
readyTasks.add(taskInfo);
// 確認(rèn)消息已處理
redisTemplate.execute((RedisCallback<String>) connection -> {
connection.streamCommands().xAck(
delayQueueKey.getBytes(),
consumerGroup.getBytes(),
messageId.getBytes()
);
return "OK";
});
// 可選:從流中刪除消息
redisTemplate.opsForStream().delete(delayQueueKey, messageId);
} else {
// 任務(wù)未到期,放回隊列
redisTemplate.execute((RedisCallback<String>) connection -> {
connection.streamCommands().xAck(
delayQueueKey.getBytes(),
consumerGroup.getBytes(),
messageId.getBytes()
);
return "OK";
});
// 重新添加任務(wù)(可選:使用延遲重新入隊策略)
Map<String, Object> newTask = new HashMap<>();
newTask.put("executeTime", String.valueOf(executeTime));
newTask.put("taskInfo", taskInfo);
redisTemplate.opsForStream().add(delayQueueKey, newTask);
}
}
}
return readyTasks;
}
/**
* 啟動任務(wù)處理器
*/
public void startTaskProcessor() {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
try {
List<String> tasks = pollTasks();
for (String task : tasks) {
processTask(task);
}
} catch (Exception e) {
e.printStackTrace();
}
}, 0, 1, TimeUnit.SECONDS);
}
private void processTask(String taskInfo) {
System.out.println("Processing task: " + taskInfo);
// 實際任務(wù)處理邏輯
}
}
優(yōu)缺點
優(yōu)點:
- 支持消費者組和消息確認(rèn),提供可靠的消息處理
- 內(nèi)置消息持久化機(jī)制
- 支持多消費者并行處理
- 消息ID包含時間戳,便于排序
缺點:
- 要求Redis 5.0+版本
- 實現(xiàn)相對復(fù)雜
- 仍需輪詢獲取到期任務(wù)
- 對未到期任務(wù)的處理相對繁瑣
性能對比與選型建議
| 實現(xiàn)方式 | 性能 | 可靠性 | 實現(xiàn)復(fù)雜度 | 內(nèi)存占用 | 適用場景 |
|---|---|---|---|---|---|
| Sorted Set | ★★★★☆ | ★★★☆☆ | 低 | 中 | 任務(wù)量適中,需要精確調(diào)度 |
| List + 輪詢 | ★★★★★ | ★★★☆☆ | 中 | 低 | 高并發(fā),延時精度要求不高 |
| Pub/Sub + 時間輪 | ★★★★★ | ★★☆☆☆ | 高 | 低 | 實時性要求高,可容忍服務(wù)重啟丟失 |
| Stream | ★★★☆☆ | ★★★★★ | 高 | 中 | 可靠性要求高,需要消息確認(rèn) |
總結(jié)
在實際應(yīng)用中,可根據(jù)系統(tǒng)規(guī)模、性能需求、可靠性要求和實現(xiàn)復(fù)雜度等因素進(jìn)行選擇,也可以組合多種方式打造更符合業(yè)務(wù)需求的延時隊列解決方案。無論選擇哪種實現(xiàn),都應(yīng)關(guān)注可靠性、性能和監(jiān)控等方面,確保延時隊列在生產(chǎn)環(huán)境中穩(wěn)定運行。
到此這篇關(guān)于Redis中4種延時隊列實現(xiàn)方式小結(jié)的文章就介紹到這了,更多相關(guān)Redis延時隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Boot如何使用Undertow代替Tomcat
這篇文章主要介紹了Spring Boot如何使用Undertow代替Tomcat,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-09-09
Java 獲取當(dāng)前類名和方法名的實現(xiàn)方法
這篇文章主要介紹了 Java 獲取當(dāng)前類名和方法名的實現(xiàn)方法的相關(guān)資料,這里不僅提供了實現(xiàn)方法并比較幾種方法的效率,需要的朋友可以參考下2017-07-07
SQL Server 2000 Driver for JDBC Service Pack 3 安裝測試方法
這篇文章主要介紹了數(shù)據(jù)庫連接測試程序(SQL Server 2000 Driver for JDBC Service Pack 3 安裝測試),需要的朋友可以參考下2014-10-10
Java synchronized重量級鎖實現(xiàn)過程淺析
這篇文章主要介紹了Java synchronized重量級鎖實現(xiàn)過程,synchronized是Java里的一個關(guān)鍵字,起到的一個效果是"監(jiān)視器鎖",它的功能就是保證操作的原子性,同時禁止指令重排序和保證內(nèi)存的可見性2023-02-02
java狀態(tài)機(jī)方案解決訂單狀態(tài)扭轉(zhuǎn)示例詳解
這篇文章主要為大家介紹了java狀態(tài)機(jī)方案解決訂單狀態(tài)扭轉(zhuǎn)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-03-03
Hibernate基于ThreadLocal管理Session過程解析
這篇文章主要介紹了Hibernate基于ThreadLocal管理Session過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-10-10

