Java實現(xiàn)DelayQueue延遲隊列示例代碼
JavaDelayQueue延遲隊列
1.DelayQueue概述
DelayQueue 是 Java 并發(fā)包(java.util.concurrent)中的一個 無界 阻塞隊列,用于存儲實現(xiàn)了 Delayed 接口的元素。隊列中的元素只有在達(dá)到指定的延遲時間后才能被獲取。
2.DelayQueue的底層數(shù)據(jù)結(jié)構(gòu)
DelayQueue 的底層數(shù)據(jù)結(jié)構(gòu)是 優(yōu)先級隊列(PriorityQueue),它是一個小頂堆(最小堆),根據(jù)元素的過期時間進(jìn)行排序。
- 底層采用 PriorityQueue(基于堆的實現(xiàn))
- 按照到期時間升序排列,即最早過期的元素在堆頂
- 元素未過期時,take() 方法會阻塞
- 支持多線程并發(fā)訪問
3.DelayQueue的實現(xiàn)原理
元素需實現(xiàn)
Delayed接口,重寫getDelay()方法,返回剩余的延遲時間。DelayQueue內(nèi)部維護(hù)一個PriorityQueue<Delayed>。插入元素時,按照到期時間排序,最早到期的元素位于堆頂。
take()
方法獲取堆頂元素:
- 若到期,直接返回該元素。
- 若未到期,線程阻塞,直到該元素可用。
- 使用鎖 + 條件變量(
ReentrantLock+Condition)控制并發(fā)訪問。
4.DelayQueue的應(yīng)用場景
DelayQueue 適用于 延遲執(zhí)行、定時任務(wù)、緩存超時管理 等場景,包括:
- 任務(wù)調(diào)度(如延遲執(zhí)行任務(wù)、重試機(jī)制)
- 定時消息隊列(如 Kafka 里的延時消息)
- 訂單超時取消(未支付訂單自動取消)
- 緩存自動過期(定期清除緩存)
- 連接超時管理(網(wǎng)絡(luò)連接的超時處理)
5.DelayQueue的優(yōu)缺點
優(yōu)點
- 高效的時間管理,自動處理過期元素
- 線程安全,內(nèi)部使用
ReentrantLock保證并發(fā)安全 - 無界隊列,但受內(nèi)存限制
- 阻塞機(jī)制,減少 CPU 輪詢
缺點
- 不支持元素移除(除非手動遍歷
remove()) - 不能提前獲取未到期元素(
poll()只返回到期元素) - 無上限(可能導(dǎo)致 OOM)
6.DelayQueue的替代方案
| 需求 | 替代方案 |
|---|---|
| 需要定時任務(wù) | ScheduledThreadPoolExecutor |
| 需要分布式延遲隊列 | Redis ZSet(基于時間戳排序) |
| 高吞吐延遲消息隊列 | Kafka + 延遲插件 |
| 低延遲任務(wù)調(diào)度 | TimeWheel(時間輪算法,如 Netty 的 HashedWheelTimer) |
7.DelayQueue使用示例
(1) 定義延遲元素
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
class DelayedTask implements Delayed {
private final long delayTime; // 延遲時間
private final long expireTime; // 過期時間
private final String name;
public DelayedTask(String name, long delay, TimeUnit unit) {
this.name = name;
this.delayTime = TimeUnit.MILLISECONDS.convert(delay, unit);
this.expireTime = System.currentTimeMillis() + this.delayTime;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return "Task{" + "name='" + name + '\'' + ", expireTime=" + expireTime + '}';
}
}
(2) 使用DelayQueue
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class DelayQueueExample {
public static void main(String[] args) {
DelayQueue<DelayedTask> queue = new DelayQueue<>();
queue.add(new DelayedTask("Task1", 3, TimeUnit.SECONDS));
queue.add(new DelayedTask("Task2", 1, TimeUnit.SECONDS));
queue.add(new DelayedTask("Task3", 5, TimeUnit.SECONDS));
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
while (!queue.isEmpty()) {
try {
DelayedTask task = queue.take(); // 阻塞獲取到期任務(wù)
System.out.println("Executing: " + task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
executor.shutdown();
}
}
(3) 運行結(jié)果
Executing: Task{name='Task2', expireTime=...} // 1s 后執(zhí)行
Executing: Task{name='Task1', expireTime=...} // 3s 后執(zhí)行
Executing: Task{name='Task3', expireTime=...} // 5s 后執(zhí)行
8. 總結(jié)
DelayQueue是基于優(yōu)先級隊列的小頂堆實現(xiàn)的阻塞隊列。- 元素需實現(xiàn)
Delayed接口,getDelay()返回剩余時間。 - 適用于 延遲任務(wù)調(diào)度、緩存過期管理、訂單超時 等場景。
- 主要優(yōu)點是 線程安全、自動延遲控制,缺點是 無法主動獲取未到期元素。
- 可用 ScheduledThreadPoolExecutor、Redis ZSet、Kafka 延時隊列 作為替代方案。
分布式微服務(wù)架構(gòu)下,能使用DelayQueue嗎?
在 分布式微服務(wù)架構(gòu) 下,不推薦直接使用 DelayQueue,主要原因如下:
1.DelayQueue的局限性
- 單機(jī)限制:
DelayQueue是 JVM 內(nèi)存隊列,它運行在單個進(jìn)程,無法在多個微服務(wù)實例間共享數(shù)據(jù),不能保證高可用性和擴(kuò)展性。 - 數(shù)據(jù)丟失風(fēng)險:若微服務(wù)實例崩潰或重啟,
DelayQueue中的任務(wù)會丟失,缺乏持久化機(jī)制。 - 無水平擴(kuò)展能力:隨著流量增長,多個實例無法共享隊列,容易成為瓶頸。
2. 適用于DelayQueue的場景
盡管 DelayQueue 不能直接用于分布式架構(gòu),但在單機(jī)任務(wù)調(diào)度、短時間小規(guī)模的延遲任務(wù)場景下仍然可行,例如:
- 同一個微服務(wù)實例內(nèi)的短期任務(wù)(如 1-10 秒級的延遲任務(wù))
- 不需要高可靠性的本地任務(wù)(如定期緩存清理)
- 沒有跨實例同步要求的任務(wù)(如本地事件延遲處理)
3. 分布式替代方案
若要在分布式微服務(wù)架構(gòu)中實現(xiàn)可擴(kuò)展、高可用的延遲任務(wù)調(diào)度,可以采用以下方案:
(1) Redis ZSet(有序集合)+ 定時輪詢
原理:利用 Redis 的 ZSet(有序集合),按照
score存儲任務(wù)的執(zhí)行時間戳,每隔 N 毫秒 輪詢一次取出到期任務(wù)執(zhí)行。優(yōu)勢:
- 支持 分布式部署,多個實例可共享數(shù)據(jù)
- 持久化,即使服務(wù)重啟,任務(wù)仍然存在
- 高性能,Redis 讀寫性能優(yōu)越
示例:
jedis.zadd("delayQueue", System.currentTimeMillis() + 5000, "order:123"); // 5s 后執(zhí)行 Set<String> tasks = jedis.zrangeByScore("delayQueue", 0, System.currentTimeMillis()); if (!tasks.isEmpty()) { tasks.forEach(task -> { process(task); // 處理任務(wù) jedis.zrem("delayQueue", task); // 移除已處理任務(wù) }); }適用場景:
- 訂單超時處理
- 定時消息推送
- 低吞吐的延遲任務(wù)(如秒級延遲)
(2) Kafka + 延遲隊列插件
- 原理:Kafka 通過
Kafka Streams或 延遲隊列插件(如Kafka Delay Message)支持延遲消費消息。 - 適用場景:
- 高吞吐的延遲任務(wù)
- 可靠的分布式消息隊列
- 缺點:
- 依賴 Kafka,適用于 需要消息隊列的業(yè)務(wù)
(3) RabbitMQ/ActiveMQ TTL + 死信隊列
原理:RabbitMQ 支持 TTL(Time-To-Live) 設(shè)置,消息超時后自動進(jìn)入 DLX(Dead Letter Exchange, 死信隊列),可用 消費者監(jiān)聽 處理。
適用場景:
- 需要可靠消息隊列
- 需要高吞吐延遲任務(wù)
示例:
channel.queueDeclare("delayQueue", true, false, false, Map.of("x-message-ttl", 5000)); channel.basicPublish("", "delayQueue", MessageProperties.PERSISTENT_TEXT_PLAIN, "Delayed Message".getBytes());缺點:
- 依賴消息中間件,適用于 消息驅(qū)動的系統(tǒng)
(4) 分布式任務(wù)調(diào)度框架
- 常見框架:
- XXL-JOB(輕量級,適用于小規(guī)模定時任務(wù))
- Elastic-Job(基于 Zookeeper,適用于高并發(fā)調(diào)度)
- Quartz + DB 持久化(適用于復(fù)雜定時任務(wù))
- 適用場景:
- 定時任務(wù)執(zhí)行
- 任務(wù)分片調(diào)度
- 可持久化任務(wù)隊列
4. 結(jié)論
建議:如果是 單機(jī)應(yīng)用,可以使用 DelayQueue;如果是 分布式微服務(wù)架構(gòu),建議使用 Redis ZSet / Kafka / RabbitMQ / 任務(wù)調(diào)度框架 實現(xiàn)延遲任務(wù)。
到此這篇關(guān)于Java實現(xiàn)DelayQueue延遲隊列的文章就介紹到這了,更多相關(guān)Java DelayQueue延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解Spring中使用@within與@target的區(qū)別
這篇文章主要介紹了Spring中使用@within與@target的一些區(qū)別,本文通過項目案例給大家詳細(xì)分析,給大家介紹的非常詳細(xì),代碼簡單易懂,需要的朋友可以參考下2021-09-09
Springboot整合mybatisplus的項目實戰(zhàn)
本文主要介紹了Springboot整合mybatisplus的項目實戰(zhàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-06-06
解決創(chuàng)建springboot后啟動報錯:Failed?to?bind?properties?under‘spri
在Spring?Boot項目中,application.properties和application.yml是用于配置參數(shù)的兩種文件格式,properties格式簡潔但不支持層次結(jié)構(gòu),而yml格式支持層次性,可讀性更好,在yml文件中,要注意細(xì)節(jié),比如冒號后面需要空格2024-10-10
分享Java8中通過Stream對列表進(jìn)行去重的實現(xiàn)
本文主要介紹了分享Java8中通過Stream對列表進(jìn)行去重的實現(xiàn),包括兩種方法,具有一定的參考價值,感興趣的可以了解一下2023-11-11

