一文搞懂Java的ThreadPoolExecutor原理
一. Executor框架簡(jiǎn)介
Executor框架提供了組件來(lái)管理Java中的線程,Executor框架將其分為任務(wù),線程執(zhí)行任務(wù),任務(wù)執(zhí)行結(jié)果三部分。下面以表格形式對(duì)這三部分進(jìn)行說(shuō)明。
| 項(xiàng) | 說(shuō)明 |
|---|---|
| 任務(wù) | Executor框架提供了Runnable接口和Callable接口,任務(wù)需要實(shí)現(xiàn)這兩個(gè)接口才能被線程執(zhí)行 |
| 線程執(zhí)行任務(wù) | Executor框架提供了接口Executor和繼承于Executor的ExecutorService接口來(lái)定義任務(wù)執(zhí)行機(jī)制。Executor框架中的線程池類(lèi)ThreadPoolExecutor和ScheduledThreadPoolExecutor均實(shí)現(xiàn)了ExecutorService接口 |
| 任務(wù)執(zhí)行結(jié)果 | Executor框架提供了Future接口和實(shí)現(xiàn)了Future接口的FutureTask類(lèi)來(lái)定義任務(wù)執(zhí)行結(jié)果。 |
組件之間的類(lèi)圖關(guān)系如下所示。

Executor接口是線程池的頂層接口,通常說(shuō)到的線程池指的是ThreadPoolExecutor,同時(shí)ThreadPoolExecutor還有一個(gè)子類(lèi)叫做ScheduledThreadPoolExecutor,其擴(kuò)展實(shí)現(xiàn)了延時(shí)執(zhí)行任務(wù)和定時(shí)執(zhí)行任務(wù)的功能。
Executor框架指的是任務(wù),執(zhí)行任務(wù)的線程池和任務(wù)執(zhí)行結(jié)果這三部分,切不可將Executor框架與Executor接口相混淆。
本篇文章就將對(duì)Executor框架中的ThreadPoolExecutor的源碼實(shí)現(xiàn)進(jìn)行學(xué)習(xí)。
二. 認(rèn)識(shí)ThreadPoolExecutor狀態(tài)
在學(xué)習(xí)ThreadPoolExecutor如何執(zhí)行任務(wù)前,先認(rèn)識(shí)一下ThreadPoolExecutor的狀態(tài)。
ThreadPoolExecutor繼承于AbstractExecutorService,并實(shí)現(xiàn)了ExecutorService接口,是Executor框架的核心類(lèi),用于管理線程。
ThreadPoolExecutor使用了原子整型ctl來(lái)表示線程池狀態(tài)和Worker數(shù)量。ctl是一個(gè)原子整型,前3位表示線程池狀態(tài),后29位表示Worker數(shù)量。ThreadPoolExecutor中這部分的源碼如下所示。
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 取整型前3位,即獲取線程池狀態(tài)
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 取整型后29位,即獲取Worker數(shù)量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 根據(jù)線程池狀態(tài)和Worker數(shù)量拼裝ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 線程池狀態(tài)判斷
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// 線程池狀態(tài)判斷
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 判斷線程池狀態(tài)是否為RUNNING
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
......
}可知ThreadPoolExecutor有如下五種線程池狀態(tài)。
- RUNNING,線程池接受新任務(wù),會(huì)執(zhí)行任務(wù)阻塞隊(duì)列中的任務(wù),ctl前三位表示為111;
- SHUTDOWN,線程池拒絕新任務(wù),會(huì)執(zhí)行任務(wù)阻塞隊(duì)列中的任務(wù),ctl前三位表示為000;
- STOP,線程池拒絕新任務(wù),不會(huì)執(zhí)行任務(wù)阻塞隊(duì)列中的任務(wù),嘗試中斷正在執(zhí)行的任務(wù),ctl前三位表示為001;
- TIDYING,所有任務(wù)被關(guān)閉,Worker數(shù)量為0,ctl前三位表示為010;
- TERMINATED,terminated() 執(zhí)行完畢,ctl前三位表示為011。
得益于ctl的結(jié)構(gòu),所以無(wú)論Worker數(shù)量是多少,ThreadPoolExecutor中線程池狀態(tài)存在如下關(guān)系。
RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
因此runStateLessThan(),runStateAtLeast() 和isRunning() 方法可以方便的對(duì)線程池狀態(tài)進(jìn)行判斷。
三. 執(zhí)行任務(wù)源碼分析
作為線程池,ThreadPoolExecutor最重要也最經(jīng)典的地方,當(dāng)然就是執(zhí)行任務(wù)了。本節(jié)對(duì)ThreadPoolExecutor執(zhí)行任務(wù)的流程進(jìn)行一個(gè)學(xué)習(xí)。
ThreadPoolExecutor中執(zhí)行任務(wù)的入口方法為execute(),其實(shí)現(xiàn)如下。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果Worker數(shù)量小于核心線程數(shù),則創(chuàng)建Worker并執(zhí)行任務(wù)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果Worker數(shù)量大于等于核心線程數(shù),則將任務(wù)添加到任務(wù)阻塞隊(duì)列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果線程池狀態(tài)突然不再是RUNNING,則嘗試將任務(wù)從任務(wù)阻塞隊(duì)列中刪除,刪除成功則為該任務(wù)執(zhí)行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果線程池中Worker數(shù)量突然為0,則創(chuàng)建一個(gè)Worker來(lái)執(zhí)行任務(wù)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 執(zhí)行到這里表示線程池狀態(tài)已經(jīng)不是RUNNING或者任務(wù)阻塞隊(duì)列已滿
// 此時(shí)嘗試新建一個(gè)Worker來(lái)執(zhí)行任務(wù)
// 如果新建一個(gè)Worker來(lái)執(zhí)行任務(wù)失敗,表明線程池狀態(tài)不再是RUNNING或者Worker數(shù)量已經(jīng)達(dá)到最大線程數(shù),此時(shí)執(zhí)行拒絕策略
else if (!addWorker(command, false))
reject(command);
}在execute() 中會(huì)根據(jù)Worker數(shù)量和線程池狀態(tài)來(lái)決定是新建Worker來(lái)執(zhí)行任務(wù)還是將任務(wù)添加到任務(wù)阻塞隊(duì)列。新建Worker來(lái)執(zhí)行任務(wù)的實(shí)現(xiàn)如下所示。
private boolean addWorker(Runnable firstTask, boolean core) {
// 標(biāo)記外層for循環(huán)
retry:
for (;;) {
int c = ctl.get();
// 獲取線程池狀態(tài)
int rs = runStateOf(c);
// 線程池狀態(tài)為RUNNING時(shí),可以創(chuàng)建Worker
// 線程池狀態(tài)為SHUTDOWN,且任務(wù)阻塞隊(duì)列不為空時(shí),可以創(chuàng)建初始任務(wù)為null的Worker
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 獲取Worker數(shù)量
int wc = workerCountOf(c);
// 如果Worker數(shù)量大于CAPACITY,拒絕創(chuàng)建Worker
// core為true表示創(chuàng)建核心線程Worker,如果Worker數(shù)量此時(shí)已經(jīng)大于等于核心線程數(shù),則拒絕創(chuàng)建Worker,轉(zhuǎn)而應(yīng)該將任務(wù)添加到任務(wù)阻塞隊(duì)列
// core為false表示創(chuàng)建非核心線程Worker,如果Worker數(shù)量此時(shí)已經(jīng)大于等于最大線程數(shù),則拒絕創(chuàng)建Worker,轉(zhuǎn)而應(yīng)該執(zhí)行拒絕策略
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 以CAS方式將Worker數(shù)量加1
// 加1成功表明無(wú)競(jìng)爭(zhēng)發(fā)生,從外層for循環(huán)跳出
if (compareAndIncrementWorkerCount(c))
break retry;
// 加1失敗表明有競(jìng)爭(zhēng)發(fā)生,此時(shí)需要重新獲取ctl的值
c = ctl.get();
// 重新獲取ctl后如果發(fā)現(xiàn)線程池狀態(tài)發(fā)生了改變,此時(shí)重新執(zhí)行外層for循環(huán),即需要基于新的線程池狀態(tài)判斷是否允許創(chuàng)建Worker
// 重新獲取ctl后如果線程池狀態(tài)未發(fā)生改變,則繼續(xù)執(zhí)行內(nèi)層for循環(huán),即嘗試再一次以CAS方式將Worker數(shù)量加1
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 創(chuàng)建一個(gè)Worker
w = new Worker(firstTask);
// 獲取Worker的線程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 由于線程池中存儲(chǔ)Worker的集合為HashSet,因此將Worker添加到Worker集合時(shí)需要獲取全局鎖保證線程安全
mainLock.lock();
try {
// 再一次獲取線程池狀態(tài)
int rs = runStateOf(ctl.get());
// 如果線程池狀態(tài)還是為RUNNING或者線程池狀態(tài)為SHUTDOWN但創(chuàng)建的Worker的初始任務(wù)為null,則允許將創(chuàng)建出來(lái)的Worker添加到集合
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 檢查一下Worker的線程是否可以啟動(dòng)(處于活動(dòng)狀態(tài)的線程無(wú)法再啟動(dòng))
if (t.isAlive())
throw new IllegalThreadStateException();
// 將Worker添加到Worker集合
workers.add(w);
int s = workers.size();
// largestPoolSize用于記錄線程池最多存在過(guò)的Worker數(shù)
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 啟動(dòng)Worker線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// Worker線程沒(méi)有成功啟動(dòng)起來(lái),此時(shí)需要對(duì)該Worker的創(chuàng)建執(zhí)行回滾操作
addWorkerFailed(w);
}
return workerStarted;
}addWorker() 方法中只允許兩種情況可以創(chuàng)建Worker。
- 線程池狀態(tài)為RUNNING,可以創(chuàng)建Worker;
- 線程池狀態(tài)為SHUTDOWN,且任務(wù)阻塞隊(duì)列不為空,可以創(chuàng)建初始任務(wù)為null的Worker。
一旦Worker創(chuàng)建成功,就會(huì)將Worker的線程啟動(dòng),如果Worker創(chuàng)建失敗或者Worker的線程啟動(dòng)失敗,則會(huì)調(diào)用addWorkerFailed() 方法執(zhí)行回滾操作,其實(shí)現(xiàn)如下所示。
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 如果Worker添加到了Worker集合中,則將Worker從Worker集合中刪除
if (w != null)
workers.remove(w);
// 以CAS方式將Worker數(shù)量減1
decrementWorkerCount();
// 嘗試終止線程池
tryTerminate();
} finally {
mainLock.unlock();
}
}由于Worker自身實(shí)現(xiàn)了Runnable,因此Worker自身就是一個(gè)任務(wù),實(shí)際上Worker的線程執(zhí)行的任務(wù)就是Worker本身,因此addWorker() 中將Worker的線程啟動(dòng)時(shí),會(huì)調(diào)用Worker的run() 方法,其實(shí)現(xiàn)如下。
public void run() {
runWorker(this);
}在Worker的run() 方法中調(diào)用了ThreadPoolExecutor的runWorker() 方法,其實(shí)現(xiàn)如下所示。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// 如果task為null,則從任務(wù)阻塞隊(duì)列中獲取任務(wù)
// 通常Worker啟動(dòng)時(shí)會(huì)先執(zhí)行初始任務(wù),然后再去任務(wù)阻塞隊(duì)列中獲取任務(wù)
while (task != null || (task = getTask()) != null) {
w.lock();
// 線程池正在停止時(shí),需要確保當(dāng)前Worker的線程是被中斷的
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// Worker執(zhí)行任務(wù)發(fā)生異?;蛘邚膅etTask()中獲取任務(wù)為空時(shí)會(huì)執(zhí)行這里的邏輯
// processWorkerExit()會(huì)將Worker從Worker集合中刪除,并嘗試終止線程池
processWorkerExit(w, completedAbruptly);
}
}runWorker() 方法就是先讓Worker將初始任務(wù)(如果有的話)執(zhí)行完,然后循環(huán)從任務(wù)阻塞隊(duì)列中獲取任務(wù)來(lái)執(zhí)行,如果Worker執(zhí)行任務(wù)發(fā)生異?;蛘邚娜蝿?wù)阻塞隊(duì)列獲取任務(wù)失?。ǐ@取到的任務(wù)為null),則調(diào)用processWorkerExit() 方法來(lái)將自身從Worker集合中刪除。下面先看一下getTask() 方法的實(shí)現(xiàn)。
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果線程池狀態(tài)為SHUTDOWN,且任務(wù)阻塞隊(duì)列為空,則不再允許從任務(wù)阻塞隊(duì)列中獲取任務(wù)
// 如果線程池狀態(tài)為STOP,則不再允許從任務(wù)阻塞隊(duì)列中獲取任務(wù)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 如果allowCoreThreadTimeOut為true,或者當(dāng)前線程數(shù)大于核心線程數(shù),此時(shí)timed為true,表明從任務(wù)阻塞隊(duì)列以超時(shí)退出的方式獲取任務(wù)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果當(dāng)前線程數(shù)大于最大線程數(shù),則當(dāng)前Worker應(yīng)該被刪除
// 如果當(dāng)前Worker上一次從任務(wù)阻塞隊(duì)列中獲取任務(wù)時(shí)超時(shí)退出,且任務(wù)阻塞隊(duì)列現(xiàn)在還是為空,則當(dāng)前Worker應(yīng)該被刪除
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 從任務(wù)阻塞隊(duì)列中獲取任務(wù)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
// 獲取到任務(wù)則返回該任務(wù)
return r;
// timedOut為true表明Worker上一次從任務(wù)阻塞隊(duì)列中獲取任務(wù)時(shí)超時(shí)退出
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}getTask() 方法在如下情況不允許Worker從任務(wù)阻塞隊(duì)列中獲取任務(wù)。
- 線程池狀態(tài)為SHUTDOWN,且任務(wù)阻塞隊(duì)列為空;
- 線程池狀態(tài)為STOP。
如果Worker有資格從任務(wù)阻塞隊(duì)列獲取任務(wù),那么當(dāng)allowCoreThreadTimeOut為true,或者當(dāng)前線程數(shù)大于核心線程數(shù)時(shí),Worker以超時(shí)退出的方式獲取任務(wù),否則Worker以一直阻塞的方式獲取任務(wù)。
當(dāng)Worker在getTask() 方法中獲取任務(wù)失敗時(shí),getTask() 方法會(huì)返回null,從而導(dǎo)致Worker會(huì)執(zhí)行processWorkerExit() 方法來(lái)刪除自身,其實(shí)現(xiàn)如下所示。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// completedAbruptly為true表明是執(zhí)行任務(wù)時(shí)發(fā)生異常導(dǎo)致Worker需要被刪除
if (completedAbruptly)
// 修正Worker數(shù)量
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 將Worker從Worker集合中刪除
workers.remove(w);
} finally {
mainLock.unlock();
}
// 嘗試終止線程池
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}Worker在processWorkerExit() 方法中刪除自身之后,還會(huì)調(diào)用tryTerminate() 嘗試終止線程池,tryTerminate() 方法很精髓,后面會(huì)對(duì)其進(jìn)行詳細(xì)分析,這里暫且不談。至此,Worker的創(chuàng)建,執(zhí)行任務(wù),獲取任務(wù)和刪除的整個(gè)流程已經(jīng)大體分析完畢。
對(duì)于執(zhí)行任務(wù),現(xiàn)在簡(jiǎn)單進(jìn)行一個(gè)小結(jié)。
ThreadPoolExecutor執(zhí)行任務(wù),第一步是根據(jù)Worker數(shù)量來(lái)決定是新建Worker來(lái)執(zhí)行任務(wù)還是將任務(wù)添加到任務(wù)阻塞隊(duì)列,這里的判斷規(guī)則如下。
- 如果Worker數(shù)量小于核心線程數(shù),則創(chuàng)建Worker來(lái)執(zhí)行任務(wù);
- 如果Worker數(shù)量大于等于核心線程數(shù),則將任務(wù)添加到任務(wù)阻塞隊(duì)列;
- 如果任務(wù)阻塞隊(duì)列已滿,則創(chuàng)建Worker來(lái)執(zhí)行任務(wù);
- 如果Worker數(shù)量已經(jīng)達(dá)到最大線程數(shù),此時(shí)執(zhí)行任務(wù)拒絕策略。
當(dāng)要新建Worker來(lái)執(zhí)行任務(wù)時(shí),只有兩種情況可以新建Worker,如下所示。
- 線程池狀態(tài)為RUNNING,可以創(chuàng)建Worker;
- 線程池狀態(tài)為SHUTDOWN,且任務(wù)阻塞隊(duì)列不為空,可以創(chuàng)建初始任務(wù)為null的Worker。
Worker自身實(shí)現(xiàn)了Runnable,且Worker持有一個(gè)線程,當(dāng)Worker啟動(dòng)時(shí),就是啟動(dòng)Worker持有的線程,而這個(gè)線程執(zhí)行的任務(wù)就是Worker自身。
Worker啟動(dòng)后,會(huì)首先執(zhí)行自己的初始任務(wù),然后再去任務(wù)阻塞隊(duì)列中獲取任務(wù)。
四. 關(guān)閉線程池源碼分析
不再使用的線程池,可以進(jìn)行關(guān)閉。關(guān)閉ThreadPoolExecutor的方法有shutdown() 和shutdownNow(),本節(jié)將對(duì)ThreadPoolExecutor的關(guān)閉進(jìn)行分析。
1. shutdown()
首先分析shutdown() 方法,其實(shí)現(xiàn)如下。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 循環(huán)通過(guò)CAS方式將線程池狀態(tài)置為SHUTDOWN
advanceRunState(SHUTDOWN);
// 中斷空閑Worker
interruptIdleWorkers();
onShutdown();
} finally {
mainLock.unlock();
}
// 嘗試終止線程池
tryTerminate();
}在shutdown() 方法中首先會(huì)將線程池狀態(tài)置為SHUTDOWN,然后調(diào)用interruptIdleWorkers() 方法中斷空閑Worker,最后調(diào)用tryTerminate() 方法來(lái)嘗試終止線程池。那么這里要解釋一下什么是空閑Worker,先看一下interruptIdleWorkers() 的實(shí)現(xiàn)。
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 中斷線程前需要先嘗試獲取Worker的鎖
// 只能獲取到空閑Worker的鎖,所以shutdown()方法只會(huì)中斷空閑Worker
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}調(diào)用interruptIdleWorkers() 方法中斷Worker前首先需要嘗試獲取Worker的鎖,已知Worker除了實(shí)現(xiàn)Runnable接口外,還繼承于AbstractQueuedSynchronizer,因此Worker本身是一把鎖,然后在runWorker() 中Worker執(zhí)行任務(wù)前都會(huì)先獲取Worker的鎖,這里看一下Worker的lock() 方法的實(shí)現(xiàn)。
public void lock() {
acquire(1);
}
protected boolean tryAcquire(int unused) {
// 以CAS方式將state從0設(shè)置為1
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}可以發(fā)現(xiàn),Worker在lock() 中調(diào)用了acquire() 方法,該方法由AbstractQueuedSynchronizer抽象類(lèi)提供,在acquire() 中會(huì)調(diào)用其子類(lèi)實(shí)現(xiàn)的tryAcquire() 方法,tryAcquire() 方法會(huì)以CAS方式將state從0設(shè)置為1,因此這樣的設(shè)計(jì)讓Worker是一把不可重入鎖。
回到interruptIdleWorkers() 方法,前面提到該方法中斷Worker前會(huì)嘗試獲取Worker的鎖,能夠獲取到鎖才會(huì)中斷Worker,而因?yàn)?strong>Worker是不可重入鎖,所以正在執(zhí)行任務(wù)的Worker是無(wú)法獲取到鎖的,只有那些沒(méi)有執(zhí)行任務(wù)的Worker的鎖才能夠被獲取,因此所謂的中斷空閑Worker,實(shí)際就是中斷沒(méi)有執(zhí)行任務(wù)的Worker,那些執(zhí)行任務(wù)的Worker在shutdown() 方法被調(diào)用時(shí)不會(huì)被中斷,這些Worker執(zhí)行完任務(wù)后會(huì)繼續(xù)從任務(wù)阻塞隊(duì)列中獲取任務(wù)來(lái)執(zhí)行,直到任務(wù)阻塞隊(duì)列為空,此時(shí)沒(méi)有被中斷過(guò)的Worker也會(huì)被刪除掉,等到線程池中沒(méi)有Worker以及任務(wù)阻塞隊(duì)列沒(méi)有任務(wù)后,線程池才會(huì)被終止掉。
對(duì)于shutdown() 方法,一句話總結(jié)就是:將線程池狀態(tài)置為SHUTDOWN并拒絕接受新任務(wù),等到線程池Worker數(shù)量為0,任務(wù)阻塞隊(duì)列為空時(shí),關(guān)閉線程池。
2. shutdownNow()
現(xiàn)在再來(lái)分析shutdownNow() 方法。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 循環(huán)通過(guò)CAS方式將線程池狀態(tài)置為STOP
advanceRunState(STOP);
// 中斷所有Worker
interruptWorkers();
// 將任務(wù)阻塞隊(duì)列中的任務(wù)獲取出來(lái)并返回
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 嘗試終止線程池
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 中斷線程池中所有Worker
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}在shutdownNow() 方法中首先會(huì)將線程池狀態(tài)置為STOP,然后調(diào)用interruptWorkers() 方法中斷線程池中的所有Worker,接著調(diào)用tryTerminate() 方法來(lái)嘗試終止線程池,最后shutdownNow() 方法會(huì)將任務(wù)阻塞隊(duì)列中還未被執(zhí)行的任務(wù)返回。
shutdownNow() 方法調(diào)用之后,線程池中的所有Worker都會(huì)被中斷,包括正在執(zhí)行任務(wù)的Worker,等到所有Worker都被刪除之后,線程池即被終止,也就是說(shuō),shutdownNow() 不會(huì)保證當(dāng)前時(shí)刻正在執(zhí)行的任務(wù)會(huì)被安全的執(zhí)行完,并且會(huì)放棄執(zhí)行任務(wù)阻塞隊(duì)列中的所有任務(wù)。
3. tryTerminate()
關(guān)于線程池的關(guān)閉,還有一個(gè)重要的方法,那就是前面多次提到的tryTerminate() 方法,該方法能確保線程池可以被正確的關(guān)閉,其實(shí)現(xiàn)如下所示。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 如果線程池狀態(tài)為RUNNING,則沒(méi)有資格終止線程池
// 如果線程池狀態(tài)大于等于TIDYING,則沒(méi)有資格終止線程池
// 如果線程池狀態(tài)為SHUTDOWN但任務(wù)阻塞隊(duì)列不為空,則沒(méi)有資格終止線程池
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 線程池狀態(tài)為SHUTDOWN且任務(wù)阻塞隊(duì)列為空會(huì)執(zhí)行到這里
// 線程池狀態(tài)為STOP會(huì)執(zhí)行到這里
// Worker數(shù)量不為0,表明當(dāng)前還有正在執(zhí)行任務(wù)的Worker或者空閑的Worker,此時(shí)中斷一個(gè)空閑的Worker
// 在這里被中斷的空閑Worker會(huì)在getTask()方法中返回null,從而執(zhí)行processWorkerExit(),最終該Worker會(huì)被刪除
// processWorkerExit()方法中又會(huì)調(diào)用tryTerminate(),因此將shutdown信號(hào)在空閑Worker之間進(jìn)行了傳播
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 將線程池狀態(tài)置為T(mén)IDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 終止線程池
terminated();
} finally {
// 將線程池狀態(tài)最終置為T(mén)ERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}在tryTerminate() 方法的官方注釋中給出了兩種線程池會(huì)被終止的情況:
- 線程池的狀態(tài)為SHUTDOWN,Worker數(shù)量為0,任務(wù)阻塞隊(duì)列為空;
- 線程池的狀態(tài)為STOP,Worker數(shù)量為0。
官方注釋中還說(shuō)明在所有可能導(dǎo)致線程池終止的操作中都應(yīng)該調(diào)用tryTerminate() 方法來(lái)嘗試終止線程池,因此線程池中Worker被刪除時(shí)和任務(wù)阻塞隊(duì)列中任務(wù)被刪除時(shí)會(huì)調(diào)用tryTerminate(),以達(dá)到在線程池符合終止條件時(shí)及時(shí)終止線程池。
4. 小結(jié)
對(duì)于關(guān)閉線程池,簡(jiǎn)單小結(jié)如下。
關(guān)閉ThreadPoolExecutor有兩種方式,如下所示。
- shutdown()。調(diào)用shutdown() 方法會(huì)首先將線程池狀態(tài)置為SHUTDOWN并拒絕接受新任務(wù),然后中斷空閑Worker,等到線程池中Worker數(shù)量為0,任務(wù)阻塞隊(duì)列為空時(shí),線程池被真正關(guān)閉;
- shutdownNow()。調(diào)用shutdownNow() 方法會(huì)首先將線程池狀態(tài)置為STOP,然后中斷所有Worker(包括正在執(zhí)行任務(wù)的Worker),并將任務(wù)阻塞隊(duì)列中還未被執(zhí)行的任務(wù)返回,當(dāng)線程池Worker數(shù)量為0時(shí),線程池被真正關(guān)閉。
還有一點(diǎn)需要說(shuō)明,Worker除了實(shí)現(xiàn)Runnable接口外,還繼承于AbstractQueuedSynchronizer,因此Worker本身是一把鎖,Worker執(zhí)行任務(wù)前都會(huì)先獲取Worker的鎖,所以正在執(zhí)行任務(wù)的Worker的鎖是無(wú)法被獲取的,換言之,只有沒(méi)有執(zhí)行任務(wù)的Worker的鎖才能被獲取,這些Worker就稱(chēng)為空閑Worker。
以上就是一文搞懂Java的ThreadPoolExecutor原理的詳細(xì)內(nèi)容,更多關(guān)于Java ThreadPoolExecutor的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Java線程池?ThreadPoolExecutor?詳解
- Java多線程ThreadPoolExecutor詳解
- Java線程池ThreadPoolExecutor源碼深入分析
- java高并發(fā)ThreadPoolExecutor類(lèi)解析線程池執(zhí)行流程
- java高并發(fā)ScheduledThreadPoolExecutor與Timer區(qū)別
- 徹底搞懂java并發(fā)ThreadPoolExecutor使用
- Java多線程編程基石ThreadPoolExecutor示例詳解
- 源碼分析Java中ThreadPoolExecutor的底層原理
- 一文弄懂Java中ThreadPoolExecutor
相關(guān)文章
spring boot配置讀寫(xiě)分離的完整實(shí)現(xiàn)步驟
數(shù)據(jù)庫(kù)配置主從之后,如何在代碼層面實(shí)現(xiàn)讀寫(xiě)分離?所以下面這篇文章主要給大家介紹了關(guān)于spring boot配置讀寫(xiě)分離的完整步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2018-09-09
通過(guò)實(shí)例解析spring對(duì)象生命周期
這篇文章主要介紹了通過(guò)實(shí)例解析spring對(duì)象生命周期,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03
Java結(jié)構(gòu)型設(shè)計(jì)模式之橋接模式詳細(xì)講解
橋接,顧名思義,就是用來(lái)連接兩個(gè)部分,使得兩個(gè)部分可以互相通訊。橋接模式將系統(tǒng)的抽象部分與實(shí)現(xiàn)部分分離解耦,使他們可以獨(dú)立的變化。本文通過(guò)示例詳細(xì)介紹了橋接模式的原理與使用,需要的可以參考一下2022-09-09
使用postman傳遞list集合后臺(tái)springmvc接收
這篇文章主要介紹了使用postman傳遞list集合后臺(tái)springmvc接收的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08
Maven中怎么手動(dòng)添加jar包到本地倉(cāng)庫(kù)詳解(repository)
這篇文章主要給大家介紹了關(guān)于Maven中怎么手動(dòng)添加jar包到本地倉(cāng)庫(kù)的相關(guān)資料,文中通過(guò)圖文以及實(shí)例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2023-04-04
解析SpringBoot中使用LoadTimeWeaving技術(shù)實(shí)現(xiàn)AOP功能
這篇文章主要介紹了SpringBoot中使用LoadTimeWeaving技術(shù)實(shí)現(xiàn)AOP功能,AOP面向切面編程,通過(guò)為目標(biāo)類(lèi)織入切面的方式,實(shí)現(xiàn)對(duì)目標(biāo)類(lèi)功能的增強(qiáng),本文給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-09-09

