Java阻塞延遲隊(duì)列DelayQueue原理及使用詳解
前言
從阻塞隊(duì)列說(shuō)起,阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列。
這兩個(gè)附加的操作是:在隊(duì)列為空時(shí),從隊(duì)列中獲取元素的消費(fèi)者線程會(huì)一直等待直到隊(duì)列變?yōu)榉强铡?/p>
當(dāng)隊(duì)列滿時(shí),向隊(duì)列中放置元素的生產(chǎn)者線程會(huì)等待直到隊(duì)列可用。
阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景,生產(chǎn)者是往隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里拿元素的線程。
在阻塞隊(duì)列不可用時(shí),這兩個(gè)附加操作提供了4種處理方式:
- 拋出異常:當(dāng)隊(duì)列滿時(shí),插入元素會(huì)拋出IllegalStateException;
- 返回特殊值:offer()是入隊(duì)方法,當(dāng)插入成功時(shí)返回true,插入失敗返回false;poll()是出隊(duì)方法,當(dāng)出隊(duì)成功時(shí)返回元素的值,隊(duì)列為空時(shí)返回null
- 一直阻塞:當(dāng)隊(duì)列滿時(shí),阻塞執(zhí)行插入方法的線程;當(dāng)隊(duì)列空時(shí),阻塞執(zhí)行出隊(duì)方法的線程
- 超時(shí)退出:顧名思義
下面是Java常見(jiàn)的阻塞隊(duì)列。
- ArrayBlockingQueue :一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列
- LinkedBlockingQueue :一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列
- PriorityBlockingQueue :一個(gè)支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列
- DelayQueue:一個(gè)使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的無(wú)界阻塞隊(duì)列
- SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列
- LinkedTransferQueue:一個(gè)由鏈表結(jié)構(gòu)組成的無(wú)界阻塞隊(duì)列
- LinkedBlockingDeque:一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列
DelayQueue解析
DelayQueue隊(duì)列中每個(gè)元素都有一個(gè)過(guò)期時(shí)間,并且隊(duì)列是個(gè)優(yōu)先級(jí)隊(duì)列,當(dāng)從隊(duì)列獲取元素的時(shí)候,只有過(guò)期元素才會(huì)出隊(duì),DelayQueue的類結(jié)構(gòu)如下圖所示:
如圖DelayQueue中內(nèi)部使用的是PriorityQueue存放數(shù)據(jù),使用ReentrantLock實(shí)現(xiàn)線程同步。
另外隊(duì)列里面的元素要實(shí)現(xiàn)Delayed接口,一個(gè)是獲取當(dāng)前剩余時(shí)間的接口,一個(gè)是元素比較的接口,因?yàn)檫@個(gè)是有優(yōu)先級(jí)的隊(duì)列。
DelayQueue類的主要成員
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { // 持有內(nèi)部重入鎖。 private final transient ReentrantLock lock = new ReentrantLock(); // 優(yōu)先級(jí)隊(duì)列,存放工作任務(wù)。 private final PriorityQueue<E> q = new PriorityQueue<E>(); private Thread leader = null; // 依賴于重入鎖的condition。 private final Condition available = lock.newCondition(); }
元素入隊(duì)列
插入元素到隊(duì)列中主要三個(gè)方法,但實(shí)際上底層調(diào)用的都是offer(e)方法
/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return {@code true} (as specified by {@link Collection#add}) * @throws NullPointerException if the specified element is null */ public boolean add(E e) { return offer(e); } /** * Inserts the specified element into this delay queue. As the queue is * unbounded this method will never block. * * @param e the element to add * @throws NullPointerException {@inheritDoc} */ public void put(E e) { offer(e); } /** * Inserts the specified element into this delay queue. * * @param e the element to add * @return {@code true} * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { final ReentrantLock lock = this.lock; //獲取到重入鎖 lock.lock(); try { q.offer(e); //添加成功元素 if (q.peek() == e) { leader = null; //將等待隊(duì)列中的頭節(jié)點(diǎn)移動(dòng)到同步隊(duì)列。 available.signal(); } return true; } finally { lock.unlock(); } }
首先獲取獨(dú)占鎖,然后添加元素到優(yōu)先級(jí)隊(duì)列,由于q是優(yōu)先級(jí)隊(duì)列,所以添加完元素后,peek()方法返回的并不一定是剛才添加的元素,如果判斷為true,說(shuō)明當(dāng)前元素e的優(yōu)先級(jí)最小也就是即將過(guò)期的,這時(shí)候激活avaliable變量條件隊(duì)列里面的線程,通知它們隊(duì)列里面有元素了。
從隊(duì)列中取元素
有兩個(gè)方法可以取元素(都是取隊(duì)頭),poll()方法取隊(duì)頭當(dāng)隊(duì)頭元素沒(méi)過(guò)期時(shí)返回null,take()方法取隊(duì)頭當(dāng)隊(duì)頭元素沒(méi)過(guò)期時(shí)會(huì)一直等待。
/** * Retrieves and removes the head of this queue, or returns {@code null} * if this queue has no elements with an expired delay. * * @return the head of this queue, or {@code null} if this * queue has no elements with an expired delay */ public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); //如果隊(duì)列為空,或者不為空但是隊(duì)頭元素沒(méi)有過(guò)期則返回null if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } } /** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. * * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ public E take() throws InterruptedException { // 獲取鎖。每個(gè)延遲隊(duì)列內(nèi)聚了一個(gè)重入鎖。 final ReentrantLock lock = this.lock; // 獲取可中斷的鎖。 lock.lockInterruptibly(); try { for (;;) { // 嘗試從優(yōu)先級(jí)隊(duì)列中獲取隊(duì)列頭部元素,獲取但不移除 E first = q.peek(); if (first == null) //無(wú)元素,當(dāng)前線程節(jié)點(diǎn)加入等待隊(duì)列,并阻塞當(dāng)前線程 available.await(); else { // 通過(guò)延遲任務(wù)的getDelay()方法獲取延遲時(shí)間 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) //延遲時(shí)間到期,獲取并刪除頭部元素。 return q.poll(); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 線程節(jié)點(diǎn)進(jìn)入等待隊(duì)列 x 納秒。 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { // 若還存在元素的話,則將等待隊(duì)列頭節(jié)點(diǎn)中的線程節(jié)點(diǎn)移動(dòng)到同步隊(duì)列中。 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
重點(diǎn)說(shuō)一下take()方法,第一次調(diào)用take時(shí)候由于隊(duì)列空,所以把當(dāng)前線程放入available的條件隊(duì)列中等待,當(dāng)執(zhí)行offer()成功并且添加的新元素恰好就是優(yōu)先級(jí)隊(duì)列的隊(duì)首時(shí)就會(huì)通知最先等待的線程激活,循環(huán)重新獲取隊(duì)首元素,這時(shí)候first假如不空,則調(diào)用getDelay()方法看該元素還剩下多少時(shí)間就過(guò)期了,如果delay<=0則說(shuō)明已經(jīng)過(guò)期,則直接出隊(duì)返回。
否則看leader是否為null,不為null則說(shuō)明是其他線程也在執(zhí)行take()則把當(dāng)前線程放入條件隊(duì)列,否則就是只有當(dāng)前線程在執(zhí)行take()方法,則當(dāng)前線程await直到剩余過(guò)期時(shí)間到,這期間該線程會(huì)釋放鎖,所以其他線程可以offer()添加元素,也可以take()阻塞自己,剩余過(guò)期時(shí)間到后,當(dāng)前線程會(huì)重新競(jìng)爭(zhēng)鎖,重新進(jìn)入循環(huán)。
如果已經(jīng)具備了JUC包中的Lock接口以及AQS的相關(guān)知識(shí),上述代碼大部分應(yīng)該都比較容易理解。
DelayQueue將實(shí)現(xiàn)了Delayed接口的對(duì)象添加到優(yōu)先級(jí)隊(duì)列中,通過(guò)在可重入鎖的Condition上調(diào)用await()方法,實(shí)現(xiàn)了延遲獲取阻塞隊(duì)列中元素的功能。
總結(jié)
- DelayQueue是一個(gè)內(nèi)部依靠AQS隊(duì)列同步器所實(shí)現(xiàn)的無(wú)界延遲阻塞隊(duì)列。
- 隊(duì)列中的延遲對(duì)象需要覆蓋getDelay()與compareTo()方法,并且要注意 getDelay()的時(shí)間單位的統(tǒng)一,compareTo()根據(jù)業(yè)務(wù)邏輯進(jìn)行合理的比較邏輯重寫(xiě)。
- DelayQueue中內(nèi)聚的可重入鎖是非公平的。
- 延遲隊(duì)列是實(shí)現(xiàn)定時(shí)任務(wù)的關(guān)鍵,ScheduledThreadPoolExecutor中的任務(wù)隊(duì)列是DelayedWorkQueue,其和DelayedQueue高度類似,也是一個(gè)延遲隊(duì)列。
DelayQueue使用例子
寫(xiě)一個(gè)簡(jiǎn)單的例子:
public class DelayQueueTest { public static final int SIZE = 10; public static void main(String[] args) { DelayQueueTest test = new DelayQueueTest(); //初始化線程池 BlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<>(10); ThreadPoolExecutor threadPool = new ThreadPoolExecutor (5, 10, 10, TimeUnit.MILLISECONDS, arrayBlockingQueue, Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); DelayQueue<DelayedTask> delayTaskQueue = new DelayQueue<>(); //模擬SIZE個(gè)延遲任務(wù) for (byte i = 0; i < SIZE; i++) { Long runAt = System.currentTimeMillis() + 1000 * i; String name = "Zhang_" + i; byte age = (byte)(10 + i); String gender = (i % 2 == 0 ? "male" : "female"); Student student = new StudentBuilder(name, age, gender).height(150 + i).province("ZheJiang").build(); delayTaskQueue.put(new DelayedTask<Student>(student, 1, function -> test.print(student), runAt)); } while (true) { if (delayTaskQueue.size() == 0) { break; } try { //從延遲隊(duì)列中取值,如果沒(méi)有對(duì)象過(guò)期則取到null DelayedTask delayedTask = delayTaskQueue.poll(); if (delayedTask != null) { threadPool.execute(delayedTask); } } catch (Exception e) { e.printStackTrace(); } } threadPool.shutdown(); } public String print(Object object) { System.out.println(Thread.currentThread().getName()); String str = ">>>junit log>>>" + object.getClass().getSimpleName() + ":" + object.toString(); System.out.println(str); return str; } private static class DelayedTask<T> implements Delayed, Runnable { /** * 任務(wù)參數(shù) */ private T taskParam; /** * 任務(wù)類型 */ private Integer type; /** * 任務(wù)函數(shù) */ private Function<T, String> function; /** * 任務(wù)執(zhí)行時(shí)刻 */ private Long runAt; public T getTaskParam() { return taskParam; } public Integer getType() { return type; } public Function<T, String> getFunction() { return function; } public Long getRunAt() { return runAt; } DelayedTask(T taskParam, Integer type, Function<T, String> function, Long runAt) { this.taskParam = taskParam; this.type = type; this.function = function; this.runAt = runAt; } @Override public void run() { if (taskParam != null) { function.apply(taskParam); } } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.runAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { DelayedTask object = (DelayedTask)o; return this.runAt.compareTo(object.getRunAt()); } } }
運(yùn)行結(jié)果如下,由于10個(gè)元素的延遲時(shí)間均相差1秒,可以看到逐步打印的效果。
DelayQueue典型場(chǎng)景是重試機(jī)制實(shí)現(xiàn),比如當(dāng)調(diào)用接口失敗后,把當(dāng)前調(diào)用信息放入delay=10s的元素,然后把元素放入隊(duì)列,那么這個(gè)隊(duì)列就是一個(gè)重試隊(duì)列,一個(gè)線程通過(guò)take()方法獲取需要重試的接口,take()返回則接口進(jìn)行重試,失敗則再次放入隊(duì)列,同時(shí)也可以在元素加上重試次數(shù)。
到此這篇關(guān)于Java阻塞延遲隊(duì)列DelayQueue原理及使用詳解的文章就介紹到這了,更多相關(guān)阻塞延遲隊(duì)列DelayQueue原理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
kafka并發(fā)寫(xiě)大消息異常TimeoutException排查記錄
這篇文章主要為大家介紹了kafka并發(fā)寫(xiě)大消息異常TimeoutException的排查記錄及解決方案,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-02-02詳解SpringBoot實(shí)現(xiàn)fastdfs防盜鏈功能的示例代碼
我們可以通過(guò)fastdfs實(shí)現(xiàn)一個(gè)分布式文件系統(tǒng),如果fastdfs部署在外網(wǎng),那么任何一個(gè)人知道了上傳接口,就可以實(shí)現(xiàn)文件的上傳和訪問(wèn)。那么如何阻止他人訪問(wèn)我們fastdfs服務(wù)器上的文件呢?此處就需要使用fastdfs的防盜鏈功能,本文就來(lái)講講如何實(shí)現(xiàn)這一功能2022-10-10springmvc學(xué)習(xí)筆記-返回json的日期格式問(wèn)題的解決方法
本篇文章主要介紹了springmvc學(xué)習(xí)筆記-返回json的日期格式問(wèn)題的解決方法,解決了日期格式的輸出,有興趣的可以了解一下。2017-01-01如何使用pipeline和jacoco獲取自動(dòng)化測(cè)試代碼覆蓋率
這篇文章主要介紹了如何使用pipeline和jacoco獲取自動(dòng)化測(cè)試代碼覆蓋率,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-11-11Java實(shí)現(xiàn)word轉(zhuǎn)pdf并在關(guān)鍵字位置插入圖片
這篇文章主要為大家詳細(xì)介紹了如何利用Java實(shí)現(xiàn)word轉(zhuǎn)pdf,并在word中關(guān)鍵字位置插入圖片,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-11-11