Java中的DelayQueue實(shí)現(xiàn)原理及應(yīng)用場(chǎng)景詳解
DelayQueue基本原理
DelayQueue是一個(gè)沒(méi)有邊界BlockingQueue實(shí)現(xiàn),加入其中的元素必需實(shí)現(xiàn)Delayed接口。當(dāng)生產(chǎn)者線程調(diào)用put之類的方法加入元素時(shí),會(huì)觸發(fā)Delayed接口中的compareTo方法進(jìn)行排序,也就是說(shuō)隊(duì)列中元素的順序是按到期時(shí)間排序的,而非它們進(jìn)入隊(duì)列的順序。排在隊(duì)列頭部的元素是最早到期的,越往后到期時(shí)間赿晚。
消費(fèi)者線程查看隊(duì)列頭部的元素,注意是查看不是取出。然后調(diào)用元素的getDelay方法,如果此方法返回的值?。盎蛘叩扔冢埃瑒t消費(fèi)者線程會(huì)從隊(duì)列中取出此元素,并進(jìn)行處理。如果getDelay方法返回的值大于0,則消費(fèi)者線程wait返回的時(shí)間值后,再?gòu)年?duì)列頭部取出元素,此時(shí)元素應(yīng)該已經(jīng)到期。
DelayQueue是Leader-Followr模式的變種,消費(fèi)者線程處于等待狀態(tài)時(shí),總是等待最先到期的元素,而不是長(zhǎng)時(shí)間的等待。消費(fèi)者線程盡量把時(shí)間花在處理任務(wù)上,最小化空等的時(shí)間,以提高線程的利用效率。
以下通過(guò)隊(duì)列及消費(fèi)者線程狀態(tài)變化大致說(shuō)明一下DelayQueue的運(yùn)行過(guò)程。
初始狀態(tài)
因?yàn)殛?duì)列是沒(méi)有邊界的,向隊(duì)列中添加元素的線程不會(huì)阻塞,添加操作相對(duì)簡(jiǎn)單,所以此圖不考慮向隊(duì)列添加元素的生產(chǎn)者線程。假設(shè)現(xiàn)在共有三個(gè)消費(fèi)者線程。
隊(duì)列中的元素按到期時(shí)間排序,隊(duì)列頭部的元素2s以后到期。消費(fèi)者線程1查看了頭部元素以后,發(fā)現(xiàn)還需要2s才到期,于是它進(jìn)入等待狀態(tài),2s以后醒來(lái),等待頭部元素到期的線程稱為L(zhǎng)eader線程。
消費(fèi)者線程2與消費(fèi)者線程3處于待命狀態(tài),它們不等待隊(duì)列中的非頭部元素。當(dāng)消費(fèi)者線程1拿到對(duì)象5以后,會(huì)向它們發(fā)送signal。這個(gè)時(shí)候兩個(gè)中的一個(gè)會(huì)結(jié)束待命狀態(tài)而進(jìn)入等待狀態(tài)。
2S以后
消費(fèi)者線程1已經(jīng)拿到了對(duì)象5,從等待狀態(tài)進(jìn)入處理狀態(tài),處理它取到的對(duì)象5,同時(shí)向消費(fèi)者線程2與消費(fèi)者線程3發(fā)送signal。
消費(fèi)者線程2與消費(fèi)者線程3會(huì)爭(zhēng)搶領(lǐng)導(dǎo)權(quán),這里是消費(fèi)者線程2進(jìn)入等待狀態(tài),成為L(zhǎng)eader線程,等待2s以后對(duì)象4到期。而消費(fèi)者線程3則繼續(xù)處于待命狀態(tài)。
此時(shí)隊(duì)列中加入了一個(gè)新元素對(duì)象6,它10s后到期,排在隊(duì)尾。
又2S以后
先看線程1,如果它已經(jīng)結(jié)束了對(duì)象5的處理,則進(jìn)入待命狀態(tài)。如果還沒(méi)有結(jié)束,則它繼續(xù)處理對(duì)象5。
消費(fèi)線程2取到對(duì)象4以后,也進(jìn)入處理狀態(tài),同時(shí)給處于待命狀態(tài)的消費(fèi)線程3發(fā)送信號(hào),消費(fèi)線程3進(jìn)入等待狀態(tài),成為新的Leader?,F(xiàn)在頭部元素是新插入的對(duì)象7,因?yàn)樗?s以后就過(guò)期,要早于其它所有元素,所以排到了隊(duì)列頭部。
又1S后
一種不好的結(jié)果:
消費(fèi)線程3一定正在處理對(duì)象7。消費(fèi)線程1與消費(fèi)線程2還沒(méi)有處理完它們各自取得的對(duì)象,無(wú)法進(jìn)入待命狀態(tài),也更加進(jìn)入不了等待狀態(tài)。此時(shí)對(duì)象3馬上要到期,那么如果它到期時(shí)沒(méi)有消費(fèi)者線程空下來(lái),則它的處理一定會(huì)延期。
可以想見(jiàn),如果元素進(jìn)入隊(duì)列的速度很快,元素之間的到期時(shí)間相對(duì)集中,而處理每個(gè)到期元素的速度又比較慢的話,則隊(duì)列會(huì)越來(lái)越大,隊(duì)列后邊的元素延期處理的時(shí)間會(huì)越來(lái)越長(zhǎng)。
另外一種好的結(jié)果:
消費(fèi)線程1與消費(fèi)線程2很快的完成對(duì)取出對(duì)象的處理,及時(shí)返回重新等待隊(duì)列中的到期元素。一個(gè)處于等待狀態(tài)(Leader),對(duì)象3一到期就立刻處理。另一個(gè)則處于待命狀態(tài)。
這樣,每一個(gè)對(duì)象都能在到期時(shí)被及時(shí)處理,不會(huì)發(fā)生明顯的延期。
所以,消費(fèi)者線程的數(shù)量要夠,處理任務(wù)的速度要快。否則,隊(duì)列中的到期元素?zé)o法被及時(shí)取出并處理,造成任務(wù)延期、隊(duì)列元素堆積等情況。
示例代碼
DelayQueue的一個(gè)應(yīng)用場(chǎng)景是定時(shí)任務(wù)調(diào)度。本例中先讓主線程向DelayQueue添加10個(gè)任務(wù),任務(wù)之間的啟動(dòng)間隔在1~2s之間,每個(gè)任務(wù)的執(zhí)行時(shí)間固定為2s,代碼如下:
package com.zhangdb.thread; import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; class DelayTask implements Delayed { private static long currentTime = System.currentTimeMillis(); protected final String taskName; protected final int timeCost; protected final long scheduleTime; protected static final AtomicInteger taskCount = new AtomicInteger(0); // 定時(shí)任務(wù)之間的啟動(dòng)時(shí)間間隔在1~2s之間,timeCost表示處理此任務(wù)需要的時(shí)間,本示例中為2s public DelayTask(String taskName, int timeCost) { this.taskName = taskName; this.timeCost = timeCost; taskCount.incrementAndGet(); currentTime += 1000 + (long) (Math.random() * 1000); scheduleTime = currentTime; } @Override public int compareTo(Delayed o) { return (int) (this.scheduleTime - ((DelayTask) o).scheduleTime); } @Override public long getDelay(TimeUnit unit) { long expirationTime = scheduleTime - System.currentTimeMillis(); return unit.convert(expirationTime, TimeUnit.MILLISECONDS); } public void execTask() { long startTime = System.currentTimeMillis(); System.out.println("Task " + taskName + ": schedule_start_time=" + scheduleTime + ",real start time=" + startTime + ",delay=" + (startTime - scheduleTime)); try { Thread.sleep(timeCost); } catch (InterruptedException e) { e.printStackTrace(); } } } class DelayTaskComsumer extends Thread { private final BlockingQueue<DelayTask> queue; public DelayTaskComsumer(BlockingQueue<DelayTask> queue) { this.queue = queue; } @Override public void run() { DelayTask task = null; try { while (true) { task = queue.take(); task.execTask(); DelayTask.taskCount.decrementAndGet(); } } catch (InterruptedException e) { System.out.println(getName() + " finished"); } } } public class DelayQueueExample { public static void main(String[] args) { BlockingQueue<DelayTask> queue = new DelayQueue<DelayTask>(); for (int i = 0; i < 10; i++) { try { queue.put(new DelayTask("work " + i, 2000)); } catch (InterruptedException e) { e.printStackTrace(); } } ThreadGroup g = new ThreadGroup("Consumers"); for (int i = 0; i < 1; i++) { new Thread(g, new DelayTaskComsumer(queue)).start(); } while (DelayTask.taskCount.get() > 0) { try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } } g.interrupt(); System.out.println("Main thread finished"); } }
首先啟動(dòng)一個(gè)消費(fèi)者線程。因?yàn)橄M(fèi)者線程處單個(gè)任務(wù)的時(shí)間為2s,而任務(wù)的調(diào)度間隔為1~2s。這種情況下,每當(dāng)消費(fèi)者線程處理完一個(gè)任務(wù),回頭再?gòu)年?duì)列中新取任務(wù)時(shí),新任務(wù)肯定延期了,無(wú)法按給定的時(shí)間調(diào)度任務(wù)。而且越往后情況越嚴(yán)重。運(yùn)行代碼看一下輸出:
Task work 0: schedule_start_time=1554203579096,real start time=1554203579100,delay=4
Task work 1: schedule_start_time=1554203580931,real start time=1554203581101,delay=170
Task work 2: schedule_start_time=1554203582884,real start time=1554203583101,delay=217
Task work 3: schedule_start_time=1554203584660,real start time=1554203585101,delay=441
Task work 4: schedule_start_time=1554203586075,real start time=1554203587101,delay=1026
Task work 5: schedule_start_time=1554203587956,real start time=1554203589102,delay=1146
Task work 6: schedule_start_time=1554203589041,real start time=1554203591102,delay=2061
Task work 7: schedule_start_time=1554203590127,real start time=1554203593102,delay=2975
Task work 8: schedule_start_time=1554203591903,real start time=1554203595102,delay=3199
Task work 9: schedule_start_time=1554203593577,real start time=1554203597102,delay=3525
Main thread finished
Thread-0 finished
最后一個(gè)任務(wù)的延遲時(shí)間已經(jīng)超過(guò)3.5s了。
再作一次測(cè)試,將消費(fèi)者線程的個(gè)數(shù)調(diào)整為2,這時(shí)任務(wù)應(yīng)該能按時(shí)啟動(dòng),延遲應(yīng)該很小,運(yùn)行程序看一下結(jié)果:
Task work 0: schedule_start_time=1554204395427,real start time=1554204395430,delay=3
Task work 1: schedule_start_time=1554204396849,real start time=1554204396850,delay=1
Task work 2: schedule_start_time=1554204398050,real start time=1554204398051,delay=1
Task work 3: schedule_start_time=1554204399590,real start time=1554204399590,delay=0
Task work 4: schedule_start_time=1554204401289,real start time=1554204401289,delay=0
Task work 5: schedule_start_time=1554204402883,real start time=1554204402883,delay=0
Task work 6: schedule_start_time=1554204404663,real start time=1554204404664,delay=1
Task work 7: schedule_start_time=1554204406154,real start time=1554204406154,delay=0
Task work 8: schedule_start_time=1554204407991,real start time=1554204407991,delay=0
Task work 9: schedule_start_time=1554204409540,real start time=1554204409540,delay=0
Main thread finished
Thread-0 finished
Thread-2 finished
基本上按時(shí)啟動(dòng),最大延遲為3毫秒,大部分都是0毫秒。
將消費(fèi)者線程個(gè)數(shù)調(diào)整成3個(gè),運(yùn)行看一下結(jié)果:
Task work 0: schedule_start_time=1554204499695,real start time=1554204499698,delay=3
Task work 1: schedule_start_time=1554204501375,real start time=1554204501376,delay=1
Task work 2: schedule_start_time=1554204503370,real start time=1554204503371,delay=1
Task work 3: schedule_start_time=1554204504860,real start time=1554204504861,delay=1
Task work 4: schedule_start_time=1554204506419,real start time=1554204506420,delay=1
Task work 5: schedule_start_time=1554204508191,real start time=1554204508192,delay=1
Task work 6: schedule_start_time=1554204509495,real start time=1554204509496,delay=1
Task work 7: schedule_start_time=1554204510663,real start time=1554204510664,delay=1
Task work 8: schedule_start_time=1554204512598,real start time=1554204512598,delay=0
Task work 9: schedule_start_time=1554204514276,real start time=1554204514277,delay=1
Main thread finished
Thread-0 finished
Thread-2 finished
Thread-4 finished
大部分延遲時(shí)間變成1毫秒,情況好像還不如2個(gè)線程的情況。
將消費(fèi)者線程數(shù)調(diào)整成5,運(yùn)行看一下結(jié)果:
Task work 0: schedule_start_time=1554204635015,real start time=1554204635019,delay=4
Task work 1: schedule_start_time=1554204636856,real start time=1554204636857,delay=1
Task work 2: schedule_start_time=1554204637968,real start time=1554204637970,delay=2
Task work 3: schedule_start_time=1554204639758,real start time=1554204639759,delay=1
Task work 4: schedule_start_time=1554204641089,real start time=1554204641090,delay=1
Task work 5: schedule_start_time=1554204642879,real start time=1554204642880,delay=1
Task work 6: schedule_start_time=1554204643941,real start time=1554204643942,delay=1
Task work 7: schedule_start_time=1554204645006,real start time=1554204645007,delay=1
Task work 8: schedule_start_time=1554204646309,real start time=1554204646310,delay=1
Task work 9: schedule_start_time=1554204647537,real start time=1554204647538,delay=1
Thread-2 finished
Thread-0 finished
Main thread finished
Thread-8 finished
Thread-4 finished
Thread-6 finished
與3個(gè)消費(fèi)者線程的情況差不多。
結(jié)論
最優(yōu)的消費(fèi)者線程的個(gè)數(shù)與任務(wù)啟動(dòng)的時(shí)間間隔好像存在這樣的關(guān)系:?jiǎn)蝹€(gè)任務(wù)處理時(shí)間的最大值 / 相鄰任務(wù)的啟動(dòng)時(shí)間最小間隔?。健∽顑?yōu)線程數(shù),如果最優(yōu)線程數(shù)是小數(shù),則取整數(shù)后加1,比如1.3的話,那么最優(yōu)線程數(shù)應(yīng)該是2。
本例中,單個(gè)任務(wù)處理時(shí)間的最大值固定為2s。 相鄰任務(wù)的啟動(dòng)時(shí)間最小間隔為1s。 則消費(fèi)者線程數(shù)為2/1=2。
如果消費(fèi)者線程數(shù)小于此值,則來(lái)不及處理到期的任務(wù)。如果大于此值,線程太多,在調(diào)度、同步上花更多的時(shí)間,無(wú)益改善性能。
到此這篇關(guān)于Java中的DelayQueue實(shí)現(xiàn)原理及應(yīng)用場(chǎng)景詳解的文章就介紹到這了,更多相關(guān)DelayQueue實(shí)現(xiàn)原理及應(yīng)用場(chǎng)景內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring?AOP操作的相關(guān)術(shù)語(yǔ)及環(huán)境準(zhǔn)備
這篇文章主要為大家介紹了Spring?AOP操作的相關(guān)術(shù)語(yǔ)及環(huán)境準(zhǔn)備學(xué)習(xí),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05細(xì)談java同步之JMM(Java Memory Model)
Java內(nèi)存模型是在硬件內(nèi)存模型上的更高層的抽象,它屏蔽了各種硬件和操作系統(tǒng)訪問(wèn)的差異性,保證了Java程序在各種平臺(tái)下對(duì)內(nèi)存的訪問(wèn)都能達(dá)到一致的效果。下面我們來(lái)一起學(xué)習(xí)下JMM2019-05-05Log4j關(guān)閉Spring和Hibernate日志打印方式
這篇文章主要介紹了Log4j關(guān)閉Spring和Hibernate日志打印方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12Java Scala偏函數(shù)與偏應(yīng)用函數(shù)超詳細(xì)講解
Scala是一種多范式的編程語(yǔ)言,支持面向?qū)ο蠛秃瘮?shù)式編程。Scala也支持異常處理,即在程序運(yùn)行過(guò)程中發(fā)生意外或錯(cuò)誤時(shí),采取相應(yīng)的措施2023-04-04Java中的throws關(guān)鍵字處理異常的最佳實(shí)踐記錄
在Java編程中,異常處理是保證程序健壯性和穩(wěn)定性的重要手段,除了使用try-catch塊捕獲異常外,Java還提供了throws關(guān)鍵字,允許我們將異常拋給調(diào)用者處理,本文介紹Java中的throws關(guān)鍵字處理異常的最佳實(shí)踐記錄,感興趣的朋友一起看看吧2025-01-01Java Swing組件單選框JRadioButton用法示例
這篇文章主要介紹了Java Swing組件單選框JRadioButton用法,結(jié)合具體實(shí)例形式分析了Swing單選框JRadioButton的使用方法及相關(guān)操作注意事項(xiàng),需要的朋友可以參考下2017-11-11解決安裝mysqlclient的時(shí)候出現(xiàn)Microsoft Visual C++ 14.0 is required報(bào)錯(cuò)
這篇文章主要介紹了解決安裝mysqlclient的時(shí)候出現(xiàn)Microsoft Visual C++ 14.0 is required報(bào)錯(cuò)問(wèn)題,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2018-11-11Java統(tǒng)計(jì)字符串中字符出現(xiàn)次數(shù)的方法示例
這篇文章主要介紹了Java統(tǒng)計(jì)字符串中字符出現(xiàn)次數(shù)的方法,涉及Java針對(duì)字符串的遍歷、查找、計(jì)算等相關(guān)操作技巧,需要的朋友可以參考下2017-12-12