Java 線程池ThreadPoolExecutor源碼解析
引導(dǎo)語(yǔ)
線程池我們?cè)诠ぷ髦薪?jīng)常會(huì)用到。在請(qǐng)求量大時(shí),使用線程池,可以充分利用機(jī)器資源,增加請(qǐng)求的處理速度,本章節(jié)我們就和大家一起來(lái)學(xué)習(xí)線程池。
本章的順序,先說(shuō)源碼,弄懂原理,接著看一看面試題,最后看看實(shí)際工作中是如何運(yùn)用線程池的。
1、整體架構(gòu)圖
我們畫了線程池的整體圖,如下:
本小節(jié)主要就按照這個(gè)圖來(lái)進(jìn)行 ThreadPoolExecutor 源碼的講解,大家在看各個(gè)方法時(shí),可以結(jié)合這個(gè)圖一起看。
1.1、類結(jié)構(gòu)
首先我們來(lái)看一下 ThreadPoolExecutor 的類結(jié)構(gòu),如下圖:
從上圖中,我們從命名上來(lái)看,都有 Executor 的共同命名,Executor 的中文意思為執(zhí)行的意思,表示對(duì)提供的任務(wù)進(jìn)行執(zhí)行,我們?cè)诘谖逭戮€程中學(xué)習(xí)到了幾種任務(wù):Runnable、Callable、FutureTask,之前我們都是使用 Thread 來(lái)執(zhí)行這些任務(wù)的,除了 Thread,這些 Executor 命名的類和接口也是可以執(zhí)行這幾種任務(wù)的,接下來(lái)我們大概的看下這幾個(gè)類的大概含義:
Executor:定義 execute 方法來(lái)執(zhí)行任務(wù),入?yún)⑹?Runnable,無(wú)出參:
ExecutorService:Executor 的功能太弱,ExecutorService 豐富了對(duì)任務(wù)的執(zhí)行和管理的功能,主要代碼如下:
// 關(guān)閉,不會(huì)接受新的任務(wù),也不會(huì)等待未完成的任務(wù) // 如果需要等待未完成的任務(wù),可以使用 awaitTermination 方法 void shutdown(); // executor 是否已經(jīng)關(guān)閉了,返回值 true 表示已關(guān)閉 boolean isShutdown(); // 所有的任務(wù)是否都已經(jīng)終止,是的話,返回 true boolean isTerminated(); // 在超時(shí)時(shí)間內(nèi),等待剩余的任務(wù)終止 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // 提交有返回值的任務(wù),使用 get 方法可以阻塞等待任務(wù)的執(zhí)行結(jié)果返回 <T> Future<T> submit(Callable<T> task); // 提交沒(méi)有返回值的任務(wù),如果使用 get 方法的話,任務(wù)執(zhí)行完之后得到的是 null 值 Future<?> submit(Runnable task); // 給定任務(wù)集合,返回已經(jīng)執(zhí)行完成的 Future 集合,每個(gè)返回的 Future 都是 isDone = true 的狀態(tài) <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; // 給定任務(wù)中有一個(gè)執(zhí)行成功就返回,如果拋異常,其余未完成的任務(wù)將被取消 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
AbstractExecutorService 是一個(gè)抽象類,封裝了 Executor 的很多通用功能,比如:
// 把 Runnable 轉(zhuǎn)化成 RunnableFuture // RunnableFuture 是一個(gè)接口,實(shí)現(xiàn)了 Runnable 和 Future // FutureTask 是 RunnableFuture 的實(shí)現(xiàn)類,主要是對(duì)任務(wù)進(jìn)行各種管理 // Runnable + Future => RunnableFuture => FutureTask protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } // 提交無(wú)返回值的任務(wù) public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); // ftask 其實(shí)是 FutureTask RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } // 提交有返回值的任務(wù) public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // ftask 其實(shí)是 FutureTask RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
有幾個(gè)點(diǎn)需要注意下:
FutureTask 我們?cè)诘谖逭掠姓f(shuō),其本身就是一個(gè)任務(wù),而且具備對(duì)任務(wù)管理的功能,比如可以通過(guò) get 方法拿到任務(wù)的執(zhí)行結(jié)果;
submit 方法是我們平時(shí)使用線程池時(shí)提交任務(wù)的方法,支持 Runable 和 Callable 兩種任務(wù)的提交,方法中 execute 方法是其子類 ThreadPoolExecutor 實(shí)現(xiàn)的,不管是那種任務(wù)入?yún)?,execute 方法最終執(zhí)行的任務(wù)都是 FutureTask;
ThreadPoolExecutor 繼承了 AbstractExecutorService 抽象類,具備以上三個(gè)類的所有功能。
1.2、類注釋
ThreadPoolExecutor 的類注釋有很多,我們選取關(guān)鍵的注釋如下:
- ExecutorService 使用線程池中的線程執(zhí)行提交的任務(wù),線程池我們可以使用 Executors 進(jìn)行配置;
- 線程池解決兩個(gè)問(wèn)題:1:通過(guò)減少任務(wù)間的調(diào)度開銷 (主要是通過(guò)線程池中的線程被重復(fù)使用的方式),來(lái)提高大量任務(wù)時(shí)的執(zhí)行性能;2:提供了一種方式來(lái)管理線程和消費(fèi),維護(hù)基本數(shù)據(jù)統(tǒng)計(jì)等工作,比如統(tǒng)計(jì)已完成的任務(wù)數(shù);
- Executors 為常用的場(chǎng)景設(shè)定了可直接初始化線程池的方法,比如 Executors#newCachedThreadPool 無(wú)界的線程池,并且可以自動(dòng)回收;Executors#newFixedThreadPool 固定大小線程池;Executors#newSingleThreadExecutor 單個(gè)線程的線程池;
- 為了在各種上下文中使用線程池,線程池提供可供擴(kuò)展的參數(shù)設(shè)置:1:coreSize:當(dāng)新任務(wù)提交時(shí),發(fā)現(xiàn)運(yùn)行的線程數(shù)小于 coreSize,一個(gè)新的線程將被創(chuàng)建,即使這時(shí)候其它工作線程是空閑的,可以通過(guò) getCorePoolSize 方法獲得 coreSize;2:maxSize: 當(dāng)任務(wù)提交時(shí),coreSize < 運(yùn)行線程數(shù) <= maxSize,但隊(duì)列沒(méi)有滿時(shí),任務(wù)提交到隊(duì)列中,如果隊(duì)列滿了,在 maxSize 允許的范圍內(nèi)新建線程;
- 一般來(lái)說(shuō),coreSize 和 maxSize 在線程池初始化時(shí)就已經(jīng)設(shè)定了,但我們也可以通過(guò) setCorePoolSize、setMaximumPoolSize 方法動(dòng)態(tài)的修改這兩個(gè)值;
- 默認(rèn)的,core threads 需要到任務(wù)提交后才創(chuàng)建的,但我們可以分別使用 prestartCoreThread、prestartAllCoreThreads 兩個(gè)方法來(lái)提前創(chuàng)建一個(gè)、所有的 core threads;
- 新的線程被默認(rèn) ThreadFactory 創(chuàng)建時(shí),優(yōu)先級(jí)會(huì)被限制成 NORM_PRIORITY,默認(rèn)會(huì)被設(shè)置成非守護(hù)線程,這個(gè)和新建線程的繼承是不同的;
- Keep-alive times 參數(shù)的作用:1:如果當(dāng)前線程池中有超過(guò) coreSize 的線程;2:并且線程空閑的時(shí)間超過(guò) keepAliveTime,當(dāng)前線程就會(huì)被回收,這樣可以避免線程沒(méi)有被使用時(shí)的資源浪費(fèi);
- 通過(guò) setKeepAliveTime 方法可以動(dòng)態(tài)的設(shè)置 keepAliveTime 的值;
- 如果設(shè)置 allowCoreThreadTimeOut 為 ture 的話,core thread 空閑時(shí)間超過(guò) keepAliveTime 的話,也會(huì)被回收;
- 線程池新建時(shí),有多種隊(duì)列可供選擇,比如:1:SynchronousQueue,為了避免任務(wù)被拒絕,要求線程池的 maxSize 無(wú)界,缺點(diǎn)是當(dāng)任務(wù)提交的速度超過(guò)消費(fèi)的速度時(shí),可能出現(xiàn)無(wú)限制的線程增長(zhǎng);2:LinkedBlockingQueue,無(wú)界隊(duì)列,未消費(fèi)的任務(wù)可以在隊(duì)列中等待;3:ArrayBlockingQueue,有界隊(duì)列,可以防止資源被耗盡;
- 隊(duì)列的維護(hù):提供了 getQueue () 方法方便我們進(jìn)行監(jiān)控和調(diào)試,嚴(yán)禁用于其他目的,remove 和 purge 兩個(gè)方法可以對(duì)隊(duì)列中的元素進(jìn)行操作;
- 在 Executor 已經(jīng)關(guān)閉或?qū)ψ畲缶€程和最大隊(duì)列都使用飽和時(shí),可以使用 RejectedExecutionHandler 類進(jìn)行異常捕捉,有如下四種處理策略:ThreadPoolExecutor.AbortPolicy、ThreadPoolExecutor.DiscardPolicy、ThreadPoolExecutor.CallerRunsPolicy、ThreadPoolExecutor.DiscardOldestPolicy;
- 線程池提供了很多可供擴(kuò)展的鉤子函數(shù),比如有:1:提供在每個(gè)任務(wù)執(zhí)行之前 beforeExecute 和執(zhí)行之后 afterExecute 的鉤子方法,主要用于操作執(zhí)行環(huán)境,比如初始化 ThreadLocals、收集統(tǒng)計(jì)數(shù)據(jù)、添加日志條目等;2: 如果在執(zhí)行器執(zhí)行完成之后想干一些事情,可以實(shí)現(xiàn) terminated 方法,如果鉤子方法執(zhí)行時(shí)發(fā)生異常,工作線程可能會(huì)失敗并立即終止。
可以看到 ThreadPoolExecutor 的注釋是非常多的,也是非常重要的,我們很多面試的題目,在注釋上都能找到答案。
1.3、ThreadPoolExecutor 重要屬性
接下來(lái)我們來(lái)看一看 ThreadPoolExecutor 都有哪些重要屬性,如下:
//ctl 線程池狀態(tài)控制字段,由兩部分組成: //1:workerCount wc 工作線程數(shù),我們限制 workerCount 最大到(2^29)-1,大概 5 億個(gè)線程 //2:runState rs 線程池的狀態(tài),提供了生命周期的控制,源碼中有很多關(guān)于狀態(tài)的校驗(yàn),狀態(tài)枚舉如下: //RUNNING(-536870912):接受新任務(wù)或者處理隊(duì)列里的任務(wù)。 //SHUTDOWN(0):不接受新任務(wù),但仍在處理已經(jīng)在隊(duì)列里面的任務(wù)。 //STOP(-536870912):不接受新任務(wù),也不處理隊(duì)列中的任務(wù),對(duì)正在執(zhí)行的任務(wù)進(jìn)行中斷。 //TIDYING(1073741824): 所以任務(wù)都被中斷,workerCount 是 0,整理狀態(tài) //TERMINATED(1610612736): terminated() 已經(jīng)完成的時(shí)候 //runState 之間的轉(zhuǎn)變過(guò)程: //RUNNING -> SHUTDOWN:調(diào)用 shudown(),finalize() //(RUNNING or SHUTDOWN) -> STOP:調(diào)用shutdownNow() //SHUTDOWN -> TIDYING -> workerCount ==0 //STOP -> TIDYING -> workerCount ==0 //TIDYING -> TERMINATED -> terminated() 執(zhí)行完成之后 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3;// 29 private static final int CAPACITY = (1 << COUNT_BITS) - 1;// =(2^29)-1=536870911 // Packing and unpacking ctl private static int ctlOf(int rs, int wc) { return rs | wc; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int runStateOf(int c) { return c & ~CAPACITY; } // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS;//-536870912 private static final int SHUTDOWN = 0 << COUNT_BITS;//0 private static final int STOP = 1 << COUNT_BITS;//-536870912 private static final int TIDYING = 2 << COUNT_BITS;//1073741824 private static final int TERMINATED = 3 << COUNT_BITS;//1610612736 // 已完成任務(wù)的計(jì)數(shù) volatile long completedTasks; // 線程池最大容量 private int largestPoolSize; // 已經(jīng)完成的任務(wù)數(shù) private long completedTaskCount; // 用戶可控制的參數(shù)都是 volatile 修飾的 // 可以使用 threadFactory 創(chuàng)建 thread // 創(chuàng)建失敗一般不拋出異常,只有在 OutOfMemoryError 時(shí)候才會(huì) private volatile ThreadFactory threadFactory; // 飽和或者運(yùn)行中拒絕任務(wù)的 handler 處理類 private volatile RejectedExecutionHandler handler; // 線程存活時(shí)間設(shè)置 private volatile long keepAliveTime; // 設(shè)置 true 的話,核心線程空閑 keepAliveTime 時(shí)間后,也會(huì)被回收 private volatile boolean allowCoreThreadTimeOut; // coreSize private volatile int corePoolSize; // maxSize 最大限制 (2^29)-1 private volatile int maximumPoolSize; // 默認(rèn)的拒絕策略 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); // 隊(duì)列會(huì) hold 住任務(wù),并且利用隊(duì)列的阻塞的特性,來(lái)保持線程的存活周期 private final BlockingQueue<Runnable> workQueue; // 大多數(shù)情況下是控制對(duì) workers 的訪問(wèn)權(quán)限 private final ReentrantLock mainLock = new ReentrantLock(); private final Condition termination = mainLock.newCondition(); // 包含線程池中所有的工作線程 private final HashSet<Worker> workers = new HashSet<Worker>();
屬性也是非常多,為了方便理解線程池的狀態(tài)扭轉(zhuǎn),畫了一個(gè)圖:
Worker 我們可以理解成線程池中任務(wù)運(yùn)行的最小單元,Worker 的大致結(jié)構(gòu)如下:
// 線程池中任務(wù)執(zhí)行的最小單元 // Worker 繼承 AQS,具有鎖功能 // Worker 實(shí)現(xiàn) Runnable,本身是一個(gè)可執(zhí)行的任務(wù) private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 任務(wù)運(yùn)行的線程 final Thread thread; // 需要執(zhí)行的任務(wù) Runnable firstTask; // 非常巧妙的設(shè)計(jì),Worker本身是個(gè) Runnable,把自己作為任務(wù)傳遞給 thread // 內(nèi)部有個(gè)屬性又設(shè)置了 Runnable Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 把 Worker 自己作為 thread 運(yùn)行的任務(wù) this.thread = getThreadFactory().newThread(this); } /** Worker 本身是 Runnable,run 方法是 Worker 執(zhí)行的入口, runWorker 是外部的方法 */ public void run() { runWorker(this); } private static final long serialVersionUID = 6138294804551838833L; // Lock methods // 0 代表沒(méi)有鎖住,1 代表鎖住 protected boolean isHeldExclusively() { return getState() != 0; } // 嘗試加鎖,CAS 賦值為 1,表示鎖住 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 嘗試釋放鎖,釋放鎖沒(méi)有 CAS 校驗(yàn),可以任意的釋放鎖 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
理解 Worker 非常關(guān)鍵,主要有以下幾點(diǎn):
Worker 很像是任務(wù)的代理,在線程池中,最小的執(zhí)行單位就是 Worker,所以 Worker 實(shí)現(xiàn)了 Runnable 接口,實(shí)現(xiàn)了 run 方法;
在 Worker 初始化時(shí) this.thread = getThreadFactory ().newThread (this) 這行代碼比較關(guān)鍵,它把當(dāng)前 Worker 作為線程的構(gòu)造器入?yún)ⅲ覀冊(cè)诤罄m(xù)的實(shí)現(xiàn)中會(huì)發(fā)現(xiàn)這樣的代碼:Thread t = w.thread;t.start (),此時(shí)的 w 是 Worker 的引用申明,此處 t.start 實(shí)際上執(zhí)行的就是 Worker 的 run 方法;
Worker 本身也實(shí)現(xiàn)了 AQS,所以其本身也是一個(gè)鎖,其在執(zhí)行任務(wù)的時(shí)候,會(huì)鎖住自己,任務(wù)執(zhí)行完成之后,會(huì)釋放自己。
2、線程池的任務(wù)提交
線程池的任務(wù)提交從 submit 方法說(shuō)起,submit 方法是 AbstractExecutorService 抽象類定義的,主要做了兩件事情:
- 把 Runnable 和 Callable 都轉(zhuǎn)化成 FutureTask,這個(gè)我們之前看過(guò)源碼了;
- 使用 execute 方法執(zhí)行 FutureTask。
execute 方法是 ThreadPoolExecutor 中的方法,源碼如下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 工作的線程小于核心線程數(shù),創(chuàng)建新的線程,成功返回,失敗不拋異常 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; // 線程池狀態(tài)可能發(fā)生變化 c = ctl.get(); } // 工作的線程大于等于核心線程數(shù),或者新建線程失敗 // 線程池狀態(tài)正常,并且可以入隊(duì)的話,嘗試入隊(duì)列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 如果線程池狀態(tài)異常 嘗試從隊(duì)列中移除任務(wù),可以移除的話就拒絕掉任務(wù) if (!isRunning(recheck) && remove(command)) reject(command); // 發(fā)現(xiàn)可運(yùn)行的線程數(shù)是 0,就初始化一個(gè)線程,這里是個(gè)極限情況,入隊(duì)的時(shí)候,突然發(fā)現(xiàn) // 可用線程都被回收了 else if (workerCountOf(recheck) == 0) // Runnable是空的,不會(huì)影響新增線程,但是線程在 start 的時(shí)候不會(huì)運(yùn)行 // Thread.run() 里面有判斷 addWorker(null, false); } // 隊(duì)列滿了,開啟線程到 maxSize,如果失敗直接拒絕, else if (!addWorker(command, false)) reject(command); }
execute 方法執(zhí)行的就是整體架構(gòu)圖的左半邊的邏輯,其中多次調(diào)用 addWorker 方法,addWorker 方法的作用是新建一個(gè) Worker,我們一起來(lái)看下源碼:
// 結(jié)合線程池的情況看是否可以添加新的 worker // firstTask 不為空可以直接執(zhí)行,為空?qǐng)?zhí)行不了,Thread.run()方法有判斷,Runnable為空不執(zhí)行 // core 為 true 表示線程最大新增個(gè)數(shù)是 coresize,false 表示最大新增個(gè)數(shù)是 maxsize // 返回 true 代表成功,false 失敗 // break retry 跳到retry處,且不再進(jìn)入循環(huán) // continue retry 跳到retry處,且再次進(jìn)入循環(huán) private boolean addWorker(Runnable firstTask, boolean core) { retry: // 先是各種狀態(tài)的校驗(yàn) for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // rs >= SHUTDOWN 說(shuō)明線程池狀態(tài)不正常 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 工作中的線程數(shù)大于等于容量,或者大于等于 coreSize or maxSize if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) // break 結(jié)束 retry 的 for 循環(huán) break retry; c = ctl.get(); // Re-read ctl // 線程池狀態(tài)被更改 if (runStateOf(c) != rs) // 跳轉(zhuǎn)到retry位置 continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 巧妙的設(shè)計(jì),Worker 本身是個(gè) Runnable. // 在初始化的過(guò)程中,會(huì)把 worker 丟給 thread 去初始化 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 啟動(dòng)線程,實(shí)際上去執(zhí)行 Worker.run 方法 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
addWorker 方法首先是執(zhí)行了一堆校驗(yàn),然后使用 new Worker (firstTask) 新建了 Worker,最后使用 t.start () 執(zhí)行 Worker,上文我們說(shuō)了 Worker 在初始化時(shí)的關(guān)鍵代碼:this.thread = getThreadFactory ().newThread (this),Worker(this) 是作為新建線程的構(gòu)造器入?yún)⒌?,所?t.start () 會(huì)執(zhí)行到 Worker 的 run 方法上,源碼如下:
public void run() { runWorker(this); }
runWorker 方法是非常重要的方法,我們一起看下源碼實(shí)現(xiàn):
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; //幫助gc回收 w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // task 為空的情況: // 1:任務(wù)入隊(duì)列了,極限情況下,發(fā)現(xiàn)沒(méi)有運(yùn)行的線程,于是新增一個(gè)線程; // 2:線程執(zhí)行完任務(wù)執(zhí)行,再次回到 while 循環(huán)。 // 如果 task 為空,會(huì)使用 getTask 方法阻塞從隊(duì)列中拿數(shù)據(jù),如果拿不到數(shù)據(jù),會(huì)阻塞住 while (task != null || (task = getTask()) != null) { //鎖住 worker w.lock(); // 線程池 stop 中,但是線程沒(méi)有到達(dá)中斷狀態(tài),幫助線程中斷 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //執(zhí)行 before 鉤子函數(shù) beforeExecute(wt, task); Throwable thrown = null; try { //同步執(zhí)行任務(wù) task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //執(zhí)行 after 鉤子函數(shù),如果這里拋出異常,會(huì)覆蓋 catch 的異常 //所以這里異常最好不要拋出來(lái) afterExecute(task, thrown); } } finally { //任務(wù)執(zhí)行完成,計(jì)算解鎖 task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //做一些拋出異常的善后工作 processWorkerExit(w, completedAbruptly); } }
這個(gè)方法執(zhí)行的邏輯是架構(gòu)圖中的標(biāo)紅部分:
我們聚焦一下這行代碼:task.run () 此時(shí)的 task 是什么呢?此時(shí)的 task 是 FutureTask 類,所以我們繼續(xù)追索到 FutureTask 類的 run 方法的源碼,如下:
/** * run 方法可以直接被調(diào)用 * 也可以由線程池進(jìn)行調(diào)用 */ public void run() { // 狀態(tài)不是任務(wù)創(chuàng)建,或者當(dāng)前任務(wù)已經(jīng)有線程在執(zhí)行了 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; // Callable 不為空,并且已經(jīng)初始化完成 if (c != null && state == NEW) { V result; boolean ran; try { // 調(diào)用執(zhí)行 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } // 給 outcome 賦值 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
run 方法中有兩行關(guān)鍵代碼:
- result = c.call () 這行代碼是真正執(zhí)行業(yè)務(wù)代碼的地方;
- set (result) 這里是給 outCome 賦值,這樣 Future.get 方法執(zhí)行時(shí),就可以從 outCome 中拿值,這個(gè)我們?cè)凇禙uture、ExecutorService 源碼解析》章節(jié)中都有說(shuō)到。
至此,submit 方法就執(zhí)行完成了,整體流程比較復(fù)雜,我們畫一個(gè)圖釋義一下任務(wù)提交執(zhí)行的主流程:
3、線程執(zhí)行完任務(wù)之后都在干啥
線程執(zhí)行完任務(wù)之后,是消亡還是干什么呢?這是一個(gè)值得思考的問(wèn)題,我們可以從源碼中找到答案,從 ThreadPoolExecutor 的 runWorker 方法中,不知道有沒(méi)有同學(xué)注意到一個(gè) while 循環(huán),我們截圖釋義一下:
這個(gè) while 循環(huán)有個(gè) getTask 方法,getTask 的主要作用是阻塞從隊(duì)列中拿任務(wù)出來(lái),如果隊(duì)列中有任務(wù),那么就可以拿出來(lái)執(zhí)行,如果隊(duì)列中沒(méi)有任務(wù),這個(gè)線程會(huì)一直阻塞到有任務(wù)為止(或者超時(shí)阻塞),下面我們一起來(lái)看下 getTask 方法,源碼如下:
// 從阻塞隊(duì)列中拿任務(wù) private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); //線程池關(guān)閉 && 隊(duì)列為空,不需要在運(yùn)行了,直接放回 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? // true 運(yùn)行的線程數(shù)大于 coreSize || 核心線程也可以被滅亡 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 隊(duì)列以 LinkedBlockingQueue 為例,timedOut 為 true 的話說(shuō)明下面 poll 方法執(zhí)行返回的是 null // 說(shuō)明在等待 keepAliveTime 時(shí)間后,隊(duì)列中仍然沒(méi)有數(shù)據(jù) // 說(shuō)明此線程已經(jīng)空閑了 keepAliveTime 了 // 再加上 wc > 1 || workQueue.isEmpty() 的判斷 // 所以使用 compareAndDecrementWorkerCount 方法使線程池?cái)?shù)量減少 1 // 并且直接 return,return 之后,此空閑的線程會(huì)自動(dòng)被回收 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 從隊(duì)列中阻塞拿 worker Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 設(shè)置已超時(shí),說(shuō)明此時(shí)隊(duì)列沒(méi)有數(shù)據(jù) timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
代碼有兩處關(guān)鍵:
使用隊(duì)列的 poll 或 take 方法從隊(duì)列中拿數(shù)據(jù),根據(jù)隊(duì)列的特性,隊(duì)列中有任務(wù)可以返回,隊(duì)列中無(wú)任務(wù)會(huì)阻塞;
方法中的第二個(gè) if 判斷,說(shuō)的是在滿足一定條件下(條件看注釋),會(huì)減少空閑的線程,減少的手段是使可用線程數(shù)減一,并且直接 return,直接 return 后,該線程就執(zhí)行結(jié)束了,JVM 會(huì)自動(dòng)回收該線程。
4、總結(jié)
本章節(jié)主要以 submit 方法為主線闡述了 ThreadPoolExecutor 的整體架構(gòu)和底層源碼,只要有隊(duì)列和線程的基礎(chǔ)知識(shí)的話,理解 ThreadPoolExecutor 并不復(fù)雜。ThreadPoolExecutor 還有一些其他的源碼,比如說(shuō)拒絕請(qǐng)求的策略、得到各種屬性、設(shè)置各種屬性等等方法,這些方法都比較簡(jiǎn)單,感興趣的同學(xué)可以自己去看一看。
以上就是Java 線程池ThreadPoolExecutor源碼解析的詳細(xì)內(nèi)容,更多關(guān)于Java 線程池ThreadPoolExecutor的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java使用ScriptEngine動(dòng)態(tài)執(zhí)行代碼(附Java幾種動(dòng)態(tài)執(zhí)行代碼比較)
這篇文章主要介紹了Java使用ScriptEngine動(dòng)態(tài)執(zhí)行代碼,并且分享Java幾種動(dòng)態(tài)執(zhí)行代碼比較,需要的朋友可以參考下2021-04-04java實(shí)現(xiàn)獲取網(wǎng)站的keywords,description
這篇文章主要介紹了java實(shí)現(xiàn)獲取網(wǎng)站的keywords,description的相關(guān)資料,需要的朋友可以參考下2015-03-03Java中LinkedList數(shù)據(jù)結(jié)構(gòu)的詳細(xì)介紹
這篇文章主要介紹了Java中LinkedList,Linked List 是 java.util 包中 Collection 框架的一部分,文中提供了詳細(xì)的代碼說(shuō)明,需要的朋友可以參考下2023-05-05Java日常練習(xí)題,每天進(jìn)步一點(diǎn)點(diǎn)(37)
下面小編就為大家?guī)?lái)一篇Java基礎(chǔ)的幾道練習(xí)題(分享)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧,希望可以幫到你2021-07-07Java日常練習(xí)題,每天進(jìn)步一點(diǎn)點(diǎn)(6)
下面小編就為大家?guī)?lái)一篇Java基礎(chǔ)的幾道練習(xí)題(分享)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧,希望可以幫到你2021-07-07springboot+vue實(shí)現(xiàn)Minio文件存儲(chǔ)的示例代碼
本文主要介紹了springboot+vue實(shí)現(xiàn)Minio文件存儲(chǔ)的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-02-02spring Boot與Mybatis整合優(yōu)化詳解
關(guān)于spring-boot與mybatis整合優(yōu)化方面的介紹,就是Mybatis-Spring-boot-starter的介紹,具體內(nèi)容詳情大家參考下本文2017-07-07