一文搞懂Java的ThreadPoolExecutor原理
一. Executor框架簡介
Executor框架提供了組件來管理Java中的線程,Executor框架將其分為任務(wù),線程執(zhí)行任務(wù),任務(wù)執(zhí)行結(jié)果三部分。下面以表格形式對(duì)這三部分進(jìn)行說明。
項(xiàng) | 說明 |
---|---|
任務(wù) | Executor框架提供了Runnable接口和Callable接口,任務(wù)需要實(shí)現(xiàn)這兩個(gè)接口才能被線程執(zhí)行 |
線程執(zhí)行任務(wù) | Executor框架提供了接口Executor和繼承于Executor的ExecutorService接口來定義任務(wù)執(zhí)行機(jī)制。Executor框架中的線程池類ThreadPoolExecutor和ScheduledThreadPoolExecutor均實(shí)現(xiàn)了ExecutorService接口 |
任務(wù)執(zhí)行結(jié)果 | Executor框架提供了Future接口和實(shí)現(xiàn)了Future接口的FutureTask類來定義任務(wù)執(zhí)行結(jié)果。 |
組件之間的類圖關(guān)系如下所示。
Executor接口是線程池的頂層接口,通常說到的線程池指的是ThreadPoolExecutor,同時(shí)ThreadPoolExecutor還有一個(gè)子類叫做ScheduledThreadPoolExecutor,其擴(kuò)展實(shí)現(xiàn)了延時(shí)執(zhí)行任務(wù)和定時(shí)執(zhí)行任務(wù)的功能。
Executor框架指的是任務(wù),執(zhí)行任務(wù)的線程池和任務(wù)執(zhí)行結(jié)果這三部分,切不可將Executor框架與Executor接口相混淆。
本篇文章就將對(duì)Executor框架中的ThreadPoolExecutor的源碼實(shí)現(xiàn)進(jìn)行學(xué)習(xí)。
二. 認(rèn)識(shí)ThreadPoolExecutor狀態(tài)
在學(xué)習(xí)ThreadPoolExecutor如何執(zhí)行任務(wù)前,先認(rèn)識(shí)一下ThreadPoolExecutor的狀態(tài)。
ThreadPoolExecutor繼承于AbstractExecutorService,并實(shí)現(xiàn)了ExecutorService接口,是Executor框架的核心類,用于管理線程。
ThreadPoolExecutor使用了原子整型ctl來表示線程池狀態(tài)和Worker數(shù)量。ctl是一個(gè)原子整型,前3位表示線程池狀態(tài),后29位表示Worker數(shù)量。ThreadPoolExecutor中這部分的源碼如下所示。
public class ThreadPoolExecutor extends AbstractExecutorService { private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // 取整型前3位,即獲取線程池狀態(tài) private static int runStateOf(int c) { return c & ~CAPACITY; } // 取整型后29位,即獲取Worker數(shù)量 private static int workerCountOf(int c) { return c & CAPACITY; } // 根據(jù)線程池狀態(tài)和Worker數(shù)量拼裝ctl private static int ctlOf(int rs, int wc) { return rs | wc; } // 線程池狀態(tài)判斷 private static boolean runStateLessThan(int c, int s) { return c < s; } // 線程池狀態(tài)判斷 private static boolean runStateAtLeast(int c, int s) { return c >= s; } // 判斷線程池狀態(tài)是否為RUNNING private static boolean isRunning(int c) { return c < SHUTDOWN; } ...... }
可知ThreadPoolExecutor有如下五種線程池狀態(tài)。
- RUNNING,線程池接受新任務(wù),會(huì)執(zhí)行任務(wù)阻塞隊(duì)列中的任務(wù),ctl前三位表示為111;
- SHUTDOWN,線程池拒絕新任務(wù),會(huì)執(zhí)行任務(wù)阻塞隊(duì)列中的任務(wù),ctl前三位表示為000;
- STOP,線程池拒絕新任務(wù),不會(huì)執(zhí)行任務(wù)阻塞隊(duì)列中的任務(wù),嘗試中斷正在執(zhí)行的任務(wù),ctl前三位表示為001;
- TIDYING,所有任務(wù)被關(guān)閉,Worker數(shù)量為0,ctl前三位表示為010;
- TERMINATED,terminated() 執(zhí)行完畢,ctl前三位表示為011。
得益于ctl的結(jié)構(gòu),所以無論Worker數(shù)量是多少,ThreadPoolExecutor中線程池狀態(tài)存在如下關(guān)系。
RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
因此runStateLessThan(),runStateAtLeast() 和isRunning() 方法可以方便的對(duì)線程池狀態(tài)進(jìn)行判斷。
三. 執(zhí)行任務(wù)源碼分析
作為線程池,ThreadPoolExecutor最重要也最經(jīng)典的地方,當(dāng)然就是執(zhí)行任務(wù)了。本節(jié)對(duì)ThreadPoolExecutor執(zhí)行任務(wù)的流程進(jìn)行一個(gè)學(xué)習(xí)。
ThreadPoolExecutor中執(zhí)行任務(wù)的入口方法為execute(),其實(shí)現(xiàn)如下。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 如果Worker數(shù)量小于核心線程數(shù),則創(chuàng)建Worker并執(zhí)行任務(wù) if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 如果Worker數(shù)量大于等于核心線程數(shù),則將任務(wù)添加到任務(wù)阻塞隊(duì)列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 如果線程池狀態(tài)突然不再是RUNNING,則嘗試將任務(wù)從任務(wù)阻塞隊(duì)列中刪除,刪除成功則為該任務(wù)執(zhí)行拒絕策略 if (! isRunning(recheck) && remove(command)) reject(command); // 如果線程池中Worker數(shù)量突然為0,則創(chuàng)建一個(gè)Worker來執(zhí)行任務(wù) else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 執(zhí)行到這里表示線程池狀態(tài)已經(jīng)不是RUNNING或者任務(wù)阻塞隊(duì)列已滿 // 此時(shí)嘗試新建一個(gè)Worker來執(zhí)行任務(wù) // 如果新建一個(gè)Worker來執(zhí)行任務(wù)失敗,表明線程池狀態(tài)不再是RUNNING或者Worker數(shù)量已經(jīng)達(dá)到最大線程數(shù),此時(shí)執(zhí)行拒絕策略 else if (!addWorker(command, false)) reject(command); }
在execute() 中會(huì)根據(jù)Worker數(shù)量和線程池狀態(tài)來決定是新建Worker來執(zhí)行任務(wù)還是將任務(wù)添加到任務(wù)阻塞隊(duì)列。新建Worker來執(zhí)行任務(wù)的實(shí)現(xiàn)如下所示。
private boolean addWorker(Runnable firstTask, boolean core) { // 標(biāo)記外層for循環(huán) retry: for (;;) { int c = ctl.get(); // 獲取線程池狀態(tài) int rs = runStateOf(c); // 線程池狀態(tài)為RUNNING時(shí),可以創(chuàng)建Worker // 線程池狀態(tài)為SHUTDOWN,且任務(wù)阻塞隊(duì)列不為空時(shí),可以創(chuàng)建初始任務(wù)為null的Worker if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 獲取Worker數(shù)量 int wc = workerCountOf(c); // 如果Worker數(shù)量大于CAPACITY,拒絕創(chuàng)建Worker // core為true表示創(chuàng)建核心線程Worker,如果Worker數(shù)量此時(shí)已經(jīng)大于等于核心線程數(shù),則拒絕創(chuàng)建Worker,轉(zhuǎn)而應(yīng)該將任務(wù)添加到任務(wù)阻塞隊(duì)列 // core為false表示創(chuàng)建非核心線程Worker,如果Worker數(shù)量此時(shí)已經(jīng)大于等于最大線程數(shù),則拒絕創(chuàng)建Worker,轉(zhuǎn)而應(yīng)該執(zhí)行拒絕策略 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 以CAS方式將Worker數(shù)量加1 // 加1成功表明無競爭發(fā)生,從外層for循環(huán)跳出 if (compareAndIncrementWorkerCount(c)) break retry; // 加1失敗表明有競爭發(fā)生,此時(shí)需要重新獲取ctl的值 c = ctl.get(); // 重新獲取ctl后如果發(fā)現(xiàn)線程池狀態(tài)發(fā)生了改變,此時(shí)重新執(zhí)行外層for循環(huán),即需要基于新的線程池狀態(tài)判斷是否允許創(chuàng)建Worker // 重新獲取ctl后如果線程池狀態(tài)未發(fā)生改變,則繼續(xù)執(zhí)行內(nèi)層for循環(huán),即嘗試再一次以CAS方式將Worker數(shù)量加1 if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 創(chuàng)建一個(gè)Worker w = new Worker(firstTask); // 獲取Worker的線程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // 由于線程池中存儲(chǔ)Worker的集合為HashSet,因此將Worker添加到Worker集合時(shí)需要獲取全局鎖保證線程安全 mainLock.lock(); try { // 再一次獲取線程池狀態(tài) int rs = runStateOf(ctl.get()); // 如果線程池狀態(tài)還是為RUNNING或者線程池狀態(tài)為SHUTDOWN但創(chuàng)建的Worker的初始任務(wù)為null,則允許將創(chuàng)建出來的Worker添加到集合 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 檢查一下Worker的線程是否可以啟動(dòng)(處于活動(dòng)狀態(tài)的線程無法再啟動(dòng)) if (t.isAlive()) throw new IllegalThreadStateException(); // 將Worker添加到Worker集合 workers.add(w); int s = workers.size(); // largestPoolSize用于記錄線程池最多存在過的Worker數(shù) if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 啟動(dòng)Worker線程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) // Worker線程沒有成功啟動(dòng)起來,此時(shí)需要對(duì)該Worker的創(chuàng)建執(zhí)行回滾操作 addWorkerFailed(w); } return workerStarted; }
addWorker() 方法中只允許兩種情況可以創(chuàng)建Worker。
- 線程池狀態(tài)為RUNNING,可以創(chuàng)建Worker;
- 線程池狀態(tài)為SHUTDOWN,且任務(wù)阻塞隊(duì)列不為空,可以創(chuàng)建初始任務(wù)為null的Worker。
一旦Worker創(chuàng)建成功,就會(huì)將Worker的線程啟動(dòng),如果Worker創(chuàng)建失敗或者Worker的線程啟動(dòng)失敗,則會(huì)調(diào)用addWorkerFailed() 方法執(zhí)行回滾操作,其實(shí)現(xiàn)如下所示。
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 如果Worker添加到了Worker集合中,則將Worker從Worker集合中刪除 if (w != null) workers.remove(w); // 以CAS方式將Worker數(shù)量減1 decrementWorkerCount(); // 嘗試終止線程池 tryTerminate(); } finally { mainLock.unlock(); } }
由于Worker自身實(shí)現(xiàn)了Runnable,因此Worker自身就是一個(gè)任務(wù),實(shí)際上Worker的線程執(zhí)行的任務(wù)就是Worker本身,因此addWorker() 中將Worker的線程啟動(dòng)時(shí),會(huì)調(diào)用Worker的run() 方法,其實(shí)現(xiàn)如下。
public void run() { runWorker(this); }
在Worker的run() 方法中調(diào)用了ThreadPoolExecutor的runWorker() 方法,其實(shí)現(xiàn)如下所示。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { // 如果task為null,則從任務(wù)阻塞隊(duì)列中獲取任務(wù) // 通常Worker啟動(dòng)時(shí)會(huì)先執(zhí)行初始任務(wù),然后再去任務(wù)阻塞隊(duì)列中獲取任務(wù) while (task != null || (task = getTask()) != null) { w.lock(); // 線程池正在停止時(shí),需要確保當(dāng)前Worker的線程是被中斷的 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // Worker執(zhí)行任務(wù)發(fā)生異常或者從getTask()中獲取任務(wù)為空時(shí)會(huì)執(zhí)行這里的邏輯 // processWorkerExit()會(huì)將Worker從Worker集合中刪除,并嘗試終止線程池 processWorkerExit(w, completedAbruptly); } }
runWorker() 方法就是先讓Worker將初始任務(wù)(如果有的話)執(zhí)行完,然后循環(huán)從任務(wù)阻塞隊(duì)列中獲取任務(wù)來執(zhí)行,如果Worker執(zhí)行任務(wù)發(fā)生異?;蛘邚娜蝿?wù)阻塞隊(duì)列獲取任務(wù)失?。ǐ@取到的任務(wù)為null),則調(diào)用processWorkerExit() 方法來將自身從Worker集合中刪除。下面先看一下getTask() 方法的實(shí)現(xiàn)。
private Runnable getTask() { boolean timedOut = false; for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 如果線程池狀態(tài)為SHUTDOWN,且任務(wù)阻塞隊(duì)列為空,則不再允許從任務(wù)阻塞隊(duì)列中獲取任務(wù) // 如果線程池狀態(tài)為STOP,則不再允許從任務(wù)阻塞隊(duì)列中獲取任務(wù) if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 如果allowCoreThreadTimeOut為true,或者當(dāng)前線程數(shù)大于核心線程數(shù),此時(shí)timed為true,表明從任務(wù)阻塞隊(duì)列以超時(shí)退出的方式獲取任務(wù) boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 如果當(dāng)前線程數(shù)大于最大線程數(shù),則當(dāng)前Worker應(yīng)該被刪除 // 如果當(dāng)前Worker上一次從任務(wù)阻塞隊(duì)列中獲取任務(wù)時(shí)超時(shí)退出,且任務(wù)阻塞隊(duì)列現(xiàn)在還是為空,則當(dāng)前Worker應(yīng)該被刪除 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 從任務(wù)阻塞隊(duì)列中獲取任務(wù) Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) // 獲取到任務(wù)則返回該任務(wù) return r; // timedOut為true表明Worker上一次從任務(wù)阻塞隊(duì)列中獲取任務(wù)時(shí)超時(shí)退出 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
getTask() 方法在如下情況不允許Worker從任務(wù)阻塞隊(duì)列中獲取任務(wù)。
- 線程池狀態(tài)為SHUTDOWN,且任務(wù)阻塞隊(duì)列為空;
- 線程池狀態(tài)為STOP。
如果Worker有資格從任務(wù)阻塞隊(duì)列獲取任務(wù),那么當(dāng)allowCoreThreadTimeOut為true,或者當(dāng)前線程數(shù)大于核心線程數(shù)時(shí),Worker以超時(shí)退出的方式獲取任務(wù),否則Worker以一直阻塞的方式獲取任務(wù)。
當(dāng)Worker在getTask() 方法中獲取任務(wù)失敗時(shí),getTask() 方法會(huì)返回null,從而導(dǎo)致Worker會(huì)執(zhí)行processWorkerExit() 方法來刪除自身,其實(shí)現(xiàn)如下所示。
private void processWorkerExit(Worker w, boolean completedAbruptly) { // completedAbruptly為true表明是執(zhí)行任務(wù)時(shí)發(fā)生異常導(dǎo)致Worker需要被刪除 if (completedAbruptly) // 修正Worker數(shù)量 decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; // 將Worker從Worker集合中刪除 workers.remove(w); } finally { mainLock.unlock(); } // 嘗試終止線程池 tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; } addWorker(null, false); } }
Worker在processWorkerExit() 方法中刪除自身之后,還會(huì)調(diào)用tryTerminate() 嘗試終止線程池,tryTerminate() 方法很精髓,后面會(huì)對(duì)其進(jìn)行詳細(xì)分析,這里暫且不談。至此,Worker的創(chuàng)建,執(zhí)行任務(wù),獲取任務(wù)和刪除的整個(gè)流程已經(jīng)大體分析完畢。
對(duì)于執(zhí)行任務(wù),現(xiàn)在簡單進(jìn)行一個(gè)小結(jié)。
ThreadPoolExecutor執(zhí)行任務(wù),第一步是根據(jù)Worker數(shù)量來決定是新建Worker來執(zhí)行任務(wù)還是將任務(wù)添加到任務(wù)阻塞隊(duì)列,這里的判斷規(guī)則如下。
- 如果Worker數(shù)量小于核心線程數(shù),則創(chuàng)建Worker來執(zhí)行任務(wù);
- 如果Worker數(shù)量大于等于核心線程數(shù),則將任務(wù)添加到任務(wù)阻塞隊(duì)列;
- 如果任務(wù)阻塞隊(duì)列已滿,則創(chuàng)建Worker來執(zhí)行任務(wù);
- 如果Worker數(shù)量已經(jīng)達(dá)到最大線程數(shù),此時(shí)執(zhí)行任務(wù)拒絕策略。
當(dāng)要新建Worker來執(zhí)行任務(wù)時(shí),只有兩種情況可以新建Worker,如下所示。
- 線程池狀態(tài)為RUNNING,可以創(chuàng)建Worker;
- 線程池狀態(tài)為SHUTDOWN,且任務(wù)阻塞隊(duì)列不為空,可以創(chuàng)建初始任務(wù)為null的Worker。
Worker自身實(shí)現(xiàn)了Runnable,且Worker持有一個(gè)線程,當(dāng)Worker啟動(dòng)時(shí),就是啟動(dòng)Worker持有的線程,而這個(gè)線程執(zhí)行的任務(wù)就是Worker自身。
Worker啟動(dòng)后,會(huì)首先執(zhí)行自己的初始任務(wù),然后再去任務(wù)阻塞隊(duì)列中獲取任務(wù)。
四. 關(guān)閉線程池源碼分析
不再使用的線程池,可以進(jìn)行關(guān)閉。關(guān)閉ThreadPoolExecutor的方法有shutdown() 和shutdownNow(),本節(jié)將對(duì)ThreadPoolExecutor的關(guān)閉進(jìn)行分析。
1. shutdown()
首先分析shutdown() 方法,其實(shí)現(xiàn)如下。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 循環(huán)通過CAS方式將線程池狀態(tài)置為SHUTDOWN advanceRunState(SHUTDOWN); // 中斷空閑Worker interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } // 嘗試終止線程池 tryTerminate(); }
在shutdown() 方法中首先會(huì)將線程池狀態(tài)置為SHUTDOWN,然后調(diào)用interruptIdleWorkers() 方法中斷空閑Worker,最后調(diào)用tryTerminate() 方法來嘗試終止線程池。那么這里要解釋一下什么是空閑Worker,先看一下interruptIdleWorkers() 的實(shí)現(xiàn)。
private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // 中斷線程前需要先嘗試獲取Worker的鎖 // 只能獲取到空閑Worker的鎖,所以shutdown()方法只會(huì)中斷空閑Worker if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
調(diào)用interruptIdleWorkers() 方法中斷Worker前首先需要嘗試獲取Worker的鎖,已知Worker除了實(shí)現(xiàn)Runnable接口外,還繼承于AbstractQueuedSynchronizer,因此Worker本身是一把鎖,然后在runWorker() 中Worker執(zhí)行任務(wù)前都會(huì)先獲取Worker的鎖,這里看一下Worker的lock() 方法的實(shí)現(xiàn)。
public void lock() { acquire(1); } protected boolean tryAcquire(int unused) { // 以CAS方式將state從0設(shè)置為1 if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
可以發(fā)現(xiàn),Worker在lock() 中調(diào)用了acquire() 方法,該方法由AbstractQueuedSynchronizer抽象類提供,在acquire() 中會(huì)調(diào)用其子類實(shí)現(xiàn)的tryAcquire() 方法,tryAcquire() 方法會(huì)以CAS方式將state從0設(shè)置為1,因此這樣的設(shè)計(jì)讓Worker是一把不可重入鎖。
回到interruptIdleWorkers() 方法,前面提到該方法中斷Worker前會(huì)嘗試獲取Worker的鎖,能夠獲取到鎖才會(huì)中斷Worker,而因?yàn)?strong>Worker是不可重入鎖,所以正在執(zhí)行任務(wù)的Worker是無法獲取到鎖的,只有那些沒有執(zhí)行任務(wù)的Worker的鎖才能夠被獲取,因此所謂的中斷空閑Worker,實(shí)際就是中斷沒有執(zhí)行任務(wù)的Worker,那些執(zhí)行任務(wù)的Worker在shutdown() 方法被調(diào)用時(shí)不會(huì)被中斷,這些Worker執(zhí)行完任務(wù)后會(huì)繼續(xù)從任務(wù)阻塞隊(duì)列中獲取任務(wù)來執(zhí)行,直到任務(wù)阻塞隊(duì)列為空,此時(shí)沒有被中斷過的Worker也會(huì)被刪除掉,等到線程池中沒有Worker以及任務(wù)阻塞隊(duì)列沒有任務(wù)后,線程池才會(huì)被終止掉。
對(duì)于shutdown() 方法,一句話總結(jié)就是:將線程池狀態(tài)置為SHUTDOWN并拒絕接受新任務(wù),等到線程池Worker數(shù)量為0,任務(wù)阻塞隊(duì)列為空時(shí),關(guān)閉線程池。
2. shutdownNow()
現(xiàn)在再來分析shutdownNow() 方法。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 循環(huán)通過CAS方式將線程池狀態(tài)置為STOP advanceRunState(STOP); // 中斷所有Worker interruptWorkers(); // 將任務(wù)阻塞隊(duì)列中的任務(wù)獲取出來并返回 tasks = drainQueue(); } finally { mainLock.unlock(); } // 嘗試終止線程池 tryTerminate(); return tasks; } private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 中斷線程池中所有Worker for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } }
在shutdownNow() 方法中首先會(huì)將線程池狀態(tài)置為STOP,然后調(diào)用interruptWorkers() 方法中斷線程池中的所有Worker,接著調(diào)用tryTerminate() 方法來嘗試終止線程池,最后shutdownNow() 方法會(huì)將任務(wù)阻塞隊(duì)列中還未被執(zhí)行的任務(wù)返回。
shutdownNow() 方法調(diào)用之后,線程池中的所有Worker都會(huì)被中斷,包括正在執(zhí)行任務(wù)的Worker,等到所有Worker都被刪除之后,線程池即被終止,也就是說,shutdownNow() 不會(huì)保證當(dāng)前時(shí)刻正在執(zhí)行的任務(wù)會(huì)被安全的執(zhí)行完,并且會(huì)放棄執(zhí)行任務(wù)阻塞隊(duì)列中的所有任務(wù)。
3. tryTerminate()
關(guān)于線程池的關(guān)閉,還有一個(gè)重要的方法,那就是前面多次提到的tryTerminate() 方法,該方法能確保線程池可以被正確的關(guān)閉,其實(shí)現(xiàn)如下所示。
final void tryTerminate() { for (;;) { int c = ctl.get(); // 如果線程池狀態(tài)為RUNNING,則沒有資格終止線程池 // 如果線程池狀態(tài)大于等于TIDYING,則沒有資格終止線程池 // 如果線程池狀態(tài)為SHUTDOWN但任務(wù)阻塞隊(duì)列不為空,則沒有資格終止線程池 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 線程池狀態(tài)為SHUTDOWN且任務(wù)阻塞隊(duì)列為空會(huì)執(zhí)行到這里 // 線程池狀態(tài)為STOP會(huì)執(zhí)行到這里 // Worker數(shù)量不為0,表明當(dāng)前還有正在執(zhí)行任務(wù)的Worker或者空閑的Worker,此時(shí)中斷一個(gè)空閑的Worker // 在這里被中斷的空閑Worker會(huì)在getTask()方法中返回null,從而執(zhí)行processWorkerExit(),最終該Worker會(huì)被刪除 // processWorkerExit()方法中又會(huì)調(diào)用tryTerminate(),因此將shutdown信號(hào)在空閑Worker之間進(jìn)行了傳播 if (workerCountOf(c) != 0) { interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 將線程池狀態(tài)置為TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 終止線程池 terminated(); } finally { // 將線程池狀態(tài)最終置為TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } } }
在tryTerminate() 方法的官方注釋中給出了兩種線程池會(huì)被終止的情況:
- 線程池的狀態(tài)為SHUTDOWN,Worker數(shù)量為0,任務(wù)阻塞隊(duì)列為空;
- 線程池的狀態(tài)為STOP,Worker數(shù)量為0。
官方注釋中還說明在所有可能導(dǎo)致線程池終止的操作中都應(yīng)該調(diào)用tryTerminate() 方法來嘗試終止線程池,因此線程池中Worker被刪除時(shí)和任務(wù)阻塞隊(duì)列中任務(wù)被刪除時(shí)會(huì)調(diào)用tryTerminate(),以達(dá)到在線程池符合終止條件時(shí)及時(shí)終止線程池。
4. 小結(jié)
對(duì)于關(guān)閉線程池,簡單小結(jié)如下。
關(guān)閉ThreadPoolExecutor有兩種方式,如下所示。
- shutdown()。調(diào)用shutdown() 方法會(huì)首先將線程池狀態(tài)置為SHUTDOWN并拒絕接受新任務(wù),然后中斷空閑Worker,等到線程池中Worker數(shù)量為0,任務(wù)阻塞隊(duì)列為空時(shí),線程池被真正關(guān)閉;
- shutdownNow()。調(diào)用shutdownNow() 方法會(huì)首先將線程池狀態(tài)置為STOP,然后中斷所有Worker(包括正在執(zhí)行任務(wù)的Worker),并將任務(wù)阻塞隊(duì)列中還未被執(zhí)行的任務(wù)返回,當(dāng)線程池Worker數(shù)量為0時(shí),線程池被真正關(guān)閉。
還有一點(diǎn)需要說明,Worker除了實(shí)現(xiàn)Runnable接口外,還繼承于AbstractQueuedSynchronizer,因此Worker本身是一把鎖,Worker執(zhí)行任務(wù)前都會(huì)先獲取Worker的鎖,所以正在執(zhí)行任務(wù)的Worker的鎖是無法被獲取的,換言之,只有沒有執(zhí)行任務(wù)的Worker的鎖才能被獲取,這些Worker就稱為空閑Worker。
以上就是一文搞懂Java的ThreadPoolExecutor原理的詳細(xì)內(nèi)容,更多關(guān)于Java ThreadPoolExecutor的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Java線程池?ThreadPoolExecutor?詳解
- Java多線程ThreadPoolExecutor詳解
- Java線程池ThreadPoolExecutor源碼深入分析
- java高并發(fā)ThreadPoolExecutor類解析線程池執(zhí)行流程
- java高并發(fā)ScheduledThreadPoolExecutor與Timer區(qū)別
- 徹底搞懂java并發(fā)ThreadPoolExecutor使用
- Java多線程編程基石ThreadPoolExecutor示例詳解
- 源碼分析Java中ThreadPoolExecutor的底層原理
- 一文弄懂Java中ThreadPoolExecutor
相關(guān)文章
spring boot配置讀寫分離的完整實(shí)現(xiàn)步驟
數(shù)據(jù)庫配置主從之后,如何在代碼層面實(shí)現(xiàn)讀寫分離?所以下面這篇文章主要給大家介紹了關(guān)于spring boot配置讀寫分離的完整步驟,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2018-09-09Java結(jié)構(gòu)型設(shè)計(jì)模式之橋接模式詳細(xì)講解
橋接,顧名思義,就是用來連接兩個(gè)部分,使得兩個(gè)部分可以互相通訊。橋接模式將系統(tǒng)的抽象部分與實(shí)現(xiàn)部分分離解耦,使他們可以獨(dú)立的變化。本文通過示例詳細(xì)介紹了橋接模式的原理與使用,需要的可以參考一下2022-09-09使用postman傳遞list集合后臺(tái)springmvc接收
這篇文章主要介紹了使用postman傳遞list集合后臺(tái)springmvc接收的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08Maven中怎么手動(dòng)添加jar包到本地倉庫詳解(repository)
這篇文章主要給大家介紹了關(guān)于Maven中怎么手動(dòng)添加jar包到本地倉庫的相關(guān)資料,文中通過圖文以及實(shí)例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2023-04-04解析SpringBoot中使用LoadTimeWeaving技術(shù)實(shí)現(xiàn)AOP功能
這篇文章主要介紹了SpringBoot中使用LoadTimeWeaving技術(shù)實(shí)現(xiàn)AOP功能,AOP面向切面編程,通過為目標(biāo)類織入切面的方式,實(shí)現(xiàn)對(duì)目標(biāo)類功能的增強(qiáng),本文給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-09-09