Java中的延遲隊列DelayQueue源碼解析
一、什么是DelayQueue
DelayQueue是一個支持并發(fā)的無界延遲隊列,隊列中的每個元素都有個預定時間,當線程從隊列獲取元素時,只有到期元素才會出隊列,沒有到期元素則阻塞等待。
隊列頭元素是最快要到期的元素。因此DelayQueue可用于實現(xiàn)定時任務隊列。
DelayQueue中的主要成員變量和方法如下:
q:使用優(yōu)先隊列PriorityQueue存儲數(shù)據(jù),隊列中的元素需實現(xiàn)Delayed接口,實現(xiàn)getDelay()和compareTo()方法,以實現(xiàn)優(yōu)先隊列內(nèi)部的優(yōu)先級比較,剩余到期時間越短的元素優(yōu)先級越高
public interface Delayed extends Comparable<Delayed> { //獲取元素剩余到期時間 long getDelay(TimeUnit unit); }
lock:使用ReentrantLock對插入和讀取隊列元素的方法進行加鎖,以實現(xiàn)多線程并發(fā)讀寫隊列操作的同步。
available:用一個條件等待隊列存放等待獲取到期元素的線程。
leader:用于表示當前正在等待獲取隊頭元素的線程,這里使用了一個Leader-Follower模式的變體,線程獲取完元素后從等待隊列中選擇一個線程成為leader繼續(xù)等待獲取隊頭元素,以避免不必要的競爭消耗。
Leader-Follower模式 在并發(fā)IO中,當一個線程收到IO事件后,會考慮啟動一個新的線程去處理,而自己繼續(xù)等待下一個請求。但這里可能會有性能問題,就是把工作交給別一個線程的時候需上下文切換,包括數(shù)據(jù)拷貝。 而在Leader-Follower模式中所有線程會有三種身份中的一種:leader和follower,以及一個干活中的狀態(tài):proccesser。它的基本原則就是,永遠最多只有一個leader。而所有follower都在等待成為leader。線程池啟動時會自動產(chǎn)生一個Leader負責等待事件,當有一個事件產(chǎn)生時,Leader線程首先通知一個Follower線程將其提拔為新的Leader,然后自己去處理這個事件,處理完畢后加入Follower線程等待隊列,等待下次成為Leader。這種方法可以增強CPU高速緩存相似性,及消除動態(tài)內(nèi)存分配和線程間的數(shù)據(jù)交換。
二、主要方法源碼解析
1. offer()
插入元素到隊列。首先獲取鎖,拿到鎖后向優(yōu)先隊列中插入元素,若插入完畢后發(fā)現(xiàn)隊頭元素就是自己,即最近到期時間的元素就是自己,刷新了記錄,那就趕緊從等待隊列中通知一個線程準備來獲取這個元素,然后釋放鎖。
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
這里為什么要將leader先置為null?
因為如果此時leader線程在超時等待獲取前任隊頭元素,而signal通知了另一個線程,看完take()的源碼可以知道如果有l(wèi)eader線程,那么此線程會直接阻塞等待,讓leader線程超時完后獲取隊頭,那顯然時間就不正確了,只有將leader設為null,后續(xù)線程才能成為leader并設置正確的超時時間來等待獲取最新隊頭元素
因此,leader變量的真正含義是:超時等待獲取隊列最新隊頭元素的線程,等待的時間即為最新隊頭元素剩余到期時間 因此,當隊頭元素發(fā)生變動(插入/刪除更新)時,就需要喚醒一個線程更新leader
2. take()
獲取優(yōu)先隊列隊頭元素。首先獲取鎖,拿到鎖后進入一個循環(huán),首先檢測隊頭元素,若為空則進入等待隊列阻塞等待,若不為空且隊頭元素已到期則直接將其出隊返回,如果還沒到期就看有沒有l(wèi)eader線程已經(jīng)在準備獲取隊頭元素了,如果有就不用搶了,進入等待隊列阻塞等待,如果沒有就超時等待準備獲取隊頭元素,被喚醒后進入下一次循環(huán)獲取隊頭元素。獲取完畢后就從等待隊列中通知一個線程到同步隊列準備獲取隊頭元素然后釋放鎖。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { //從優(yōu)先堆中獲取堆頂元素,即優(yōu)先級最高,即預定時間最近的元素 E first = q.peek(); if (first == null) //若隊列中無元素則直接進入條件隊列等待 available.await(); else { long delay = first.getDelay(NANOSECONDS); //若堆頂元素已經(jīng)到期,則直接將其出隊返回 if (delay <= 0) return q.poll(); //等待期間不持有元素引用,防止該元素被其他線程出隊消費后,仍不能被垃圾回收 first = null; // don't retain ref while waiting if (leader != null) //若已經(jīng)有l(wèi)eader了,則進入條件隊列無限期等待 available.await(); else { //否則成為leader進入條件隊列超時等待,到預期時間或者有更近時間元素插入就到同步隊列競爭鎖,再重復循環(huán)去取堆頂元素 Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { //取完元素后若leader為null且隊列中還有元素則從條件等待隊列通知一個線程到同步隊列 //為什么存在leader不為null的情況:leader線程從awaitNanos()中結(jié)束后沒有競爭過新進take()的線程,因此繼續(xù)在同步隊列中被阻塞,因此無需再從條件等待隊列中通知線程,直接讓leader線程再去競爭鎖, if (leader == null && q.peek() != null) available.signal(); lock.unlock();//釋放鎖資源讓同步隊列中的線程競爭鎖 } }
Leader-Follower模式在這里的作用在于,在隊頭元素還沒到期的情況下,只需要有一個線程(leader)超時等待,其余線程進來后發(fā)現(xiàn)已經(jīng)有l(wèi)eader了,就直接無限等待就行了,避免了無意義的超時等待和競爭消耗。
3. poll()
加鎖獲取并移除隊頭過期元素,如果沒有過期元素則不等待直接返回 null。
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } }
4. size()
加鎖獲取隊列當前剩余元素個數(shù)
public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.size(); } finally { lock.unlock(); } }
三、使用案例
如下使用案例,首先向DelayQueue插入5個定時任務,然后用3個線程并發(fā)讀取
public class DelayQueueTest { //隊列元素類 static class DelayTask implements Delayed { long exeTime;//預定執(zhí)行時間 public DelayTask(long exeTime) { this.exeTime = exeTime; } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.exeTime - System.currentTimeMillis(), unit); } @Override public int compareTo(Delayed o) { long delta = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); return (int) delta; } } public static void main(String[] args) { DelayQueue<DelayTask> delayQueue = new DelayQueue<>(); for (int i = 1;i <= 5;i++) { delayQueue.offer(new DelayTask(System.currentTimeMillis() + new Random().nextInt(10)*1000)); } for (int i = 1;i <= 3;i++) { new Thread(() -> { try { while (true) { DelayTask task = delayQueue.take(); System.out.printf("取出任務!取出時間:%s 任務預定執(zhí)行時間:%s%n", hms(System.currentTimeMillis()), hms(task.exeTime)); } } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } public static String hms(long milliseconds) { return new SimpleDateFormat("HH:mm:ss").format(milliseconds); } }
運行結(jié)果:
取出任務!取出時間:10:27:39 任務預定執(zhí)行時間:10:27:39
取出任務!取出時間:10:27:39 任務預定執(zhí)行時間:10:27:39
取出任務!取出時間:10:27:40 任務預定執(zhí)行時間:10:27:40
取出任務!取出時間:10:27:42 任務預定執(zhí)行時間:10:27:42
取出任務!取出時間:10:27:46 任務預定執(zhí)行時間:10:27:46
到此這篇關于Java中的延遲隊列DelayQueue源碼解析的文章就介紹到這了,更多相關Java延遲隊列DelayQueue內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Spring MVC之WebApplicationContext_動力節(jié)點Java學院整理
這篇文章主要介紹了Spring MVC之WebApplicationContext的相關資料,需要的朋友可以參考下2017-08-08SpringCloud 服務負載均衡和調(diào)用 Ribbon、OpenFeign的方法
這篇文章主要介紹了SpringCloud 服務負載均衡和調(diào)用 Ribbon、OpenFeign的方法,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-09-09springboot mybatis里localdatetime序列化問題的解決
這篇文章主要介紹了springboot mybatis里localdatetime序列化問題,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-10-10Spring事務管理下synchronized鎖失效問題的解決方法
這篇文章主要給大家介紹了關于Spring事務管理下synchronized鎖失效問題的解決方法,文中通過示例代碼介紹的非常詳細,對大家學習或者使用Spring具有一定的參考學習價值,需要的朋友可以參考下2022-03-03Jpa中Specification的求和sum不生效原理分析
這篇文章主要為大家介紹了Jpa中Specification的求和sum不生效原理示例分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-08-08