java并發(fā)中DelayQueue延遲隊列原理剖析
介紹
DelayQueue隊列是一個延遲隊列,DelayQueue中存放的元素必須實現(xiàn)Delayed接口的元素,實現(xiàn)接口后相當于是每個元素都有個過期時間,當隊列進行take獲取元素時,先要判斷元素有沒有過期,只有過期的元素才能出隊操作,沒有過期的隊列需要等待剩余過期時間才能進行出隊操作。
源碼分析
DelayQueue隊列內(nèi)部使用了PriorityQueue優(yōu)先隊列來進行存放數(shù)據(jù),它采用的是二叉堆進行的優(yōu)先隊列,使用ReentrantLock鎖來控制線程同步,由于內(nèi)部元素是采用的PriorityQueue來進行存放數(shù)據(jù),所以Delayed接口實現(xiàn)了Comparable接口,用于比較來控制優(yōu)先級,如下代碼所示:
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
DelayQueue的成員變量如下所示:
// 鎖。 private final transient ReentrantLock lock = new ReentrantLock(); // 優(yōu)先隊列。 private final PriorityQueue<E> q = new PriorityQueue<E>(); /** * Leader-Follower的變種。 * Thread designated to wait for the element at the head of * the queue. This variant of the Leader-Follower pattern * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to * minimize unnecessary timed waiting. When a thread becomes * the leader, it waits only for the next delay to elapse, but * other threads await indefinitely. The leader thread must * signal some other thread before returning from take() or * poll(...), unless some other thread becomes leader in the * interim. Whenever the head of the queue is replaced with * an element with an earlier expiration time, the leader * field is invalidated by being reset to null, and some * waiting thread, but not necessarily the current leader, is * signalled. So waiting threads must be prepared to acquire * and lose leadership while waiting. */ private Thread leader = null; /** * Condition signalled when a newer element becomes available * at the head of the queue or a new thread may need to * become leader. */ // 條件,代表如果有數(shù)據(jù)則通知Follower線程,喚醒線程處理隊列內(nèi)容。 private final Condition available = lock.newCondition();
Leader-Follower模式的變種,用于最小化不必要的定時等待,當一個線程被選擇為Leader時,它會等待延遲過去執(zhí)行代碼邏輯,而其他線程則需要無限期等待,在從take或poll返回之前,每當隊列的頭部被替換為具有更早到期時間的元素時,leader字段將通過重置為空而無效,Leader線程必須向其中一個Follower線程發(fā)出信號,被喚醒的 follwer 線程被設置為新的Leader 線程。
offer操作
public boolean offer(E e) {
// 獲取到鎖
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 將元素存儲到PriorityQueue優(yōu)先隊列中
q.offer(e);
// 如果第一個元素是當前元素,說明之前隊列中為空,則先將Leader設置為空,通知等待線程可以爭搶Leader了。
if (q.peek() == e) {
leader = null;
available.signal();
}
// 返回成功
return true;
} finally {
lock.unlock();
}
}
offer操作前先進行獲取鎖的操作,也就是同一時間內(nèi)只能有一個線程可以入隊操作。
- 獲取到ReentrantLock鎖對象。
- 將元素添加到PriorityQueue優(yōu)先隊列中
- 如果隊列中最早過期的元素是自己,則說明隊列原先是空的,所以將Leader進行重置,通知Follower線程可以成為Leader線程。
- 最后進行解鎖操作。
put操作
put操作其實就是調(diào)用的offer操作來進行添加數(shù)據(jù)的,以下是源碼信息:
public void put(E e) {
offer(e);
}
take操作
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 獲取可中斷的鎖。
lock.lockInterruptibly();
try {
// 循環(huán)獲取數(shù)據(jù)。
for (;;) {
// 獲取最早過期的元素,但是不彈出對象。
E first = q.peek();
// 如果最早過期的元素為空,說明隊列為空,則線程直接進入無限期等待,并且讓出鎖。
if (first == null)
// 當前線程無限期等待,直到被喚醒,并且讓出鎖對象。
available.await();
else {
// 獲取最早過期的元素剩余過期時間。
long delay = first.getDelay(NANOSECONDS);
// 如果剩余過期時間小于0,則說明已經(jīng)過期,反之還沒有過期。
if (delay <= )
// 如果已經(jīng)過期直接獲取最早過期的元素,并返回。
return q.poll();
// 如果剩余過期日期大于0,則會進入到這里。
// 將剛才獲取的最早過期的元素設置為空。
first = null; // don't retain ref while waiting
// 如果有線程爭搶的Leader線程,則進行無限期等待。
if (leader != null)
// 無限期等待并讓出鎖。
available.await();
else {
// 獲取當前線程。
Thread thisThread = Thread.currentThread();
// 設置當前線程變?yōu)長eader線程。
leader = thisThread;
try {
// 等待剩余等待時間。
available.awaitNanos(delay);
} finally {
// 將Leader設置為null。
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果隊列不為空,并且沒有Leader則通知等待線程可以成為Leader。
if (leader == null && q.peek() != null)
// 通知等待線程。
available.signal();
lock.unlock();
}
}
- 當獲取元素時,先獲取到鎖對象。
- 獲取最早過期的元素,但是并不從隊列中彈出元素。
- 最早過期元素是否為空,如果為空則直接讓當前線程無限期等待狀態(tài),并且讓出當前鎖對象。
- 如果最早過期的元素不為空
- 獲取最早過期元素的剩余過期時間,如果已經(jīng)過期則直接返回當前元素
- 如果沒有過期,也就是說剩余時間還存在,則先獲取Leader對象,如果Leader已經(jīng)有線程在處理,則當前線程進行無限期等待,如果Leader為空,則首先將Leader設置為當前線程,并且讓當前線程等待剩余時間。
- 最后將Leader線程設置為空
- 如果Leader已經(jīng)為空,并且隊列有內(nèi)容則喚醒一個等待的隊列。
poll操作
獲取最早過期的元素,如果隊列頭沒有過期的元素則直接返回null,反之返回過期的元素。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
// 如果隊列為空或者隊列最早過期的元素沒有過期,則返回null。
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
// 出隊列操作。
return q.poll();
} finally {
lock.unlock();
}
}
小結
- DelayQueue是一個無界的并發(fā)延遲阻塞隊列,隊列中的元素必須實現(xiàn)Delayed接口,相應了需要實現(xiàn)Comparable接口實現(xiàn)比較的方法
- Leader-Follower模式的變種,用于最小化不必要的定時等待,當一個線程被選擇為Leader時,它會等待延遲過去執(zhí)行代碼邏輯,而其他線程則需要無限期等待,在從take或poll返回之前,每當隊列的頭部被替換為具有更早到期時間的元素時,leader字段將通過重置為空而無效,Leader線程必須向其中一個Follower線程發(fā)出信號,被喚醒的 follwer 線程被設置為新的Leader 線程。
到此這篇關于java并發(fā)中DelayQueue延遲隊列原理剖析的文章就介紹到這了,更多相關java DelayQueue延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
springboot-2.3.x最新版源碼閱讀環(huán)境搭建(基于gradle構建)
這篇文章主要介紹了springboot-2.3.x最新版源碼閱讀環(huán)境搭建(基于gradle構建),需要的朋友可以參考下2020-08-08
Mybatis Plus Wrapper查詢某幾列的方法實現(xiàn)
MybatisPlus中,使用Wrapper的select和notSelect方法可以精確控制查詢的字段,本文就來介紹一下Mybatis Plus Wrapper查詢某幾列的方法實現(xiàn),感興趣的可以了解一下2024-10-10
JavaWeb開發(fā)之【Tomcat 環(huán)境配置】MyEclipse+IDEA配置教程
這篇文章主要介紹了JavaWeb開發(fā)之【Tomcat 環(huán)境配置】MyEclipse+IDEA配置教程,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-10-10
SpringBoot定制JSON響應數(shù)據(jù)的實現(xiàn)
本文主要介紹了SpringBoot定制JSON響應數(shù)據(jù)的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2025-02-02
Springboot集成minio實現(xiàn)文件存儲的實現(xiàn)代碼
MinIO?是一款基于Go語言的高性能對象存儲服務,本文主要介紹了Springboot集成minio實現(xiàn)文件存儲的實現(xiàn)代碼,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-03-03

