java并發(fā)中DelayQueue延遲隊(duì)列原理剖析
介紹
DelayQueue隊(duì)列是一個(gè)延遲隊(duì)列,DelayQueue中存放的元素必須實(shí)現(xiàn)Delayed接口的元素,實(shí)現(xiàn)接口后相當(dāng)于是每個(gè)元素都有個(gè)過期時(shí)間,當(dāng)隊(duì)列進(jìn)行take獲取元素時(shí),先要判斷元素有沒有過期,只有過期的元素才能出隊(duì)操作,沒有過期的隊(duì)列需要等待剩余過期時(shí)間才能進(jìn)行出隊(duì)操作。
源碼分析
DelayQueue隊(duì)列內(nèi)部使用了PriorityQueue優(yōu)先隊(duì)列來進(jìn)行存放數(shù)據(jù),它采用的是二叉堆進(jìn)行的優(yōu)先隊(duì)列,使用ReentrantLock鎖來控制線程同步,由于內(nèi)部元素是采用的PriorityQueue來進(jìn)行存放數(shù)據(jù),所以Delayed接口實(shí)現(xiàn)了Comparable接口,用于比較來控制優(yōu)先級(jí),如下代碼所示:
public interface Delayed extends Comparable<Delayed> { /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ long getDelay(TimeUnit unit); }
DelayQueue的成員變量如下所示:
// 鎖。 private final transient ReentrantLock lock = new ReentrantLock(); // 優(yōu)先隊(duì)列。 private final PriorityQueue<E> q = new PriorityQueue<E>(); /** * Leader-Follower的變種。 * Thread designated to wait for the element at the head of * the queue. This variant of the Leader-Follower pattern * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to * minimize unnecessary timed waiting. When a thread becomes * the leader, it waits only for the next delay to elapse, but * other threads await indefinitely. The leader thread must * signal some other thread before returning from take() or * poll(...), unless some other thread becomes leader in the * interim. Whenever the head of the queue is replaced with * an element with an earlier expiration time, the leader * field is invalidated by being reset to null, and some * waiting thread, but not necessarily the current leader, is * signalled. So waiting threads must be prepared to acquire * and lose leadership while waiting. */ private Thread leader = null; /** * Condition signalled when a newer element becomes available * at the head of the queue or a new thread may need to * become leader. */ // 條件,代表如果有數(shù)據(jù)則通知Follower線程,喚醒線程處理隊(duì)列內(nèi)容。 private final Condition available = lock.newCondition();
Leader-Follower模式的變種,用于最小化不必要的定時(shí)等待,當(dāng)一個(gè)線程被選擇為L(zhǎng)eader時(shí),它會(huì)等待延遲過去執(zhí)行代碼邏輯,而其他線程則需要無限期等待,在從take或poll返回之前,每當(dāng)隊(duì)列的頭部被替換為具有更早到期時(shí)間的元素時(shí),leader字段將通過重置為空而無效,Leader線程必須向其中一個(gè)Follower線程發(fā)出信號(hào),被喚醒的 follwer 線程被設(shè)置為新的Leader 線程。
offer操作
public boolean offer(E e) { // 獲取到鎖 final ReentrantLock lock = this.lock; lock.lock(); try { // 將元素存儲(chǔ)到PriorityQueue優(yōu)先隊(duì)列中 q.offer(e); // 如果第一個(gè)元素是當(dāng)前元素,說明之前隊(duì)列中為空,則先將Leader設(shè)置為空,通知等待線程可以爭(zhēng)搶Leader了。 if (q.peek() == e) { leader = null; available.signal(); } // 返回成功 return true; } finally { lock.unlock(); } }
offer操作前先進(jìn)行獲取鎖的操作,也就是同一時(shí)間內(nèi)只能有一個(gè)線程可以入隊(duì)操作。
- 獲取到ReentrantLock鎖對(duì)象。
- 將元素添加到PriorityQueue優(yōu)先隊(duì)列中
- 如果隊(duì)列中最早過期的元素是自己,則說明隊(duì)列原先是空的,所以將Leader進(jìn)行重置,通知Follower線程可以成為L(zhǎng)eader線程。
- 最后進(jìn)行解鎖操作。
put操作
put操作其實(shí)就是調(diào)用的offer操作來進(jìn)行添加數(shù)據(jù)的,以下是源碼信息:
public void put(E e) { offer(e); }
take操作
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 獲取可中斷的鎖。 lock.lockInterruptibly(); try { // 循環(huán)獲取數(shù)據(jù)。 for (;;) { // 獲取最早過期的元素,但是不彈出對(duì)象。 E first = q.peek(); // 如果最早過期的元素為空,說明隊(duì)列為空,則線程直接進(jìn)入無限期等待,并且讓出鎖。 if (first == null) // 當(dāng)前線程無限期等待,直到被喚醒,并且讓出鎖對(duì)象。 available.await(); else { // 獲取最早過期的元素剩余過期時(shí)間。 long delay = first.getDelay(NANOSECONDS); // 如果剩余過期時(shí)間小于0,則說明已經(jīng)過期,反之還沒有過期。 if (delay <= ) // 如果已經(jīng)過期直接獲取最早過期的元素,并返回。 return q.poll(); // 如果剩余過期日期大于0,則會(huì)進(jìn)入到這里。 // 將剛才獲取的最早過期的元素設(shè)置為空。 first = null; // don't retain ref while waiting // 如果有線程爭(zhēng)搶的Leader線程,則進(jìn)行無限期等待。 if (leader != null) // 無限期等待并讓出鎖。 available.await(); else { // 獲取當(dāng)前線程。 Thread thisThread = Thread.currentThread(); // 設(shè)置當(dāng)前線程變?yōu)長(zhǎng)eader線程。 leader = thisThread; try { // 等待剩余等待時(shí)間。 available.awaitNanos(delay); } finally { // 將Leader設(shè)置為null。 if (leader == thisThread) leader = null; } } } } } finally { // 如果隊(duì)列不為空,并且沒有Leader則通知等待線程可以成為L(zhǎng)eader。 if (leader == null && q.peek() != null) // 通知等待線程。 available.signal(); lock.unlock(); } }
- 當(dāng)獲取元素時(shí),先獲取到鎖對(duì)象。
- 獲取最早過期的元素,但是并不從隊(duì)列中彈出元素。
- 最早過期元素是否為空,如果為空則直接讓當(dāng)前線程無限期等待狀態(tài),并且讓出當(dāng)前鎖對(duì)象。
- 如果最早過期的元素不為空
- 獲取最早過期元素的剩余過期時(shí)間,如果已經(jīng)過期則直接返回當(dāng)前元素
- 如果沒有過期,也就是說剩余時(shí)間還存在,則先獲取Leader對(duì)象,如果Leader已經(jīng)有線程在處理,則當(dāng)前線程進(jìn)行無限期等待,如果Leader為空,則首先將Leader設(shè)置為當(dāng)前線程,并且讓當(dāng)前線程等待剩余時(shí)間。
- 最后將Leader線程設(shè)置為空
- 如果Leader已經(jīng)為空,并且隊(duì)列有內(nèi)容則喚醒一個(gè)等待的隊(duì)列。
poll操作
獲取最早過期的元素,如果隊(duì)列頭沒有過期的元素則直接返回null,反之返回過期的元素。
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); // 如果隊(duì)列為空或者隊(duì)列最早過期的元素沒有過期,則返回null。 if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else // 出隊(duì)列操作。 return q.poll(); } finally { lock.unlock(); } }
小結(jié)
- DelayQueue是一個(gè)無界的并發(fā)延遲阻塞隊(duì)列,隊(duì)列中的元素必須實(shí)現(xiàn)Delayed接口,相應(yīng)了需要實(shí)現(xiàn)Comparable接口實(shí)現(xiàn)比較的方法
- Leader-Follower模式的變種,用于最小化不必要的定時(shí)等待,當(dāng)一個(gè)線程被選擇為L(zhǎng)eader時(shí),它會(huì)等待延遲過去執(zhí)行代碼邏輯,而其他線程則需要無限期等待,在從take或poll返回之前,每當(dāng)隊(duì)列的頭部被替換為具有更早到期時(shí)間的元素時(shí),leader字段將通過重置為空而無效,Leader線程必須向其中一個(gè)Follower線程發(fā)出信號(hào),被喚醒的 follwer 線程被設(shè)置為新的Leader 線程。
到此這篇關(guān)于java并發(fā)中DelayQueue延遲隊(duì)列原理剖析的文章就介紹到這了,更多相關(guān)java DelayQueue延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java高效實(shí)現(xiàn)excel轉(zhuǎn)pdf(支持帶圖片的轉(zhuǎn)換)
這篇文章主要為大家詳細(xì)介紹了如何用java實(shí)現(xiàn)excel轉(zhuǎn)pdf文件,并且支持excel單元格中帶有圖片的轉(zhuǎn)換,文中的示例代碼講解詳細(xì),需要的可以參考下2024-01-01Spring?Cloud詳細(xì)講解zuul集成Eureka流程
這篇文章主要介紹了Spring?Cloud?zuul集成Eureka,Eureka?Client中內(nèi)置一個(gè)負(fù)載均衡器,用來進(jìn)行基本的負(fù)載均衡,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-06-06通過netty把百度地圖API獲取的地理位置從Android端發(fā)送到Java服務(wù)器端的操作方法
這篇文章主要介紹了通過netty把百度地圖API獲取的地理位置從Android端發(fā)送到Java服務(wù)器端,本文通過示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-10-10Hibernate中l(wèi)oad方法與get方法的區(qū)別
Hibernate中有兩個(gè)極為相似的方法get()與load(),他們都可以通過指定的實(shí)體類與ID從數(shù)據(jù)庫(kù)中讀取數(shù)據(jù),并返回對(duì)應(yīng)的實(shí)例,但Hibernate不會(huì)搞兩個(gè)完全一樣的方法的2016-01-01java聯(lián)系人管理系統(tǒng)簡(jiǎn)單設(shè)計(jì)
這篇文章主要為大家詳細(xì)介紹了java聯(lián)系人管理系統(tǒng)簡(jiǎn)單設(shè)計(jì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2016-10-10SpringBoot整合mybatis常見問題(小結(jié))
這篇文章主要介紹了SpringBoot整合mybatis常見問題(小結(jié)),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12