一文搞懂Java的ThreadPoolExecutor原理
一. Executor框架簡介
Executor框架提供了組件來管理Java中的線程,Executor框架將其分為任務,線程執(zhí)行任務,任務執(zhí)行結果三部分。下面以表格形式對這三部分進行說明。
| 項 | 說明 |
|---|---|
| 任務 | Executor框架提供了Runnable接口和Callable接口,任務需要實現(xiàn)這兩個接口才能被線程執(zhí)行 |
| 線程執(zhí)行任務 | Executor框架提供了接口Executor和繼承于Executor的ExecutorService接口來定義任務執(zhí)行機制。Executor框架中的線程池類ThreadPoolExecutor和ScheduledThreadPoolExecutor均實現(xiàn)了ExecutorService接口 |
| 任務執(zhí)行結果 | Executor框架提供了Future接口和實現(xiàn)了Future接口的FutureTask類來定義任務執(zhí)行結果。 |
組件之間的類圖關系如下所示。

Executor接口是線程池的頂層接口,通常說到的線程池指的是ThreadPoolExecutor,同時ThreadPoolExecutor還有一個子類叫做ScheduledThreadPoolExecutor,其擴展實現(xiàn)了延時執(zhí)行任務和定時執(zhí)行任務的功能。
Executor框架指的是任務,執(zhí)行任務的線程池和任務執(zhí)行結果這三部分,切不可將Executor框架與Executor接口相混淆。
本篇文章就將對Executor框架中的ThreadPoolExecutor的源碼實現(xiàn)進行學習。
二. 認識ThreadPoolExecutor狀態(tài)
在學習ThreadPoolExecutor如何執(zhí)行任務前,先認識一下ThreadPoolExecutor的狀態(tài)。
ThreadPoolExecutor繼承于AbstractExecutorService,并實現(xiàn)了ExecutorService接口,是Executor框架的核心類,用于管理線程。
ThreadPoolExecutor使用了原子整型ctl來表示線程池狀態(tài)和Worker數(shù)量。ctl是一個原子整型,前3位表示線程池狀態(tài),后29位表示Worker數(shù)量。ThreadPoolExecutor中這部分的源碼如下所示。
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 取整型前3位,即獲取線程池狀態(tài)
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 取整型后29位,即獲取Worker數(shù)量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 根據(jù)線程池狀態(tài)和Worker數(shù)量拼裝ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 線程池狀態(tài)判斷
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// 線程池狀態(tài)判斷
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 判斷線程池狀態(tài)是否為RUNNING
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
......
}可知ThreadPoolExecutor有如下五種線程池狀態(tài)。
- RUNNING,線程池接受新任務,會執(zhí)行任務阻塞隊列中的任務,ctl前三位表示為111;
- SHUTDOWN,線程池拒絕新任務,會執(zhí)行任務阻塞隊列中的任務,ctl前三位表示為000;
- STOP,線程池拒絕新任務,不會執(zhí)行任務阻塞隊列中的任務,嘗試中斷正在執(zhí)行的任務,ctl前三位表示為001;
- TIDYING,所有任務被關閉,Worker數(shù)量為0,ctl前三位表示為010;
- TERMINATED,terminated() 執(zhí)行完畢,ctl前三位表示為011。
得益于ctl的結構,所以無論Worker數(shù)量是多少,ThreadPoolExecutor中線程池狀態(tài)存在如下關系。
RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
因此runStateLessThan(),runStateAtLeast() 和isRunning() 方法可以方便的對線程池狀態(tài)進行判斷。
三. 執(zhí)行任務源碼分析
作為線程池,ThreadPoolExecutor最重要也最經(jīng)典的地方,當然就是執(zhí)行任務了。本節(jié)對ThreadPoolExecutor執(zhí)行任務的流程進行一個學習。
ThreadPoolExecutor中執(zhí)行任務的入口方法為execute(),其實現(xiàn)如下。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果Worker數(shù)量小于核心線程數(shù),則創(chuàng)建Worker并執(zhí)行任務
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果Worker數(shù)量大于等于核心線程數(shù),則將任務添加到任務阻塞隊列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果線程池狀態(tài)突然不再是RUNNING,則嘗試將任務從任務阻塞隊列中刪除,刪除成功則為該任務執(zhí)行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果線程池中Worker數(shù)量突然為0,則創(chuàng)建一個Worker來執(zhí)行任務
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 執(zhí)行到這里表示線程池狀態(tài)已經(jīng)不是RUNNING或者任務阻塞隊列已滿
// 此時嘗試新建一個Worker來執(zhí)行任務
// 如果新建一個Worker來執(zhí)行任務失敗,表明線程池狀態(tài)不再是RUNNING或者Worker數(shù)量已經(jīng)達到最大線程數(shù),此時執(zhí)行拒絕策略
else if (!addWorker(command, false))
reject(command);
}在execute() 中會根據(jù)Worker數(shù)量和線程池狀態(tài)來決定是新建Worker來執(zhí)行任務還是將任務添加到任務阻塞隊列。新建Worker來執(zhí)行任務的實現(xiàn)如下所示。
private boolean addWorker(Runnable firstTask, boolean core) {
// 標記外層for循環(huán)
retry:
for (;;) {
int c = ctl.get();
// 獲取線程池狀態(tài)
int rs = runStateOf(c);
// 線程池狀態(tài)為RUNNING時,可以創(chuàng)建Worker
// 線程池狀態(tài)為SHUTDOWN,且任務阻塞隊列不為空時,可以創(chuàng)建初始任務為null的Worker
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 獲取Worker數(shù)量
int wc = workerCountOf(c);
// 如果Worker數(shù)量大于CAPACITY,拒絕創(chuàng)建Worker
// core為true表示創(chuàng)建核心線程Worker,如果Worker數(shù)量此時已經(jīng)大于等于核心線程數(shù),則拒絕創(chuàng)建Worker,轉而應該將任務添加到任務阻塞隊列
// core為false表示創(chuàng)建非核心線程Worker,如果Worker數(shù)量此時已經(jīng)大于等于最大線程數(shù),則拒絕創(chuàng)建Worker,轉而應該執(zhí)行拒絕策略
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 以CAS方式將Worker數(shù)量加1
// 加1成功表明無競爭發(fā)生,從外層for循環(huán)跳出
if (compareAndIncrementWorkerCount(c))
break retry;
// 加1失敗表明有競爭發(fā)生,此時需要重新獲取ctl的值
c = ctl.get();
// 重新獲取ctl后如果發(fā)現(xiàn)線程池狀態(tài)發(fā)生了改變,此時重新執(zhí)行外層for循環(huán),即需要基于新的線程池狀態(tài)判斷是否允許創(chuàng)建Worker
// 重新獲取ctl后如果線程池狀態(tài)未發(fā)生改變,則繼續(xù)執(zhí)行內(nèi)層for循環(huán),即嘗試再一次以CAS方式將Worker數(shù)量加1
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 創(chuàng)建一個Worker
w = new Worker(firstTask);
// 獲取Worker的線程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 由于線程池中存儲Worker的集合為HashSet,因此將Worker添加到Worker集合時需要獲取全局鎖保證線程安全
mainLock.lock();
try {
// 再一次獲取線程池狀態(tài)
int rs = runStateOf(ctl.get());
// 如果線程池狀態(tài)還是為RUNNING或者線程池狀態(tài)為SHUTDOWN但創(chuàng)建的Worker的初始任務為null,則允許將創(chuàng)建出來的Worker添加到集合
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 檢查一下Worker的線程是否可以啟動(處于活動狀態(tài)的線程無法再啟動)
if (t.isAlive())
throw new IllegalThreadStateException();
// 將Worker添加到Worker集合
workers.add(w);
int s = workers.size();
// largestPoolSize用于記錄線程池最多存在過的Worker數(shù)
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 啟動Worker線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// Worker線程沒有成功啟動起來,此時需要對該Worker的創(chuàng)建執(zhí)行回滾操作
addWorkerFailed(w);
}
return workerStarted;
}addWorker() 方法中只允許兩種情況可以創(chuàng)建Worker。
- 線程池狀態(tài)為RUNNING,可以創(chuàng)建Worker;
- 線程池狀態(tài)為SHUTDOWN,且任務阻塞隊列不為空,可以創(chuàng)建初始任務為null的Worker。
一旦Worker創(chuàng)建成功,就會將Worker的線程啟動,如果Worker創(chuàng)建失敗或者Worker的線程啟動失敗,則會調(diào)用addWorkerFailed() 方法執(zhí)行回滾操作,其實現(xiàn)如下所示。
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 如果Worker添加到了Worker集合中,則將Worker從Worker集合中刪除
if (w != null)
workers.remove(w);
// 以CAS方式將Worker數(shù)量減1
decrementWorkerCount();
// 嘗試終止線程池
tryTerminate();
} finally {
mainLock.unlock();
}
}由于Worker自身實現(xiàn)了Runnable,因此Worker自身就是一個任務,實際上Worker的線程執(zhí)行的任務就是Worker本身,因此addWorker() 中將Worker的線程啟動時,會調(diào)用Worker的run() 方法,其實現(xiàn)如下。
public void run() {
runWorker(this);
}在Worker的run() 方法中調(diào)用了ThreadPoolExecutor的runWorker() 方法,其實現(xiàn)如下所示。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// 如果task為null,則從任務阻塞隊列中獲取任務
// 通常Worker啟動時會先執(zhí)行初始任務,然后再去任務阻塞隊列中獲取任務
while (task != null || (task = getTask()) != null) {
w.lock();
// 線程池正在停止時,需要確保當前Worker的線程是被中斷的
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// Worker執(zhí)行任務發(fā)生異?;蛘邚膅etTask()中獲取任務為空時會執(zhí)行這里的邏輯
// processWorkerExit()會將Worker從Worker集合中刪除,并嘗試終止線程池
processWorkerExit(w, completedAbruptly);
}
}runWorker() 方法就是先讓Worker將初始任務(如果有的話)執(zhí)行完,然后循環(huán)從任務阻塞隊列中獲取任務來執(zhí)行,如果Worker執(zhí)行任務發(fā)生異?;蛘邚娜蝿兆枞犃蝎@取任務失?。ǐ@取到的任務為null),則調(diào)用processWorkerExit() 方法來將自身從Worker集合中刪除。下面先看一下getTask() 方法的實現(xiàn)。
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果線程池狀態(tài)為SHUTDOWN,且任務阻塞隊列為空,則不再允許從任務阻塞隊列中獲取任務
// 如果線程池狀態(tài)為STOP,則不再允許從任務阻塞隊列中獲取任務
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 如果allowCoreThreadTimeOut為true,或者當前線程數(shù)大于核心線程數(shù),此時timed為true,表明從任務阻塞隊列以超時退出的方式獲取任務
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果當前線程數(shù)大于最大線程數(shù),則當前Worker應該被刪除
// 如果當前Worker上一次從任務阻塞隊列中獲取任務時超時退出,且任務阻塞隊列現(xiàn)在還是為空,則當前Worker應該被刪除
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 從任務阻塞隊列中獲取任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
// 獲取到任務則返回該任務
return r;
// timedOut為true表明Worker上一次從任務阻塞隊列中獲取任務時超時退出
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}getTask() 方法在如下情況不允許Worker從任務阻塞隊列中獲取任務。
- 線程池狀態(tài)為SHUTDOWN,且任務阻塞隊列為空;
- 線程池狀態(tài)為STOP。
如果Worker有資格從任務阻塞隊列獲取任務,那么當allowCoreThreadTimeOut為true,或者當前線程數(shù)大于核心線程數(shù)時,Worker以超時退出的方式獲取任務,否則Worker以一直阻塞的方式獲取任務。
當Worker在getTask() 方法中獲取任務失敗時,getTask() 方法會返回null,從而導致Worker會執(zhí)行processWorkerExit() 方法來刪除自身,其實現(xiàn)如下所示。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// completedAbruptly為true表明是執(zhí)行任務時發(fā)生異常導致Worker需要被刪除
if (completedAbruptly)
// 修正Worker數(shù)量
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 將Worker從Worker集合中刪除
workers.remove(w);
} finally {
mainLock.unlock();
}
// 嘗試終止線程池
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}Worker在processWorkerExit() 方法中刪除自身之后,還會調(diào)用tryTerminate() 嘗試終止線程池,tryTerminate() 方法很精髓,后面會對其進行詳細分析,這里暫且不談。至此,Worker的創(chuàng)建,執(zhí)行任務,獲取任務和刪除的整個流程已經(jīng)大體分析完畢。
對于執(zhí)行任務,現(xiàn)在簡單進行一個小結。
ThreadPoolExecutor執(zhí)行任務,第一步是根據(jù)Worker數(shù)量來決定是新建Worker來執(zhí)行任務還是將任務添加到任務阻塞隊列,這里的判斷規(guī)則如下。
- 如果Worker數(shù)量小于核心線程數(shù),則創(chuàng)建Worker來執(zhí)行任務;
- 如果Worker數(shù)量大于等于核心線程數(shù),則將任務添加到任務阻塞隊列;
- 如果任務阻塞隊列已滿,則創(chuàng)建Worker來執(zhí)行任務;
- 如果Worker數(shù)量已經(jīng)達到最大線程數(shù),此時執(zhí)行任務拒絕策略。
當要新建Worker來執(zhí)行任務時,只有兩種情況可以新建Worker,如下所示。
- 線程池狀態(tài)為RUNNING,可以創(chuàng)建Worker;
- 線程池狀態(tài)為SHUTDOWN,且任務阻塞隊列不為空,可以創(chuàng)建初始任務為null的Worker。
Worker自身實現(xiàn)了Runnable,且Worker持有一個線程,當Worker啟動時,就是啟動Worker持有的線程,而這個線程執(zhí)行的任務就是Worker自身。
Worker啟動后,會首先執(zhí)行自己的初始任務,然后再去任務阻塞隊列中獲取任務。
四. 關閉線程池源碼分析
不再使用的線程池,可以進行關閉。關閉ThreadPoolExecutor的方法有shutdown() 和shutdownNow(),本節(jié)將對ThreadPoolExecutor的關閉進行分析。
1. shutdown()
首先分析shutdown() 方法,其實現(xiàn)如下。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 循環(huán)通過CAS方式將線程池狀態(tài)置為SHUTDOWN
advanceRunState(SHUTDOWN);
// 中斷空閑Worker
interruptIdleWorkers();
onShutdown();
} finally {
mainLock.unlock();
}
// 嘗試終止線程池
tryTerminate();
}在shutdown() 方法中首先會將線程池狀態(tài)置為SHUTDOWN,然后調(diào)用interruptIdleWorkers() 方法中斷空閑Worker,最后調(diào)用tryTerminate() 方法來嘗試終止線程池。那么這里要解釋一下什么是空閑Worker,先看一下interruptIdleWorkers() 的實現(xiàn)。
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 中斷線程前需要先嘗試獲取Worker的鎖
// 只能獲取到空閑Worker的鎖,所以shutdown()方法只會中斷空閑Worker
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}調(diào)用interruptIdleWorkers() 方法中斷Worker前首先需要嘗試獲取Worker的鎖,已知Worker除了實現(xiàn)Runnable接口外,還繼承于AbstractQueuedSynchronizer,因此Worker本身是一把鎖,然后在runWorker() 中Worker執(zhí)行任務前都會先獲取Worker的鎖,這里看一下Worker的lock() 方法的實現(xiàn)。
public void lock() {
acquire(1);
}
protected boolean tryAcquire(int unused) {
// 以CAS方式將state從0設置為1
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}可以發(fā)現(xiàn),Worker在lock() 中調(diào)用了acquire() 方法,該方法由AbstractQueuedSynchronizer抽象類提供,在acquire() 中會調(diào)用其子類實現(xiàn)的tryAcquire() 方法,tryAcquire() 方法會以CAS方式將state從0設置為1,因此這樣的設計讓Worker是一把不可重入鎖。
回到interruptIdleWorkers() 方法,前面提到該方法中斷Worker前會嘗試獲取Worker的鎖,能夠獲取到鎖才會中斷Worker,而因為Worker是不可重入鎖,所以正在執(zhí)行任務的Worker是無法獲取到鎖的,只有那些沒有執(zhí)行任務的Worker的鎖才能夠被獲取,因此所謂的中斷空閑Worker,實際就是中斷沒有執(zhí)行任務的Worker,那些執(zhí)行任務的Worker在shutdown() 方法被調(diào)用時不會被中斷,這些Worker執(zhí)行完任務后會繼續(xù)從任務阻塞隊列中獲取任務來執(zhí)行,直到任務阻塞隊列為空,此時沒有被中斷過的Worker也會被刪除掉,等到線程池中沒有Worker以及任務阻塞隊列沒有任務后,線程池才會被終止掉。
對于shutdown() 方法,一句話總結就是:將線程池狀態(tài)置為SHUTDOWN并拒絕接受新任務,等到線程池Worker數(shù)量為0,任務阻塞隊列為空時,關閉線程池。
2. shutdownNow()
現(xiàn)在再來分析shutdownNow() 方法。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 循環(huán)通過CAS方式將線程池狀態(tài)置為STOP
advanceRunState(STOP);
// 中斷所有Worker
interruptWorkers();
// 將任務阻塞隊列中的任務獲取出來并返回
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 嘗試終止線程池
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 中斷線程池中所有Worker
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}在shutdownNow() 方法中首先會將線程池狀態(tài)置為STOP,然后調(diào)用interruptWorkers() 方法中斷線程池中的所有Worker,接著調(diào)用tryTerminate() 方法來嘗試終止線程池,最后shutdownNow() 方法會將任務阻塞隊列中還未被執(zhí)行的任務返回。
shutdownNow() 方法調(diào)用之后,線程池中的所有Worker都會被中斷,包括正在執(zhí)行任務的Worker,等到所有Worker都被刪除之后,線程池即被終止,也就是說,shutdownNow() 不會保證當前時刻正在執(zhí)行的任務會被安全的執(zhí)行完,并且會放棄執(zhí)行任務阻塞隊列中的所有任務。
3. tryTerminate()
關于線程池的關閉,還有一個重要的方法,那就是前面多次提到的tryTerminate() 方法,該方法能確保線程池可以被正確的關閉,其實現(xiàn)如下所示。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 如果線程池狀態(tài)為RUNNING,則沒有資格終止線程池
// 如果線程池狀態(tài)大于等于TIDYING,則沒有資格終止線程池
// 如果線程池狀態(tài)為SHUTDOWN但任務阻塞隊列不為空,則沒有資格終止線程池
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 線程池狀態(tài)為SHUTDOWN且任務阻塞隊列為空會執(zhí)行到這里
// 線程池狀態(tài)為STOP會執(zhí)行到這里
// Worker數(shù)量不為0,表明當前還有正在執(zhí)行任務的Worker或者空閑的Worker,此時中斷一個空閑的Worker
// 在這里被中斷的空閑Worker會在getTask()方法中返回null,從而執(zhí)行processWorkerExit(),最終該Worker會被刪除
// processWorkerExit()方法中又會調(diào)用tryTerminate(),因此將shutdown信號在空閑Worker之間進行了傳播
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 將線程池狀態(tài)置為TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 終止線程池
terminated();
} finally {
// 將線程池狀態(tài)最終置為TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}在tryTerminate() 方法的官方注釋中給出了兩種線程池會被終止的情況:
- 線程池的狀態(tài)為SHUTDOWN,Worker數(shù)量為0,任務阻塞隊列為空;
- 線程池的狀態(tài)為STOP,Worker數(shù)量為0。
官方注釋中還說明在所有可能導致線程池終止的操作中都應該調(diào)用tryTerminate() 方法來嘗試終止線程池,因此線程池中Worker被刪除時和任務阻塞隊列中任務被刪除時會調(diào)用tryTerminate(),以達到在線程池符合終止條件時及時終止線程池。
4. 小結
對于關閉線程池,簡單小結如下。
關閉ThreadPoolExecutor有兩種方式,如下所示。
- shutdown()。調(diào)用shutdown() 方法會首先將線程池狀態(tài)置為SHUTDOWN并拒絕接受新任務,然后中斷空閑Worker,等到線程池中Worker數(shù)量為0,任務阻塞隊列為空時,線程池被真正關閉;
- shutdownNow()。調(diào)用shutdownNow() 方法會首先將線程池狀態(tài)置為STOP,然后中斷所有Worker(包括正在執(zhí)行任務的Worker),并將任務阻塞隊列中還未被執(zhí)行的任務返回,當線程池Worker數(shù)量為0時,線程池被真正關閉。
還有一點需要說明,Worker除了實現(xiàn)Runnable接口外,還繼承于AbstractQueuedSynchronizer,因此Worker本身是一把鎖,Worker執(zhí)行任務前都會先獲取Worker的鎖,所以正在執(zhí)行任務的Worker的鎖是無法被獲取的,換言之,只有沒有執(zhí)行任務的Worker的鎖才能被獲取,這些Worker就稱為空閑Worker。
以上就是一文搞懂Java的ThreadPoolExecutor原理的詳細內(nèi)容,更多關于Java ThreadPoolExecutor的資料請關注腳本之家其它相關文章!
- Java線程池?ThreadPoolExecutor?詳解
- Java多線程ThreadPoolExecutor詳解
- Java線程池ThreadPoolExecutor源碼深入分析
- java高并發(fā)ThreadPoolExecutor類解析線程池執(zhí)行流程
- java高并發(fā)ScheduledThreadPoolExecutor與Timer區(qū)別
- 徹底搞懂java并發(fā)ThreadPoolExecutor使用
- Java多線程編程基石ThreadPoolExecutor示例詳解
- 源碼分析Java中ThreadPoolExecutor的底層原理
- 一文弄懂Java中ThreadPoolExecutor
相關文章
spring boot配置讀寫分離的完整實現(xiàn)步驟
數(shù)據(jù)庫配置主從之后,如何在代碼層面實現(xiàn)讀寫分離?所以下面這篇文章主要給大家介紹了關于spring boot配置讀寫分離的完整步驟,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2018-09-09
使用postman傳遞list集合后臺springmvc接收
這篇文章主要介紹了使用postman傳遞list集合后臺springmvc接收的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08
Maven中怎么手動添加jar包到本地倉庫詳解(repository)
這篇文章主要給大家介紹了關于Maven中怎么手動添加jar包到本地倉庫的相關資料,文中通過圖文以及實例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2023-04-04
解析SpringBoot中使用LoadTimeWeaving技術實現(xiàn)AOP功能
這篇文章主要介紹了SpringBoot中使用LoadTimeWeaving技術實現(xiàn)AOP功能,AOP面向切面編程,通過為目標類織入切面的方式,實現(xiàn)對目標類功能的增強,本文給大家介紹的非常詳細,需要的朋友可以參考下2022-09-09

