Java實(shí)現(xiàn)DelayQueue延遲隊(duì)列示例代碼
JavaDelayQueue延遲隊(duì)列
1.DelayQueue概述
DelayQueue 是 Java 并發(fā)包(java.util.concurrent)中的一個(gè) 無界 阻塞隊(duì)列,用于存儲實(shí)現(xiàn)了 Delayed 接口的元素。隊(duì)列中的元素只有在達(dá)到指定的延遲時(shí)間后才能被獲取。
2.DelayQueue的底層數(shù)據(jù)結(jié)構(gòu)
DelayQueue 的底層數(shù)據(jù)結(jié)構(gòu)是 優(yōu)先級隊(duì)列(PriorityQueue),它是一個(gè)小頂堆(最小堆),根據(jù)元素的過期時(shí)間進(jìn)行排序。
- 底層采用 PriorityQueue(基于堆的實(shí)現(xiàn))
- 按照到期時(shí)間升序排列,即最早過期的元素在堆頂
- 元素未過期時(shí),take() 方法會(huì)阻塞
- 支持多線程并發(fā)訪問
3.DelayQueue的實(shí)現(xiàn)原理
元素需實(shí)現(xiàn)
Delayed接口,重寫getDelay()方法,返回剩余的延遲時(shí)間。DelayQueue內(nèi)部維護(hù)一個(gè)PriorityQueue<Delayed>。插入元素時(shí),按照到期時(shí)間排序,最早到期的元素位于堆頂。
take()
方法獲取堆頂元素:
- 若到期,直接返回該元素。
- 若未到期,線程阻塞,直到該元素可用。
- 使用鎖 + 條件變量(
ReentrantLock+Condition)控制并發(fā)訪問。
4.DelayQueue的應(yīng)用場景
DelayQueue 適用于 延遲執(zhí)行、定時(shí)任務(wù)、緩存超時(shí)管理 等場景,包括:
- 任務(wù)調(diào)度(如延遲執(zhí)行任務(wù)、重試機(jī)制)
- 定時(shí)消息隊(duì)列(如 Kafka 里的延時(shí)消息)
- 訂單超時(shí)取消(未支付訂單自動(dòng)取消)
- 緩存自動(dòng)過期(定期清除緩存)
- 連接超時(shí)管理(網(wǎng)絡(luò)連接的超時(shí)處理)
5.DelayQueue的優(yōu)缺點(diǎn)
優(yōu)點(diǎn)
- 高效的時(shí)間管理,自動(dòng)處理過期元素
- 線程安全,內(nèi)部使用
ReentrantLock保證并發(fā)安全 - 無界隊(duì)列,但受內(nèi)存限制
- 阻塞機(jī)制,減少 CPU 輪詢
缺點(diǎn)
- 不支持元素移除(除非手動(dòng)遍歷
remove()) - 不能提前獲取未到期元素(
poll()只返回到期元素) - 無上限(可能導(dǎo)致 OOM)
6.DelayQueue的替代方案
| 需求 | 替代方案 |
|---|---|
| 需要定時(shí)任務(wù) | ScheduledThreadPoolExecutor |
| 需要分布式延遲隊(duì)列 | Redis ZSet(基于時(shí)間戳排序) |
| 高吞吐延遲消息隊(duì)列 | Kafka + 延遲插件 |
| 低延遲任務(wù)調(diào)度 | TimeWheel(時(shí)間輪算法,如 Netty 的 HashedWheelTimer) |
7.DelayQueue使用示例
(1) 定義延遲元素
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
class DelayedTask implements Delayed {
private final long delayTime; // 延遲時(shí)間
private final long expireTime; // 過期時(shí)間
private final String name;
public DelayedTask(String name, long delay, TimeUnit unit) {
this.name = name;
this.delayTime = TimeUnit.MILLISECONDS.convert(delay, unit);
this.expireTime = System.currentTimeMillis() + this.delayTime;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return "Task{" + "name='" + name + '\'' + ", expireTime=" + expireTime + '}';
}
}
(2) 使用DelayQueue
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class DelayQueueExample {
public static void main(String[] args) {
DelayQueue<DelayedTask> queue = new DelayQueue<>();
queue.add(new DelayedTask("Task1", 3, TimeUnit.SECONDS));
queue.add(new DelayedTask("Task2", 1, TimeUnit.SECONDS));
queue.add(new DelayedTask("Task3", 5, TimeUnit.SECONDS));
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
while (!queue.isEmpty()) {
try {
DelayedTask task = queue.take(); // 阻塞獲取到期任務(wù)
System.out.println("Executing: " + task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
executor.shutdown();
}
}
(3) 運(yùn)行結(jié)果
Executing: Task{name='Task2', expireTime=...} // 1s 后執(zhí)行
Executing: Task{name='Task1', expireTime=...} // 3s 后執(zhí)行
Executing: Task{name='Task3', expireTime=...} // 5s 后執(zhí)行
8. 總結(jié)
DelayQueue是基于優(yōu)先級隊(duì)列的小頂堆實(shí)現(xiàn)的阻塞隊(duì)列。- 元素需實(shí)現(xiàn)
Delayed接口,getDelay()返回剩余時(shí)間。 - 適用于 延遲任務(wù)調(diào)度、緩存過期管理、訂單超時(shí) 等場景。
- 主要優(yōu)點(diǎn)是 線程安全、自動(dòng)延遲控制,缺點(diǎn)是 無法主動(dòng)獲取未到期元素。
- 可用 ScheduledThreadPoolExecutor、Redis ZSet、Kafka 延時(shí)隊(duì)列 作為替代方案。
分布式微服務(wù)架構(gòu)下,能使用DelayQueue嗎?
在 分布式微服務(wù)架構(gòu) 下,不推薦直接使用 DelayQueue,主要原因如下:
1.DelayQueue的局限性
- 單機(jī)限制:
DelayQueue是 JVM 內(nèi)存隊(duì)列,它運(yùn)行在單個(gè)進(jìn)程,無法在多個(gè)微服務(wù)實(shí)例間共享數(shù)據(jù),不能保證高可用性和擴(kuò)展性。 - 數(shù)據(jù)丟失風(fēng)險(xiǎn):若微服務(wù)實(shí)例崩潰或重啟,
DelayQueue中的任務(wù)會(huì)丟失,缺乏持久化機(jī)制。 - 無水平擴(kuò)展能力:隨著流量增長,多個(gè)實(shí)例無法共享隊(duì)列,容易成為瓶頸。
2. 適用于DelayQueue的場景
盡管 DelayQueue 不能直接用于分布式架構(gòu),但在單機(jī)任務(wù)調(diào)度、短時(shí)間小規(guī)模的延遲任務(wù)場景下仍然可行,例如:
- 同一個(gè)微服務(wù)實(shí)例內(nèi)的短期任務(wù)(如 1-10 秒級的延遲任務(wù))
- 不需要高可靠性的本地任務(wù)(如定期緩存清理)
- 沒有跨實(shí)例同步要求的任務(wù)(如本地事件延遲處理)
3. 分布式替代方案
若要在分布式微服務(wù)架構(gòu)中實(shí)現(xiàn)可擴(kuò)展、高可用的延遲任務(wù)調(diào)度,可以采用以下方案:
(1) Redis ZSet(有序集合)+ 定時(shí)輪詢
原理:利用 Redis 的 ZSet(有序集合),按照
score存儲任務(wù)的執(zhí)行時(shí)間戳,每隔 N 毫秒 輪詢一次取出到期任務(wù)執(zhí)行。優(yōu)勢:
- 支持 分布式部署,多個(gè)實(shí)例可共享數(shù)據(jù)
- 持久化,即使服務(wù)重啟,任務(wù)仍然存在
- 高性能,Redis 讀寫性能優(yōu)越
示例:
jedis.zadd("delayQueue", System.currentTimeMillis() + 5000, "order:123"); // 5s 后執(zhí)行 Set<String> tasks = jedis.zrangeByScore("delayQueue", 0, System.currentTimeMillis()); if (!tasks.isEmpty()) { tasks.forEach(task -> { process(task); // 處理任務(wù) jedis.zrem("delayQueue", task); // 移除已處理任務(wù) }); }適用場景:
- 訂單超時(shí)處理
- 定時(shí)消息推送
- 低吞吐的延遲任務(wù)(如秒級延遲)
(2) Kafka + 延遲隊(duì)列插件
- 原理:Kafka 通過
Kafka Streams或 延遲隊(duì)列插件(如Kafka Delay Message)支持延遲消費(fèi)消息。 - 適用場景:
- 高吞吐的延遲任務(wù)
- 可靠的分布式消息隊(duì)列
- 缺點(diǎn):
- 依賴 Kafka,適用于 需要消息隊(duì)列的業(yè)務(wù)
(3) RabbitMQ/ActiveMQ TTL + 死信隊(duì)列
原理:RabbitMQ 支持 TTL(Time-To-Live) 設(shè)置,消息超時(shí)后自動(dòng)進(jìn)入 DLX(Dead Letter Exchange, 死信隊(duì)列),可用 消費(fèi)者監(jiān)聽 處理。
適用場景:
- 需要可靠消息隊(duì)列
- 需要高吞吐延遲任務(wù)
示例:
channel.queueDeclare("delayQueue", true, false, false, Map.of("x-message-ttl", 5000)); channel.basicPublish("", "delayQueue", MessageProperties.PERSISTENT_TEXT_PLAIN, "Delayed Message".getBytes());缺點(diǎn):
- 依賴消息中間件,適用于 消息驅(qū)動(dòng)的系統(tǒng)
(4) 分布式任務(wù)調(diào)度框架
- 常見框架:
- XXL-JOB(輕量級,適用于小規(guī)模定時(shí)任務(wù))
- Elastic-Job(基于 Zookeeper,適用于高并發(fā)調(diào)度)
- Quartz + DB 持久化(適用于復(fù)雜定時(shí)任務(wù))
- 適用場景:
- 定時(shí)任務(wù)執(zhí)行
- 任務(wù)分片調(diào)度
- 可持久化任務(wù)隊(duì)列
4. 結(jié)論
建議:如果是 單機(jī)應(yīng)用,可以使用 DelayQueue;如果是 分布式微服務(wù)架構(gòu),建議使用 Redis ZSet / Kafka / RabbitMQ / 任務(wù)調(diào)度框架 實(shí)現(xiàn)延遲任務(wù)。
到此這篇關(guān)于Java實(shí)現(xiàn)DelayQueue延遲隊(duì)列的文章就介紹到這了,更多相關(guān)Java DelayQueue延遲隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解Spring中使用@within與@target的區(qū)別
這篇文章主要介紹了Spring中使用@within與@target的一些區(qū)別,本文通過項(xiàng)目案例給大家詳細(xì)分析,給大家介紹的非常詳細(xì),代碼簡單易懂,需要的朋友可以參考下2021-09-09
Springboot整合mybatisplus的項(xiàng)目實(shí)戰(zhàn)
本文主要介紹了Springboot整合mybatisplus的項(xiàng)目實(shí)戰(zhàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-06-06
spring boot 圖片上傳與顯示功能實(shí)例詳解
這篇文章主要介紹了spring boot 圖片上傳與顯示功能實(shí)例詳解,需要的朋友可以參考下2017-04-04
解決創(chuàng)建springboot后啟動(dòng)報(bào)錯(cuò):Failed?to?bind?properties?under‘spri
在Spring?Boot項(xiàng)目中,application.properties和application.yml是用于配置參數(shù)的兩種文件格式,properties格式簡潔但不支持層次結(jié)構(gòu),而yml格式支持層次性,可讀性更好,在yml文件中,要注意細(xì)節(jié),比如冒號后面需要空格2024-10-10
分享Java8中通過Stream對列表進(jìn)行去重的實(shí)現(xiàn)
本文主要介紹了分享Java8中通過Stream對列表進(jìn)行去重的實(shí)現(xiàn),包括兩種方法,具有一定的參考價(jià)值,感興趣的可以了解一下2023-11-11

