Java多線程之scheduledThreadPool的方法解析
scheduledThreadPool
我們對(duì)java中定時(shí)任務(wù)實(shí)現(xiàn)可能會(huì)有以下疑問(wèn):
怎樣做到每個(gè)任務(wù)延遲指定時(shí)間執(zhí)行?
內(nèi)部使用了什么數(shù)據(jù)結(jié)構(gòu)保存延遲任務(wù)?
延遲任務(wù)放入scheduledThreadPool時(shí)機(jī)并不固定,怎么保證按延遲時(shí)間順序執(zhí)行?
構(gòu)造器
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
corePoolSize就是我們傳過(guò)了的參數(shù),maximumPoolSize是Integer.MAX_VALUE,所以最大線程是無(wú)窮大,非核心線程成活時(shí)間是0,所以非核心線程執(zhí)行完firstTask之后如果poll任務(wù)沒(méi)拿到任務(wù)則會(huì)直接銷(xiāo)毀。queue是DelayedWorkQueue。但通過(guò)后面的分析可以知道,最大線程數(shù)是不起作用的,最多會(huì)起核心線程數(shù)的數(shù)量
schedule(Runnable command,long delay, TimeUnit unit)方法
public ScheduledFuture<?>schedule(Runnable command, long delay, TimeUnit unit){ if(command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t =decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; }
- 通過(guò)decorateTask方法獲取到RunnableScheduledFuture(實(shí)際上是ScheduledFutureTask對(duì)象),并把delay時(shí)間變成了時(shí)間戳
- 執(zhí)行delayedExecute方法
delayedExecute方法
private voiddelayedExecute(RunnableScheduledFuture<?> task){ if(isShutdown()) reject(task); else{ super.getQueue().add(task); if(isShutdown()&& !canRunInCurrentRunState(task.isPeriodic())&& remove(task)) task.cancel(false); else ensurePrestart(); } }
- 使用queue.add方法把task放入queue
- 執(zhí)行ensurePrestart方法
offer方法
public boolean offer(Runnable x){ if(x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e =(RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if(i >= queue.length) grow(); size = i +1; if(i ==0){ queue[0]= e; setIndex(e,0); }else{ siftUp(i, e); } if(queue[0]== e){ leader = null; available.signal(); } } finally { lock.unlock(); } return true; }
- DelayedWorkQueue底層使用的是RunnableScheduledFuture的數(shù)組,初始化容量是16,之后擴(kuò)容是以1.5倍進(jìn)行。
- 在offer元素整個(gè)過(guò)程中使用ReentrantLock進(jìn)行加鎖,所以DelayedWorkQueue是一個(gè)線程安全的隊(duì)列。然后使用了condition來(lái)實(shí)現(xiàn)阻塞的功能,當(dāng)poll沒(méi)有元素時(shí)會(huì)使用await進(jìn)行等待,當(dāng)offer的是數(shù)組的第一個(gè)元素時(shí)會(huì)signal,這個(gè)signal的設(shè)計(jì)是排序的點(diǎn)睛之筆,設(shè)計(jì)的非常巧妙,這塊需要offer和take方法一起來(lái)看,在take方法時(shí)會(huì)拿第一個(gè)元素來(lái)判斷delay的時(shí)間,如果時(shí)間沒(méi)到會(huì)使用await休眠delay時(shí)間,但此時(shí)如果有delay時(shí)間更短的任務(wù)放入queue中,此時(shí)需要take的任務(wù)就不是之前的那個(gè)任務(wù)了,就要重新執(zhí)行邏輯獲取這個(gè)最新delay的任務(wù),這樣才能做到任務(wù)的正確執(zhí)行。
- 在offer元素時(shí)會(huì)使用siftUp方法來(lái)保證數(shù)組中元素是按delay時(shí)間從小到大排列,但要注意的是數(shù)組前半部分肯定都是排了delay最小的任務(wù),但后半部分不一定是有序的
ensurePrestart()方法
voidensurePrestart(){ int wc =workerCountOf(ctl.get()); if(wc < corePoolSize) addWorker(null, true); elseif(wc ==0) addWorker(null, false); }
這個(gè)比較簡(jiǎn)單,addWorker方法之前我們也分析過(guò)了,需要注意的是這里的firstTask默認(rèn)是空的,所以工作線程會(huì)直接從queue中拿任務(wù)。這有個(gè)比較奇怪的else if,感覺(jué)應(yīng)該永遠(yuǎn)不用執(zhí)行,因?yàn)閣c==0肯定已經(jīng)被if條件攔截了,也就是只能起核心線程數(shù)。最大線程數(shù)永遠(yuǎn)不會(huì)起作用
poll方法
public RunnableScheduledFuture<?>poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; 加鎖 lock.lockInterruptibly(); try { 自旋 for(;;){ 拿到queue中的第一個(gè)元素,如果是空則awaitNanos時(shí)間,等待時(shí)間過(guò)后如果queue中還是沒(méi)有元素則返回null。 RunnableScheduledFuture<?> first = queue[0]; if(first == null){ if(nanos <=0) return null; else nanos = available.awaitNanos(nanos); }else{ 拿到第一個(gè)任務(wù)的delay時(shí)間,如果到了delay時(shí)間則返回finishPoll方法的結(jié)果 long delay = first.getDelay(NANOSECONDS); if(delay <=0) returnfinishPoll(first); 如果傳入的nanos小于等于0則返回null if(nanos <=0) return null; first = null;// don't retain ref while waiting 如果等待時(shí)間還不夠或前一個(gè)需要執(zhí)行的任務(wù)還在執(zhí)行,則當(dāng)前線程直接等待 if(nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else{否則當(dāng)前線程可以執(zhí)行(leader線程),但需要awaitNanos delay的時(shí)間才能執(zhí)行 Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { 當(dāng)?shù)却龝r(shí)間到了之后就 leader = null說(shuō)明此時(shí)可以返回finishPoll方法的結(jié)果 if(leader == thisThread) leader = null; } } } } } finally { if(leader == null && queue[0]!= null) available.signal(); lock.unlock(); } }
- DelayedWorkQueue的poll方法也是使用reentrantLock來(lái)保證線程安全,然后使用condition.awaitNanos來(lái)達(dá)到等待特定時(shí)間的效果,這里使用leader線程保證了排在第一位的任務(wù)只有一個(gè)工作線程獲取到,其他工作線程進(jìn)行排隊(duì)等待,在獲取到第一個(gè)任務(wù)的工作線程delay時(shí)間到了之后會(huì)take到這個(gè)任務(wù)并signal排隊(duì)的第一個(gè)工作線程繼續(xù)獲取下一個(gè)任務(wù),周而復(fù)始。
- 在使用finishPoll方法返回delay時(shí)間到了的任務(wù)時(shí)會(huì)用siftDown對(duì)queue后半部分的任務(wù)進(jìn)行排序,因?yàn)橹皁ffer時(shí)使用siftUp方法只對(duì)queue前半部分進(jìn)行了排序
- 回到ScheduledThreadPool線程池,keepAliveTime是0,所以當(dāng)first任務(wù)的delay時(shí)間還沒(méi)有到時(shí)會(huì)直接返回null,然后非核心工作線程就會(huì)直接銷(xiāo)毀,之后的代碼都不會(huì)執(zhí)行,而核心線程則執(zhí)行的take方法,take方法才會(huì)進(jìn)入下面這段邏輯
if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } }
scheduleAtFixedRate方法
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
- 這個(gè)方法跟之前schedule方法差不多了,都是使用了ScheduledFutureTask,這邊多了個(gè)period變量保存執(zhí)行周期值,outerTask引用了自身的對(duì)象,然后也是使用delayExecute方法把任務(wù)放入了queue中,此時(shí)任務(wù)的delay是initialDelay,所以會(huì)在initialDelay時(shí)間之后出隊(duì)然后執(zhí)行
- 由于現(xiàn)在工作線程中的task是ScheduledFutureTask,所以工作線程調(diào)用的task.run方法是ScheduledFutureTask.run方法
ScheduledFutureTask.run方法
public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } }
1.判斷是不是周期執(zhí)行的任務(wù),之前的schedule方法的period是0,所以會(huì)執(zhí)行super.run();然后執(zhí)行傳入的runnable中的run方法,而scheduleAtFixedRate方法的period不是0,則會(huì)執(zhí)行super.runAndReset();方法,執(zhí)行傳入的runnable中的run方法之后執(zhí)行setNextRunTime();
重新設(shè)置delay時(shí)間(initialDelay+period),然后把任務(wù)又放入queue中
scheduleWithFixedDelay方法
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t;
這個(gè)方法幾乎跟scheduleAtFixedRate方法一模一樣,區(qū)別在于period是個(gè)負(fù)數(shù),通過(guò)之前我們對(duì)scheduleAtFixedRate方法的分析,period這個(gè)參數(shù)在算周期執(zhí)行間隔時(shí)會(huì)用到,也就是setNextRunTime方法
setNextRunTime方法
private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); }
當(dāng)period大于0時(shí),也就是scheduleAtFixedRate執(zhí)行時(shí),是直接在之前的time加上了period,而scheduleWithFixedDelay方法執(zhí)行時(shí),是用triggerTime方法在當(dāng)前時(shí)間加上了periode,不同的計(jì)算方式的區(qū)別在于,scheduleAtFixedRate不會(huì)管任務(wù)的執(zhí)行時(shí)間,我只要保證任務(wù)固定頻率執(zhí)行就好了,所以他是幾乎精確的period時(shí)間執(zhí)行,而scheduleWithFixedDelay是在任務(wù)之后的時(shí)間+period時(shí)間來(lái)確定下一次任務(wù)執(zhí)行的時(shí)間,所以任務(wù)執(zhí)行的頻率相對(duì)來(lái)說(shuō)不固定
到此這篇關(guān)于Java多線程之scheduledThreadPool的方法解析的文章就介紹到這了,更多相關(guān)scheduledThreadPool的方法內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
深入了解Spring Boot2.3.0及以上版本的Liveness和Readiness功能
這篇文章主要介紹了Spring Boot2.3.0及以上版本的Liveness和Readiness功能示例深入解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10Spring Boot 簡(jiǎn)單使用EhCache緩存框架的方法
本篇文章主要介紹了Spring Boot 簡(jiǎn)單使用EhCache緩存框架的方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-07-07MyBatis動(dòng)態(tài)<if>標(biāo)簽使用避坑指南
這篇文章主要為大家介紹了MyBatis動(dòng)態(tài)<if>標(biāo)簽使用避坑指南,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-03-03Springboot實(shí)現(xiàn)多文件上傳代碼解析
這篇文章主要介紹了Springboot實(shí)現(xiàn)多文件上傳代碼解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04MyBatis-Plus實(shí)現(xiàn)公共字段自動(dòng)填充功能詳解
在開(kāi)發(fā)中經(jīng)常遇到多個(gè)實(shí)體類(lèi)有共同的屬性字段,這些字段屬于公共字段,也就是很多表中都有這些字段,能不能對(duì)于這些公共字段在某個(gè)地方統(tǒng)一處理,來(lái)簡(jiǎn)化開(kāi)發(fā)呢?MyBatis-Plus就提供了這一功能,本文就來(lái)為大家詳細(xì)講講2022-08-08Spring內(nèi)置任務(wù)調(diào)度如何實(shí)現(xiàn)添加、取消與重置詳解
任務(wù)調(diào)度是我們?nèi)粘i_(kāi)發(fā)中經(jīng)常會(huì)碰到的,下面這篇文章主要給大家介紹了關(guān)于Spring內(nèi)置任務(wù)調(diào)度如何實(shí)現(xiàn)添加、取消與重置的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2017-10-10SpringBoot四大神器之Actuator的使用小結(jié)
這篇文章主要介紹了SpringBoot四大神器之Actuator的使用小結(jié),詳細(xì)的介紹了Actuator的使用和端點(diǎn)的使用,有興趣的可以了解一下2017-11-11Java實(shí)現(xiàn)簡(jiǎn)易GUI貪吃蛇小游戲
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)簡(jiǎn)易GUI貪吃蛇小游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-09-09