java高并發(fā)ScheduledThreadPoolExecutor類深度解析
正文
在【高并發(fā)專題】的專欄中,我們深度分析了ThreadPoolExecutor類的源代碼,而ScheduledThreadPoolExecutor類是ThreadPoolExecutor類的子類。今天我們就來一起手撕ScheduledThreadPoolExecutor類的源代碼。
構(gòu)造方法
我們先來看下ScheduledThreadPoolExecutor的構(gòu)造方法,源代碼如下所示。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
?
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
?
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
?
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
從代碼結(jié)構(gòu)上來看,ScheduledThreadPoolExecutor類是ThreadPoolExecutor類的子類,ScheduledThreadPoolExecutor類的構(gòu)造方法實(shí)際上調(diào)用的是ThreadPoolExecutor類的構(gòu)造方法。
schedule方法
接下來,我們看一下ScheduledThreadPoolExecutor類的schedule方法,源代碼如下所示。
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
//如果傳遞的Runnable對象和TimeUnit時(shí)間單位為空
//拋出空指針異常
if (command == null || unit == null)
throw new NullPointerException();
//封裝任務(wù)對象,在decorateTask方法中直接返回ScheduledFutureTask對象
RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
//執(zhí)行延時(shí)任務(wù)
delayedExecute(t);
//返回任務(wù)
return t;
}
?
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
//如果傳遞的Callable對象和TimeUnit時(shí)間單位為空
//拋出空指針異常
if (callable == null || unit == null)
throw new NullPointerException();
//封裝任務(wù)對象,在decorateTask方法中直接返回ScheduledFutureTask對象
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));
//執(zhí)行延時(shí)任務(wù)
delayedExecute(t);
//返回任務(wù)
return t;
}
從源代碼可以看出,ScheduledThreadPoolExecutor類提供了兩個重載的schedule方法,兩個schedule方法的第一個參數(shù)不同。可以傳遞Runnable接口對象,也可以傳遞Callable接口對象。在方法內(nèi)部,會將Runnable接口對象和Callable接口對象封裝成RunnableScheduledFuture對象,本質(zhì)上就是封裝成ScheduledFutureTask對象。并通過delayedExecute方法來執(zhí)行延時(shí)任務(wù)。
在源代碼中,我們看到兩個schedule都調(diào)用了decorateTask方法,接下來,我們就看看decorateTask方法。
decorateTask方法
decorateTask方法源代碼如下所示。
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
return task;
}
通過源碼可以看出decorateTask方法的實(shí)現(xiàn)比較簡單,接收一個Runnable接口對象或者Callable接口對象和封裝的RunnableScheduledFuture任務(wù),兩個方法都是將RunnableScheduledFuture任務(wù)直接返回。在ScheduledThreadPoolExecutor類的子類中可以重寫這兩個方法。
接下來,我們繼續(xù)看下scheduleAtFixedRate方法。
scheduleAtFixedRate方法
scheduleAtFixedRate方法源代碼如下所示。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
//傳入的Runnable對象和TimeUnit為空,則拋出空指針異常
if (command == null || unit == null)
throw new NullPointerException();
//如果執(zhí)行周期period傳入的數(shù)值小于或者等于0
//拋出非法參數(shù)異常
if (period <= 0)
throw new IllegalArgumentException();
//將Runnable對象封裝成ScheduledFutureTask任務(wù),
//并設(shè)置執(zhí)行周期
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period));
//調(diào)用decorateTask方法,本質(zhì)上還是直接返回ScheduledFutureTask對象
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
//設(shè)置執(zhí)行的任務(wù)
sft.outerTask = t;
//執(zhí)行延時(shí)任務(wù)
delayedExecute(t);
//返回執(zhí)行的任務(wù)
return t;
}
通過源碼可以看出,scheduleAtFixedRate方法將傳遞的Runnable對象封裝成ScheduledFutureTask任務(wù)對象,并設(shè)置了執(zhí)行周期,下一次的執(zhí)行時(shí)間相對于上一次的執(zhí)行時(shí)間來說,加上了period時(shí)長,時(shí)長的具體單位由TimeUnit決定。采用固定的頻率來執(zhí)行定時(shí)任務(wù)。
ScheduledThreadPoolExecutor類中另一個定時(shí)調(diào)度任務(wù)的方法是scheduleWithFixedDelay方法,接下來,我們就一起看看scheduleWithFixedDelay方法。
scheduleWithFixedDelay方法
scheduleWithFixedDelay方法的源代碼如下所示。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
//傳入的Runnable對象和TimeUnit為空,則拋出空指針異常
if (command == null || unit == null)
throw new NullPointerException();
//任務(wù)延時(shí)時(shí)長小于或者等于0,則拋出非法參數(shù)異常
if (delay <= 0)
throw new IllegalArgumentException();
//將Runnable對象封裝成ScheduledFutureTask任務(wù)
//并設(shè)置固定的執(zhí)行周期來執(zhí)行任務(wù)
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command, null,triggerTime(initialDelay, unit), unit.toNanos(-delay));
//調(diào)用decorateTask方法,本質(zhì)上直接返回ScheduledFutureTask任務(wù)
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
//設(shè)置執(zhí)行的任務(wù)
sft.outerTask = t;
//執(zhí)行延時(shí)任務(wù)
delayedExecute(t);
//返回任務(wù)
return t;
}
從scheduleWithFixedDelay方法的源代碼,我們可以看出在將Runnable對象封裝成ScheduledFutureTask時(shí),設(shè)置了執(zhí)行周期,但是此時(shí)設(shè)置的執(zhí)行周期與scheduleAtFixedRate方法設(shè)置的執(zhí)行周期不同。此時(shí)設(shè)置的執(zhí)行周期規(guī)則為:下一次任務(wù)執(zhí)行的時(shí)間是上一次任務(wù)完成的時(shí)間加上delay時(shí)長,時(shí)長單位由TimeUnit決定。也就是說,具體的執(zhí)行時(shí)間不是固定的,但是執(zhí)行的周期是固定的,整體采用的是相對固定的延遲來執(zhí)行定時(shí)任務(wù)。
如果大家細(xì)心的話,會發(fā)現(xiàn)在scheduleWithFixedDelay方法中設(shè)置執(zhí)行周期時(shí),傳遞的delay值為負(fù)數(shù),如下所示。
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay));
這里的負(fù)數(shù)表示的是相對固定的延遲。
在ScheduledFutureTask類中,存在一個setNextRunTime方法,這個方法會在run方法執(zhí)行完任務(wù)后調(diào)用,這個方法更能體現(xiàn)scheduleAtFixedRate方法和scheduleWithFixedDelay方法的不同,setNextRunTime方法的源碼如下所示。
private void setNextRunTime() {
//距離下次執(zhí)行任務(wù)的時(shí)長
long p = period;
//固定頻率執(zhí)行,
//上次執(zhí)行任務(wù)的時(shí)間
//加上任務(wù)的執(zhí)行周期
if (p > 0)
time += p;
//相對固定的延遲
//使用的是系統(tǒng)當(dāng)前時(shí)間
//加上任務(wù)的執(zhí)行周期
else
time = triggerTime(-p);
}
在setNextRunTime方法中通過對下次執(zhí)行任務(wù)的時(shí)長進(jìn)行判斷來確定是固定頻率執(zhí)行還是相對固定的延遲。
triggerTime方法
在ScheduledThreadPoolExecutor類中提供了兩個triggerTime方法,用于獲取下一次執(zhí)行任務(wù)的具體時(shí)間。triggerTime方法的源碼如下所示。
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
?
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
這兩個triggerTime方法的代碼比較簡單,就是獲取下一次執(zhí)行任務(wù)的具體時(shí)間。有一點(diǎn)需要注意的是:delay < (Long.MAX_VALUE >> 1判斷delay的值是否小于Long.MAX_VALUE的一半,如果小于Long.MAX_VALUE值的一半,則直接返回delay,否則需要處理溢出的情況。
我們看到在triggerTime方法中處理防止溢出的邏輯使用了overflowFree方法,接下來,我們就看看overflowFree方法的實(shí)現(xiàn)。
overflowFree方法
overflowFree方法的源代碼如下所示。
private long overflowFree(long delay) {
//獲取隊(duì)列中的節(jié)點(diǎn)
Delayed head = (Delayed) super.getQueue().peek();
//獲取的節(jié)點(diǎn)不為空,則進(jìn)行后續(xù)處理
if (head != null) {
//從隊(duì)列節(jié)點(diǎn)中獲取延遲時(shí)間
long headDelay = head.getDelay(NANOSECONDS);
//如果從隊(duì)列中獲取的延遲時(shí)間小于0,并且傳遞的delay
//值減去從隊(duì)列節(jié)點(diǎn)中獲取延遲時(shí)間小于0
if (headDelay < 0 && (delay - headDelay < 0))
//將delay的值設(shè)置為Long.MAX_VALUE + headDelay
delay = Long.MAX_VALUE + headDelay;
}
//返回延遲時(shí)間
return delay;
}
通過對overflowFree方法的源碼分析,可以看出overflowFree方法本質(zhì)上就是為了限制隊(duì)列中的所有節(jié)點(diǎn)的延遲時(shí)間在Long.MAX_VALUE值之內(nèi),防止在ScheduledFutureTask類中的compareTo方法中溢出。
ScheduledFutureTask類中的compareTo方法的源碼如下所示。
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
compareTo方法的主要作用就是對各延遲任務(wù)進(jìn)行排序,距離下次執(zhí)行時(shí)間靠前的任務(wù)就排在前面。
delayedExecute方法
delayedExecute方法是ScheduledThreadPoolExecutor類中延遲執(zhí)行任務(wù)的方法,源代碼如下所示。
private void delayedExecute(RunnableScheduledFuture<?> task) {
//如果當(dāng)前線程池已經(jīng)關(guān)閉
//則執(zhí)行線程池的拒絕策略
if (isShutdown())
reject(task);
//線程池沒有關(guān)閉
else {
//將任務(wù)添加到阻塞隊(duì)列中
super.getQueue().add(task);
//如果當(dāng)前線程池是SHUTDOWN狀態(tài)
//并且當(dāng)前線程池狀態(tài)下不能執(zhí)行任務(wù)
//并且成功從阻塞隊(duì)列中移除任務(wù)
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
//取消任務(wù)的執(zhí)行,但不會中斷執(zhí)行中的任務(wù)
task.cancel(false);
else
//調(diào)用ThreadPoolExecutor類中的ensurePrestart()方法
ensurePrestart();
}
}
可以看到在delayedExecute方法內(nèi)部調(diào)用了canRunInCurrentRunState方法,canRunInCurrentRunState方法的源碼實(shí)現(xiàn)如下所示。
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown);
}
可以看到canRunInCurrentRunState方法的邏輯比較簡單,就是判斷線程池當(dāng)前狀態(tài)下能夠執(zhí)行任務(wù)。
另外,在delayedExecute方法內(nèi)部還調(diào)用了ThreadPoolExecutor類中的ensurePrestart()方法,接下來,我們看下ThreadPoolExecutor類中的ensurePrestart()方法的實(shí)現(xiàn),如下所示。
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
在ThreadPoolExecutor類中的ensurePrestart()方法中,首先獲取當(dāng)前線程池中線程的數(shù)量,如果線程數(shù)量小于corePoolSize則調(diào)用addWorker方法傳遞null和true,如果線程數(shù)量為0,則調(diào)用addWorker方法傳遞null和false。
關(guān)于addWork()方法的源碼解析,大家可以參考【高并發(fā)專題】中的《高并發(fā)之——通過ThreadPoolExecutor類的源碼深度解析線程池執(zhí)行任務(wù)的核心流程》一文,這里,不再贅述。
reExecutePeriodic方法
reExecutePeriodic方法的源代碼如下所示。
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
//線程池當(dāng)前狀態(tài)下能夠執(zhí)行任務(wù)
if (canRunInCurrentRunState(true)) {
//將任務(wù)放入隊(duì)列
super.getQueue().add(task);
//線程池當(dāng)前狀態(tài)下不能執(zhí)行任務(wù),并且成功移除任務(wù)
if (!canRunInCurrentRunState(true) && remove(task))
//取消任務(wù)
task.cancel(false);
else
//調(diào)用ThreadPoolExecutor類的ensurePrestart()方法
ensurePrestart();
}
}
總體來說reExecutePeriodic方法的邏輯比較簡單,但是,這里需要注意和delayedExecute方法的不同點(diǎn):調(diào)用reExecutePeriodic方法的時(shí)候已經(jīng)執(zhí)行過一次任務(wù),所以,并不會觸發(fā)線程池的拒絕策略;傳入reExecutePeriodic方法的任務(wù)一定是周期性的任務(wù)。
onShutdown方法
onShutdown方法是ThreadPoolExecutor類中的鉤子函數(shù),它是在ThreadPoolExecutor類中的shutdown方法中調(diào)用的,而在ThreadPoolExecutor類中的onShutdown方法是一個空方法,如下所示。
void onShutdown() {
}
ThreadPoolExecutor類中的onShutdown方法交由子類實(shí)現(xiàn),所以ScheduledThreadPoolExecutor類覆寫了onShutdown方法,實(shí)現(xiàn)了具體的邏輯,ScheduledThreadPoolExecutor類中的onShutdown方法的源碼實(shí)現(xiàn)如下所示。
@Override
void onShutdown() {
//獲取隊(duì)列
BlockingQueue<Runnable> q = super.getQueue();
//在線程池已經(jīng)調(diào)用shutdown方法后,是否繼續(xù)執(zhí)行現(xiàn)有延遲任務(wù)
boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
//在線程池已經(jīng)調(diào)用shutdown方法后,是否繼續(xù)執(zhí)行現(xiàn)有定時(shí)任務(wù)
boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
//在線程池已經(jīng)調(diào)用shutdown方法后,不繼續(xù)執(zhí)行現(xiàn)有延遲任務(wù)和定時(shí)任務(wù)
if (!keepDelayed && !keepPeriodic) {
//遍歷隊(duì)列中的所有任務(wù)
for (Object e : q.toArray())
//取消任務(wù)的執(zhí)行
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
//清空隊(duì)列
q.clear();
}
//在線程池已經(jīng)調(diào)用shutdown方法后,繼續(xù)執(zhí)行現(xiàn)有延遲任務(wù)和定時(shí)任務(wù)
else {
//遍歷隊(duì)列中的所有任務(wù)
for (Object e : q.toArray()) {
//當(dāng)前任務(wù)是RunnableScheduledFuture類型
if (e instanceof RunnableScheduledFuture) {
//將任務(wù)強(qiáng)轉(zhuǎn)為RunnableScheduledFuture類型
RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
//在線程池調(diào)用shutdown方法后不繼續(xù)的延遲任務(wù)或周期任務(wù)
//則從隊(duì)列中刪除并取消任務(wù)
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
t.isCancelled()) {
if (q.remove(t))
t.cancel(false);
}
}
}
}
//最終調(diào)用tryTerminate()方法
tryTerminate();
}
ScheduledThreadPoolExecutor類中的onShutdown方法的主要邏輯就是先判斷線程池調(diào)用shutdown方法后,是否繼續(xù)執(zhí)行現(xiàn)有的延遲任務(wù)和定時(shí)任務(wù),如果不再執(zhí)行,則取消任務(wù)并清空隊(duì)列;如果繼續(xù)執(zhí)行,將隊(duì)列中的任務(wù)強(qiáng)轉(zhuǎn)為RunnableScheduledFuture對象之后,從隊(duì)列中刪除并取消任務(wù)。大家需要好好理解這兩種處理方式。
最后調(diào)用ThreadPoolExecutor類的tryTerminate方法。有關(guān)ThreadPoolExecutor類的tryTerminate方法的源碼解析,大家可以參考【高并發(fā)專題】中的《高并發(fā)之——通過源碼深度分析線程池中Worker線程的執(zhí)行流程》一文,這里不再贅述。
至此,ScheduledThreadPoolExecutor類中的核心方法的源代碼,我們就分析完了,更多關(guān)于java高并發(fā)ScheduledThreadPoolExecutor的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
MyBatis學(xué)習(xí)教程(六)-調(diào)用存儲過程
這篇文章主要介紹了MyBatis學(xué)習(xí)教程(六)-調(diào)用存儲過程的相關(guān)資料,非常不錯,具有參考借鑒價(jià)值,感興趣的朋友一起看下吧2016-05-05
java 結(jié)合jQuery實(shí)現(xiàn)跨域名獲取數(shù)據(jù)的方法
下面小編就為大家?guī)硪黄猨ava 結(jié)合jQuery實(shí)現(xiàn)跨域名獲取數(shù)據(jù)的方法。小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-05-05
Java中使用BeanMap將對象轉(zhuǎn)為Map詳解
這篇文章主要介紹了Java中使用BeanMap將對象轉(zhuǎn)為Map詳解,BeanMap?是?Apache?Commons?BeanUtils?庫中的一個類,BeanMap?可以將?Java?對象的屬性作為鍵,屬性值作為對應(yīng)的值,存儲在一個?Map?中,它提供了一種將?Java?對象轉(zhuǎn)換為?Map?的方式,需要的朋友可以參考下2024-01-01
IDEA中Maven報(bào)錯Cannot resolve xxx的解決方法匯總(親測有效)
在IDEA中的pom文件中添加了依賴,并且正確加載了相應(yīng)依賴,pom文件沒有報(bào)紅,看起來像是把所有依賴庫全部加載進(jìn)來了,但是代碼中使用依賴的類庫使報(bào)紅,本文給大家介紹了IDEA中Maven報(bào)錯Cannot resolve xxx的解決方法匯總,需要的朋友可以參考下2024-06-06
springmvc+mybatis 做分頁sql 語句實(shí)例代碼
本文通過一段實(shí)例代碼給大家介紹了springmvc+mybatis 做分頁sql 語句的方法,代碼簡單易懂,非常不錯,具有參考借鑒價(jià)值,需要的朋友參考下吧2017-07-07
SpringCloud Config使用本地倉庫及map注入
這篇文章主要介紹了SpringCloud Config使用本地倉庫及map注入,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-09-09

