Java中的DelayQueue源碼解析
介紹
一個實現(xiàn)PriorityBlockingQueue實現(xiàn)延遲獲取的無界隊列,在創(chuàng)建元素時,可以指定多久才能從隊列中獲取當前元素。只有延時期滿后才能從隊列中獲取元素。
DelayQueue可以運用在以下應用場景:
1.緩存系統(tǒng)的設(shè)計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環(huán)查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。
2.定時任務(wù)調(diào)度。使用DelayQueue保存當天將會執(zhí)行的任務(wù)和執(zhí)行時間,一旦從DelayQueue中獲取到任務(wù)就開始執(zhí)行,從比如TimerQueue就是使用DelayQueue實現(xiàn)的。
數(shù)據(jù)結(jié)構(gòu)
public interface Delayed extends Comparable<Delayed> { /** * 返回與此對象相關(guān)的剩余延遲時間,以給定的時間單位表示 */ long getDelay(TimeUnit unit); }
getDelay方法一般用內(nèi)部存儲的事件,減去當前事件,即為剩余延遲事件
屬性
private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>(); /** *用于優(yōu)化內(nèi)部阻塞通知的線程 */ private Thread leader = null; private final Condition available = lock.newCondition();
以支持優(yōu)先級的PriorityQueue無界隊列作為一個容器,因為元素都必須實現(xiàn)Delayed接口,可以根據(jù)元素的過期時間來對元素進行排列,因此,先過期的元素會在隊首,每次從隊列里取出來都是最先要過期的元素。
leader是一個Thread元素,它在offer和take中都有使用,它代表當前獲取到鎖的消費者線程,
DelayQueue實現(xiàn)Leader-Folloer pattern
1、當存在多個take線程時,同時只生效一個,即,leader線程
2、當leader存在時,其它的take線程均為follower,其等待是通過condition實現(xiàn)的
3、當leader不存在時,當前線程即成為leader,在delay之后,將leader角色釋放還原
4、最后如果隊列還有內(nèi)容,且leader空缺,則調(diào)用一次condition的signal,喚醒掛起的take線程,其中之一將成為新的leader
5、最后在finally中釋放鎖
方法實現(xiàn)
offer,poll,peek
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); //如果插入元素是第一個元素 if (q.peek() == e) { //leader設(shè)置為null leader = null; //喚醒 available.signal(); } return true; } finally { lock.unlock(); } } public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); //如果未到期,則返回null,否則刪除 if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } } public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(NANOSECONDS); //到期,則poll if (delay <= 0) return q.poll(); if (nanos <= 0) return null; first = null; // don't retain ref while waiting if (nanos < delay || leader != null)//nanos<delay,表示超時剩余時間小于到期時間, nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); //設(shè)置當前線程為leader leader = thisThread; try { //等待條件 long timeLeft = available.awaitNanos(delay); //剩余超時時間 nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } } public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.peek(); } finally { lock.unlock(); } }
put,take
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. * * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 獲取可中斷鎖。 lock.lockInterruptibly(); try { for (;;) { // 從優(yōu)先級隊列中獲取隊列頭元素 E first = q.peek(); if (first == null) // 無元素,當前線程加入等待隊列,并阻塞 available.await(); else { // 通過getDelay 方法獲取延遲時間 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) // 延遲時間到期,獲取并刪除頭部元素。 return q.poll(); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 線程節(jié)點進入等待隊列 x 納秒。 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { // leader == null且還存在元素的話,喚醒一個消費線程。 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } } public void put(E e) { offer(e); }
take()方法邏輯:
1.獲取鎖
2.取出優(yōu)先級隊列q的首元素
3.如果元素q的隊首/隊列為空,阻塞
4.如果元素q的隊首(first)不為空,獲得這個元素的delay時間值,如果first的延遲delay時間值為0的話,說明該元素已經(jīng)到了可以使用的時間,調(diào)用poll方法彈出該元素,跳出方法
5.如果first的延遲delay時間值不為0的話,釋放元素first的引用,避免內(nèi)存泄露
6.循環(huán)以上操作,直到return
leader作用
如果leader不為null,說明已經(jīng)有消費者線程拿到鎖,直接阻塞當前線程,如果leader為null,把當前線程賦值給leader,并等待剩余的到期時間,最后釋放leader,這里我們想象著我們有個多個消費者線程用take方法去取,如果沒有l(wèi)eader!=null的判斷,這些線程都會無限循環(huán),直到返回第一個元素,很顯然很浪費資源。所以leader的作用是設(shè)置一個標記,來避免消費者的無腦競爭。
到此這篇關(guān)于Java中的DelayQueue源碼解析的文章就介紹到這了,更多相關(guān)DelayQueue源碼解析內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
JAVA CountDownLatch(倒計時計數(shù)器)用法實例
這篇文章主要介紹了JAVA CountDownLatch(倒計時計數(shù)器)用法實例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2019-10-10