Java中的延遲隊(duì)列DelayQueue詳細(xì)解析
前言
JDK自身支持延遲隊(duì)列的數(shù)據(jù)結(jié)構(gòu),其實(shí)類:java.util.concurrent.DelayQueue。
我們通過閱讀源碼的方式理解該延遲隊(duì)列類的實(shí)現(xiàn)過程。
1.定義
DelayQueue:是一種支持延時(shí)獲取元素的無界阻塞隊(duì)列。
特性:
- 線程安全(多生產(chǎn)者,多消費(fèi)者)(單機(jī),如果想實(shí)現(xiàn)分布式,可以結(jié)合redis 消息分發(fā),如果需要較高數(shù)據(jù)可靠性可以考慮結(jié)合消息中間件等);
- 內(nèi)部元素有“延遲”特性:只有延遲到期的元素才允許被獲??;
- 具有優(yōu)先級(jí)特性的無界隊(duì)列,優(yōu)先級(jí)以元素延遲時(shí)間為標(biāo)準(zhǔn),最先過期的元素優(yōu)先級(jí)最高(隊(duì)首);
- 入隊(duì)操作不會(huì)被阻塞,獲取元素在特定情況會(huì)阻塞(隊(duì)列為空,隊(duì)首元素延遲未到期等);
根據(jù)其源碼分析為何如此定義以及其特性的由來。
DelayQueue繼承關(guān)系:

類圖分析:
其核心繼承/實(shí)現(xiàn):
1.BlockingQueue:說明其具有阻塞隊(duì)列的特性;
2.元素必實(shí)現(xiàn)接口Delayed,而Delayed繼承了接口Comparable。因此所有元素必須實(shí)現(xiàn)兩個(gè)方法:
compareTo方法用于元素比較; getDelay方法用于獲取元素剩余延時(shí)時(shí)間。
public interface Delayed extends Comparable<Delayed> {
/**
* 返回關(guān)聯(lián)對(duì)象的剩余延遲時(shí)間(可指定時(shí)間單位)
*/
long getDelay(TimeUnit unit);
}2.源碼
public class DelayQueue<E extends Delayed>
extends AbstractQueue<E>
implements BlockingQueue<E> {
/**
* 可重入鎖,用于保證線程安全
*/
private final transient ReentrantLock lock = new ReentrantLock();
/**
* 優(yōu)先隊(duì)列(容器),實(shí)際存儲(chǔ)元素的地方
*/
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
* 等待取元素線程的領(lǐng)導(dǎo)(leader)線程,有且僅有一個(gè)leader。
* 具有最高優(yōu)先級(jí),第一個(gè)嘗試獲取元素的線程。
* leader取完元素后,會(huì)喚醒新的等待線程成為新的leader。
*/
private Thread leader = null;
/**
* 觸發(fā)條件,表示是否可以從隊(duì)列中讀取元素.
* 用于等待(await())/通知(signal())其他線程
*/
private final Condition available = lock.newCondition();
/**
* 構(gòu)造函數(shù)
*/
public DelayQueue() {
}
/**
* 構(gòu)造函數(shù): 調(diào)用addAll()方法:將集合c 存入隊(duì)列中
*
*/
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
/*--------------------------添加元素(非阻塞)-------------------------------*/
/**
* 插入新元素.
* 核心內(nèi)容見:public boolean offer(E e)
*/
public boolean add(E e) {
return offer(e);
}
/**
* 插入新元素.
* 核心內(nèi)容見:public boolean offer(E e)
*/
public void put(E e) {
offer(e);
}
/**
* 插入新元素.
* 核心內(nèi)容見:public boolean offer(E e)
* @param e 元素
* @param timeout 此參數(shù)將被忽略,因?yàn)樵摲椒◤牟蛔枞◤U棄)
* @param unit 此參數(shù)將被忽略,因?yàn)樵摲椒◤牟蛔枞◤U棄)
* @return {@code true}
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
/**
* 插入新元素.(線程安全 lock)
* 邏輯:
* 1.入隊(duì);
* 2.如果入隊(duì)元素為隊(duì)首元素(原隊(duì)列為空),喚醒一個(gè)等待的線程,通知獲取數(shù)據(jù)。
*
* @param e 元素
* @return {@code true}
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 入隊(duì)
q.offer(e);
// 若該元素為隊(duì)列頭部元素(說明原隊(duì)列為空),可以喚醒等待的線程取元素?cái)?shù)據(jù)
if (q.peek() == e) {
// 如果隊(duì)首元素是剛插入的元素,則設(shè)置leader為null(騰位置)
leader = null;
// 喚醒一個(gè)等待的線程
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
/*--------------------------取出(返回并刪除)元素-------------------------------*/
/**
* 取出延遲到期元素(非阻塞的).(線程安全 lock)
* poll() 方法是非阻塞的,即調(diào)用之后無論元素是否存在/延遲到期都會(huì)立即返回。
* 邏輯:
* 1.查詢隊(duì)首元素;
* 2.元素延遲到期返回,否則返回null
*/
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 查詢隊(duì)首元素
E first = q.peek();
// 隊(duì)首元素為空或者延時(shí)未到期 返回null
if (first == null || first.getDelay(NANOSECONDS) > 0) {
return null;
} else {
// 如果到期,取出并刪除隊(duì)首元素
return q.poll();
}
} finally {
lock.unlock();
}
}
/**
* 取出延遲到期元素(帶有超時(shí)時(shí)間,阻塞).(線程安全 lock)
* 如果隊(duì)首元素未到期或者為null,等待:直到隊(duì)首元素延遲到期或者超出指定等待時(shí)間(timeout)
* 邏輯(無限循環(huán)等待獲?。?
* 宗旨:在不超出timeout的時(shí)間內(nèi),循環(huán)去取出延遲到期的隊(duì)首元素(前提無其他線程正在取數(shù)--互斥).
* 1.查詢隊(duì)首元素;
* 2.1.隊(duì)列空:等待timeout一段時(shí)間,直到等待超時(shí)(即timeout被重置小于等于0);
* 2.2.隊(duì)列不為空:
* 2.2.1. 隊(duì)首元素延遲到期,取出隊(duì)首元素(poll());
* 2.2.2. 隊(duì)首元素延遲未到期:
* 2.2.3 等待超時(shí) ,返回null;
* 2.2.4 等待未超時(shí),等待時(shí)間<延遲時(shí)間或者有其他線程正在取數(shù)據(jù),繼續(xù)等待到超時(shí)到期
* 2.2.5 等待為超時(shí),等待時(shí)間>=延遲時(shí)間并且無其他線程正在取數(shù)據(jù),該線程設(shè)置為leader等待到延遲到期(最后清空leader)
* 3. 循環(huán)后,如果leader=null(無正在取數(shù)線程)并且隊(duì)列還有數(shù)據(jù),喚醒一個(gè)等待線程最終成為leader.
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 以可中斷方式獲取鎖
lock.lockInterruptibly();
try {
for (; ; ) {
// 獲取隊(duì)首元素
E first = q.peek();
if (first == null) {
// 若隊(duì)首元素為空(即隊(duì)列為空,這時(shí)就需要關(guān)注,當(dāng)前取值請(qǐng)求是否需要阻塞等待
// 等待時(shí)間小于等于0 ,不阻塞等待,直接返回null)
if (nanos <= 0) {
return null;
} else {
// 等待相應(yīng)的時(shí)間
nanos = available.awaitNanos(nanos);
}
} else {
// 若隊(duì)列元素非空,獲取隊(duì)首元素剩余延遲時(shí)間
long delay = first.getDelay(NANOSECONDS);
// 延時(shí)過期 返回元素
if (delay <= 0) {
return q.poll();
}
// 延時(shí)未過期 等待時(shí)間超時(shí) ,不等待,直接返回null
if (nanos <= 0) {
return null;
}
first = null;
// 延時(shí)和等待都未到期且等待時(shí)間<延遲時(shí)間 或者 有其他線程在取數(shù)據(jù),當(dāng)前請(qǐng)求繼續(xù)等待
if (nanos < delay || leader != null) {
nanos = available.awaitNanos(nanos);
} else {
// 沒有其他線程等待,將當(dāng)前線程設(shè)置為 leader,類似于“獨(dú)占”操作
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待直到延遲到期
long timeLeft = available.awaitNanos(delay);
// 計(jì)算超時(shí)時(shí)間
nanos -= delay - timeLeft;
} finally {
// 該線程操作完畢,把 leader 置空
if (leader == thisThread) {
leader = null;
}
}
}
}
}
} finally {
// 如果leader線程為空 并且 queue非空,則喚醒其他等待線程
if (leader == null && q.peek() != null) {
available.signal();
}
lock.unlock();
}
}
/**
* 取出延遲到期元素(無超時(shí)時(shí)間限制,阻塞).(線程安全 lock)
* 邏輯(無限循環(huán)等待獲?。?
* 其邏輯參考poll(long timeout, TimeUnit unit).
* 其區(qū)別在于:不受超時(shí)時(shí)間限制(timeout)
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 以可中斷方式獲取鎖
lock.lockInterruptibly();
try {
// 無限循環(huán)
for (; ; ) {
// 獲取隊(duì)首元素
E first = q.peek();
if (first == null) {
// 若隊(duì)首元素為空(隊(duì)列為空),則等待
available.await();
} else {
// 若隊(duì)列元素非空,獲取隊(duì)首元素剩余延遲時(shí)間
long delay = first.getDelay(NANOSECONDS);
// 延遲到期,獲取隊(duì)首元素
if (delay <= 0) {
return q.poll();
}
// 延時(shí)未過期
first = null;
// leader 不為空表示有其他線程在讀取數(shù)據(jù),當(dāng)前線程等待
if (leader != null) {
available.await();
} else {
// 沒有其他線程等待,將當(dāng)前線程設(shè)置為 leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待延遲時(shí)間過期
available.awaitNanos(delay);
} finally {
if (leader == thisThread) {
leader = null;
}
}
}
}
}
} finally {
// 如果leader線程為空 并且 queue非空,則喚醒其他等待線程
if (leader == null && q.peek() != null) {
available.signal();
}
lock.unlock();
}
}
/*--------------------------讀取隊(duì)首元素-------------------------------*/
/**
* 讀取隊(duì)首元素.(線程安全 lock)
*/
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
}
/*--------------------------讀取隊(duì)列長度-------------------------------*/
/**
* 獲取隊(duì)列數(shù)據(jù)的長度.(線程安全 lock)
*/
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.size();
} finally {
lock.unlock();
}
}
/*--------------------------獲取延遲到期元素集合-------------------------------*/
/**
* 將隊(duì)列中延遲到期數(shù)據(jù) 收集到集合C中.(線程安全 lock)
*
* @return 返回延遲到期元素?cái)?shù)量
*/
public int drainTo(Collection<? super E> c) {
if (c == null) {
throw new NullPointerException();
}
if (c == this) {
throw new IllegalArgumentException();
}
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = 0;
// peekExpired() 判斷隊(duì)首元素是否延遲到期
for (E e; (e = peekExpired()) != null; ) {
c.add(e);
q.poll();
++n;
}
return n;
} finally {
lock.unlock();
}
}
/**
* 將隊(duì)列中延遲到期數(shù)據(jù) 收集到集合C中(C集合總數(shù)有限制小于maxElements).(線程安全 lock)
* @return 返回延遲到期元素?cái)?shù)量
*/
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null) {
throw new NullPointerException();
}
if (c == this) {
throw new IllegalArgumentException();
}
if (maxElements <= 0) {
return 0;
}
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = 0;
// peekExpired() 判斷隊(duì)首元素是否延遲到期。并且到期元素總數(shù)不允許超過maxElements
for (E e; n < maxElements && (e = peekExpired()) != null; ) {
c.add(e);
q.poll();
++n;
}
return n;
} finally {
lock.unlock();
}
}
/**
* 讀取隊(duì)首元素(已延遲到期).(私有方法)
*/
private E peekExpired() {
// 獲取隊(duì)首元素
E first = q.peek();
// 隊(duì)首元素存在并且延遲到期,否則返回null
return (first == null || first.getDelay(NANOSECONDS) > 0) ?
null : first;
}
/*--------------------------刪除元素-------------------------------*/
/**
* 清除隊(duì)列中所有元素(線程安全 lock)--暴力清除
*/
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.clear();
} finally {
lock.unlock();
}
}
/**
* 刪除指定元素O.(線程安全 lock)
*/
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.remove(o);
} finally {
lock.unlock();
}
}
/**
* 刪除指定元素O.(這里指的是相同的對(duì)象引用/內(nèi)存地址)(線程安全 lock)
*/
void removeEQ(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
// 使用了對(duì)象引用/內(nèi)存地址相等比較
if (o == it.next()) {
it.remove();
break;
}
}
} finally {
lock.unlock();
}
}
/*--------------------------隊(duì)列轉(zhuǎn)數(shù)組-------------------------------*/
/**
* 將隊(duì)列元素都復(fù)制到數(shù)組中(無序).(線程安全 lock)
*/
public Object[] toArray() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.toArray();
} finally {
lock.unlock();
}
}
/**
* 將隊(duì)列元素都復(fù)制到數(shù)組a中(無序).
*/
public <T> T[] toArray(T[] a) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.toArray(a);
} finally {
lock.unlock();
}
}
/*--------------------------私有內(nèi)部類--迭代器-------------------------------*/
/**
* 返回此隊(duì)列中所有元素(已過期和未過期)的迭代器。迭代器不按任何特定順序返回元素。
*/
public Iterator<E> iterator() {
return new Itr(toArray());
}
/**
* 快照迭代器,用于處理底層 隊(duì)列/數(shù)組的副本。
*/
private class Itr implements Iterator<E> {
final Object[] array; // Array of all elements
int cursor; // index of next element to return
int lastRet; // index of last element, or -1 if no such
Itr(Object[] array) {
lastRet = -1;
this.array = array;
}
public boolean hasNext() {
return cursor < array.length;
}
@SuppressWarnings("unchecked")
public E next() {
if (cursor >= array.length)
throw new NoSuchElementException();
lastRet = cursor;
return (E) array[cursor++];
}
public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
removeEQ(array[lastRet]);
lastRet = -1;
}
}
}3.使用demo
使用DelayQueue實(shí)現(xiàn)延遲隊(duì)列:
優(yōu)點(diǎn):實(shí)現(xiàn)簡單。
缺點(diǎn):可擴(kuò)展性較差,內(nèi)存限制、無持久化機(jī)制等。
@SneakyThrows
public static void main(String[] args) {
DelayQueue<TestTask> testTaskDelayQueue = new DelayQueue<>();
long time = System.currentTimeMillis();
testTaskDelayQueue.offer(TestTask.builder().name("test_1").endTime(time + 10 * 1000).build());
testTaskDelayQueue.offer(TestTask.builder().name("test_2").endTime(time + 4 * 1000).build());
testTaskDelayQueue.offer(TestTask.builder().name("test_3").endTime(time + 16 * 1000).build());
for(;;){
System.out.println(testTaskDelayQueue.take());
TimeUnit.SECONDS.sleep(2);
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
private static class TestTask implements Delayed {
private String name;
private Long endTime;
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
}到此這篇關(guān)于Java中的延遲隊(duì)列DelayQueue詳細(xì)解析的文章就介紹到這了,更多相關(guān)延遲隊(duì)列DelayQueue內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解Spring Boot Admin監(jiān)控服務(wù)上下線郵件通知
本篇文章主要介紹了詳解Spring Boot Admin監(jiān)控服務(wù)上下線郵件通知,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-12-12
Java判斷IP地址為內(nèi)網(wǎng)IP還是公網(wǎng)IP的方法
這篇文章主要介紹了Java判斷IP地址為內(nèi)網(wǎng)IP還是公網(wǎng)IP的方法,針對(duì)tcp/ip協(xié)議中保留的三個(gè)私有地址進(jìn)行判斷分析,是比較實(shí)用的技巧,需要的朋友可以參考下2015-01-01
SpringBoot中過濾器Filter+JWT令牌實(shí)現(xiàn)登錄驗(yàn)證
本文主要介紹了SpringBoot中過濾器Filter+JWT令牌實(shí)現(xiàn)登錄驗(yàn)證,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-04-04
學(xué)習(xí)java編程后可以走哪些職業(yè)道路
在本篇文章里給大家介紹了關(guān)于學(xué)習(xí)java后的職業(yè)道路,以及需要學(xué)習(xí)的相關(guān)知識(shí)內(nèi)容,有興趣的朋友們可以跟著學(xué)習(xí)下。2022-11-11
基于Java實(shí)現(xiàn)多線程下載并允許斷點(diǎn)續(xù)傳
這篇文章主要介紹了基于Java實(shí)現(xiàn)多線程下載并允許斷點(diǎn)續(xù)傳,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03
Java調(diào)用第三方http接口的四種方式總結(jié)
這篇文章主要給大家介紹了關(guān)于Java調(diào)用第三方http接口的四種方式,在實(shí)際開發(fā)中我們經(jīng)常會(huì)與第三方公司進(jìn)行合作,接入第三方接口,文中給出了詳細(xì)的代碼實(shí)例,需要的朋友可以參考下2023-08-08
Mac使用Idea配置傳統(tǒng)SSM項(xiàng)目(非maven項(xiàng)目)
本文主要介紹了Mac使用Idea配置傳統(tǒng)SSM項(xiàng)目(非maven項(xiàng)目),將展示如何設(shè)置項(xiàng)目結(jié)構(gòu)、添加依賴關(guān)系等,具有一定的參考價(jià)值,感興趣的可以了解一下2024-01-01

