詳解Java并發(fā)包中線程池ThreadPoolExecutor
一、線程池簡介
線程池的使用主要是解決兩個問題:①當(dāng)執(zhí)行大量異步任務(wù)的時候線程池能夠提供更好的性能,在不使用線程池時候,每當(dāng)需要執(zhí)行異步任務(wù)的時候直接new一個線程來運行的話,線程的創(chuàng)建和銷毀都是需要開銷的。而線程池中的線程是可復(fù)用的,不需要每次執(zhí)行異步任務(wù)的時候重新創(chuàng)建和銷毀線程;②線程池提供一種資源限制和管理的手段,比如可以限制線程的個數(shù),動態(tài)的新增線程等等。
在下面的分析中,我們可以看到,線程池使用一個Integer的原子類型變量來記錄線程池狀態(tài)和線程池中的線程數(shù)量,通過線程池狀態(tài)來控制任務(wù)的執(zhí)行,每個工作線程Worker線程可以處理多個任務(wù)。
二、ThreadPoolExecutor類
2.1、ThreadPoolExecutor成員變量以含義
ThreadPoolExecutor繼承了AbstractExecutorService,其中的成員變量ctl是一個Integer類型的原子變量,用來記錄線程池的狀態(tài)和線程池中的線程的個數(shù)。這里(Integer看做32位)ctl高三位表示線程池的狀態(tài),后面的29位表示線程池中的線程個數(shù)。如下所示是ThreadPoolExecutor源碼中的成員變量
//(高3位)表示線程池狀態(tài),(低29位)表示線程池中線程的個數(shù); // 默認(rèn)狀態(tài)是RUNNING,線程池中線程個數(shù)為0 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //表示具體平臺下Integer的二進(jìn)制位數(shù)-3后的剩余位數(shù)表示的數(shù)才是線程的個數(shù); //其中Integer.SIZE=32,-3之后的低29位表示的就是線程的個數(shù)了 private static final int COUNT_BITS = Integer.SIZE - 3; //線程最大個數(shù)(低29位)00011111111111111111111111111111(1<<29-1) private static final int CAPACITY = (1 << COUNT_BITS) - 1; //線程池狀態(tài)(高3位表示線程池狀態(tài)) //111 00000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS; //000 00000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; //001 00000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; //010 00000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; //011 00000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS; //獲取高3位(運行狀態(tài))==> c & 11100000000000000000000000000000 private static int runStateOf(int c) { return c & ~CAPACITY; } //獲取低29位(線程個數(shù))==> c & 00011111111111111111111111111111 private static int workerCountOf(int c) { return c & CAPACITY; } //計算原子變量ctl新值(運行狀態(tài)和線程個數(shù)) private static int ctlOf(int rs, int wc) { return rs | wc; }
下面我們簡單解釋一下上面的線程狀態(tài)的含義:
①RUNNING:接受新任務(wù)并處理阻塞隊列中的任務(wù)
②SHUTDOWN:拒絕新任務(wù)但是處理阻塞隊列中的任務(wù)
③STOP:拒絕新任務(wù)并拋棄阻塞隊列中的任務(wù),同時會中斷當(dāng)前正在執(zhí)行的任務(wù)
④TIDYING:所有任務(wù)執(zhí)行完之后(包含阻塞隊列中的任務(wù))當(dāng)前線程池中活躍的線程數(shù)量為0,將要調(diào)用terminated方法
⑥TERMINATED:終止?fàn)顟B(tài)。terminated方法調(diào)用之后的狀態(tài)
2.2、ThreadPoolExecutor的參數(shù)以及實現(xiàn)原理
①corePoolSize:線程池核心現(xiàn)車個數(shù)
②workQueue:用于保存等待任務(wù)執(zhí)行的任務(wù)的阻塞隊列(比如基于數(shù)組的有界阻塞隊列ArrayBlockingQueue、基于鏈表的無界阻塞隊列LinkedBlockingQueue等等)
③maximumPoolSize:線程池最大線程數(shù)量
④ThreadFactory:創(chuàng)建線程的工廠
⑤RejectedExecutionHandler:拒絕策略,表示當(dāng)隊列已滿并且線程數(shù)量達(dá)到線程池最大線程數(shù)量的時候?qū)π绿峤坏娜蝿?wù)所采取的策略,主要有四種策略:AbortPolicy(拋出異常)、CallerRunsPolicy(只用調(diào)用者所在線程來運行該任務(wù))、DiscardOldestPolicy(丟掉阻塞隊列中最近的一個任務(wù)來處理當(dāng)前提交的任務(wù))、DiscardPolicy(不做處理,直接丟棄掉)
⑥keepAliveTime:存活時間,如果當(dāng)前線程池中的數(shù)量比核心線程數(shù)量多,并且當(dāng)前線程是閑置狀態(tài),該變量就是這些線程的最大生存時間
⑦TimeUnit:存活時間的時間單位。
根據(jù)上面的參數(shù)介紹,簡單了解一下線程池的實現(xiàn)原理,以提交一個新任務(wù)為開始點,分析線程池的主要處理流程
2.3、關(guān)于一些線程池的使用類型
①newFixedThreadPool:創(chuàng)建一個核心線程個數(shù)和最大線程個數(shù)均為nThreads的線程池,并且阻塞隊列長度為Integer.MAX_VALUE,keepAliveTime=0說明只要線程個數(shù)比核心線程個數(shù)多并且當(dāng)前空閑即回收。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
②newSingleThreadExecutor:創(chuàng)建一個核心線程個數(shù)和最大線程個數(shù)都為1 的線程池,并且阻塞隊列長度為Integer.MAX_VALUE,keepAliveTime=0說明只要線程個數(shù)比核心線程個數(shù)多并且當(dāng)前線程空閑即回收該線程。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
③newCachedThreadPoolExecutor:創(chuàng)建一個按需創(chuàng)建線程的線程池,初始線程個數(shù)為0,最多線程個數(shù)為Integer.MAX_VALUE,并且阻塞隊列為同步隊列(最多只有一個元素),keepAliveTime=60說明只要當(dāng)前線程在60s內(nèi)空閑則回收。這個類型的線程池的特點就是:加入同步隊列的任務(wù)會被馬上執(zhí)行,同步隊列中最多只有一個任務(wù)
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
2.4、ThreadPoolExecutor中的其他成員
//獨占鎖,用來控制新增工作線程Worker操作的原子性 private final ReentrantLock mainLock = new ReentrantLock(); //工作線程集合,Worker繼承了AQS接口和Runnable接口,是具體處理任務(wù)的線程對象 //Worker實現(xiàn)AQS,并自己實現(xiàn)了簡單不可重入獨占鎖,其中state=0表示當(dāng)前鎖未被獲取狀態(tài),state=1表示鎖被獲取, //state=-1表示W(wǎng)ork創(chuàng)建時候的默認(rèn)狀態(tài),創(chuàng)建時候設(shè)置state=-1是為了防止runWorker方法運行前被中斷 private final HashSet<Worker> workers = new HashSet<Worker>(); //termination是該鎖對應(yīng)的條件隊列,在線程調(diào)用awaitTermination時候用來存放阻塞的線程 private final Condition termination = mainLock.newCondition();
三、execute(Runnable command)方法實現(xiàn)
executor方法的作用是提交任務(wù)command到線程池執(zhí)行,可以簡單的按照下面的圖進(jìn)行理解,ThreadPoolExecutor的實現(xiàn)類似于一個生產(chǎn)者消費者模型,當(dāng)用戶添加任務(wù)到線程池中相當(dāng)于生產(chǎn)者生產(chǎn)元素,workers工作線程則直接執(zhí)行任務(wù)或者從任務(wù)隊列中獲取任務(wù),相當(dāng)于消費之消費元素。
public void execute(Runnable command) { //(1)首先檢查任務(wù)是否為null,為null拋出異常,否則進(jìn)行下面的步驟 if (command == null) throw new NullPointerException(); //(2)ctl值中包含了當(dāng)前線程池的狀態(tài)和線程池中的線程數(shù)量 int c = ctl.get(); //(3)workerCountOf方法是獲取低29位,即獲取當(dāng)前線程池中的線程個數(shù),如果小于corePoolSize,就開啟新的線程運行 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //(4)如果線程池處理RUNNING狀態(tài),就添加任務(wù)到阻塞隊列中 if (isRunning(c) && workQueue.offer(command)) { //(4-1)二次檢查,獲取ctl值 int recheck = ctl.get(); //(4-2)如果當(dāng)前線程池不是出于RUNNING狀態(tài),就從隊列中刪除任務(wù),并執(zhí)行拒絕策略 if (! isRunning(recheck) && remove(command)) reject(command); //(4-3)否則,如果線程池為空,就添加一個線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //(5)如果隊列滿,則新增線程,如果新增線程失敗,就執(zhí)行拒絕策略 else if (!addWorker(command, false)) reject(command); }
我們在看一下上面代碼的執(zhí)行流程,按照標(biāo)記的數(shù)字進(jìn)行分析:
①步驟(3)判斷當(dāng)前線程池中的線程個數(shù)是否小于corePoolSize,如果小于核心線程數(shù),會向workers里面新增一個核心線程執(zhí)行任務(wù)。
②如果當(dāng)前線程池中的線程數(shù)量大于核心線程數(shù),就執(zhí)行(4)。(4)首先判斷當(dāng)前線程池是否處于RUNNING狀態(tài),如果處于該狀態(tài),就添加任務(wù)到任務(wù)隊列中,這里需要判斷線程池的狀態(tài)是因為線程池可能已經(jīng)處于非RUNNING狀態(tài),而在非RUNNING狀態(tài)下是需要拋棄新任務(wù)的。
③如果想任務(wù)隊列中添加任務(wù)成功,需要進(jìn)行二次校驗,因為在添加任務(wù)到任務(wù)隊列后,可能線程池的狀態(tài)發(fā)生了變化,所以這里需要進(jìn)行二次校驗,如果當(dāng)前線程池已經(jīng)不是RUNNING狀態(tài)了,需要將任務(wù)從任務(wù)隊列中移除,然后執(zhí)行拒絕策略;如果二次校驗通過,則執(zhí)行4-3代碼重新判斷當(dāng)前線程池是否為空,如果線程池為空沒有線程,那么就需要新創(chuàng)建一個線程。
④如果上面的步驟(4)創(chuàng)建添加任務(wù)失敗,說明隊列已滿,那么(5)會嘗試再開啟新的線程執(zhí)行任務(wù)(類比上圖中的thread3和thread4,即不是核心線程的那些線程),如果當(dāng)前線程池中的線程個數(shù)已經(jīng)大于最大線程數(shù)maximumPoolSize,表示不能開啟新的線程。這就屬于線程池滿并且任務(wù)隊列滿,就需要執(zhí)行拒絕策略了。
下面我們在看看addWorker方法的實現(xiàn)
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); //(6)檢查隊列是否只在必要時候為空 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; //(7)使用CAS增加線程個數(shù) for (;;) { //根據(jù)ctl值獲得當(dāng)前線程池中的線程數(shù)量 int wc = workerCountOf(c); //(7-1)如果線程數(shù)量超出限制,返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //(7-2)CAS增加線程數(shù)量,同時只有一個線程可以成功 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // 重新讀取ctl值 //(7-3)CAS失敗了,需要查看當(dāng)前線程池狀態(tài)是否發(fā)生變化,如果發(fā)生變化需要跳轉(zhuǎn)到外層循環(huán)嘗試重新獲取線程池狀態(tài),否則內(nèi)層循環(huán)重新進(jìn)行CAS增加線程數(shù)量 if (runStateOf(c) != rs) continue retry; } } //(8)執(zhí)行到這里說明CAS增加新線程個數(shù)成功了,我們需要開始創(chuàng)建新的工作線程Worker boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //(8-1)創(chuàng)建新的worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; //(8-2)加獨占鎖,保證workers的同步,可能線程池中的多個線程調(diào)用了線程池的execute方法 mainLock.lock(); try { // (8-3)重新檢查線程池狀態(tài),以免在獲取鎖之前調(diào)用shutdown方法改變線程池狀態(tài) int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //(8-4)添加新任務(wù) workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //(8-6)添加新任務(wù)成功之后,啟動任務(wù) if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
簡單再分析說明一下上面的代碼,addWorker方法主要分為兩部分,第一部分是使用CAS線程安全的添加線程數(shù)量,第二部分則是創(chuàng)建新的線程并且將任務(wù)并發(fā)安全的添加到新的workers之中,然后啟動線程執(zhí)行。
①代碼(6)中檢查隊列是否只在必要時候為空,只有線程池狀態(tài)符合條件才能夠進(jìn)行下面的步驟,從(6)中的判斷條件來看,下面的集中情況addWorker會直接返回false
( I )當(dāng)前線程池狀態(tài)為STOP,TIDYING或者TERMINATED ; (I I)當(dāng)前線程池狀態(tài)為SHUTDOWN并且已經(jīng)有了第一個任務(wù); (I I I)當(dāng)前線程池狀態(tài)為SHUTDOWN并且任務(wù)隊列為空
②外層循環(huán)中判斷條件通過之后,在內(nèi)層循環(huán)中使用CAS增加線程數(shù),當(dāng)CAS成功就退出雙重循環(huán)進(jìn)行(8)步驟代碼的執(zhí)行,如果失敗需要查看當(dāng)前線程池的狀態(tài)是否發(fā)生變化,如果發(fā)生變化需要進(jìn)行外層循環(huán)重新判斷線程池狀態(tài)然后在進(jìn)入內(nèi)層循環(huán)重新進(jìn)行CAS增加線程數(shù),如果線程池狀態(tài)沒有發(fā)生變化但是上一次CAS失敗就繼續(xù)進(jìn)行CAS嘗試。
③執(zhí)行到(8)代碼處,表明當(dāng)前已經(jīng)成功增加 了線程數(shù),但是還沒有線程執(zhí)行任務(wù)。ThreadPoolExecutor中使用全局獨占鎖mainLock來控制將新增的工作線程Worker線程安全的添加到工作者線程集合workers中。
④(8-2)獲取了獨占鎖,但是在獲取到鎖之后,還需要進(jìn)行重新檢查線程池的狀態(tài),這是為了避免在獲取全局獨占鎖之前其他線程調(diào)用了shutDown方法關(guān)閉了線程池。如果線程池已經(jīng)關(guān)閉需要釋放鎖。否則將新增的線程添加到工作集合中,釋放鎖啟動線程執(zhí)行任務(wù)。
上面的addWorker方法最后幾行中,會判斷添加工作線程是否成功,如果失敗,會執(zhí)行addWorkerFailed方法,將任務(wù)從workers中移除,并且workerCount做-1操作。
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; //獲取鎖 mainLock.lock(); try { //如果worker不為null if (w != null) //workers移除worker workers.remove(w); //通過CAS操作,workerCount-1 decrementWorkerCount(); tryTerminate(); } finally { //釋放鎖 mainLock.unlock(); } }
四、工作線程Worker的執(zhí)行
4.1、工作線程Worker類源碼分析
上面查看addWorker方法在CAS更新線程數(shù)成功之后,下面就是創(chuàng)建新的Worker線程執(zhí)行任務(wù),所以我們這里先查看Worker類,下面是Worker類的源碼,我們可以看出,Worker類繼承了AQS并實現(xiàn)了Runnable接口,所以他既是一個自定義的同步組件,也是一個執(zhí)行任務(wù)的線程類。下面我們分析Worker類的執(zhí)行
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** 使用線程工廠創(chuàng)建的線程,執(zhí)行任務(wù) */ final Thread thread; /** 初始化執(zhí)行任務(wù) */ Runnable firstTask; /** 計數(shù) */ volatile long completedTasks; /** * 給出初始firstTask,線程創(chuàng)建工廠創(chuàng)建新的線程 */ Worker(Runnable firstTask) { setState(-1); // 防止在調(diào)用runWorker之前被中斷 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); //使用threadFactory創(chuàng)建線程 } /** run方法實際上執(zhí)行的是runWorker方法 */ public void run() { runWorker(this); } // 關(guān)于同步狀態(tài)(鎖) // // 同步狀態(tài)state=0表示鎖未被獲取 // 同步狀態(tài)state=1表示鎖被獲取 protected boolean isHeldExclusively() { return getState() != 0; } //下面都是重寫AQS的方法,Worker為自定義的同步組件 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
在構(gòu)造函數(shù)中我們可以看出,首先將同步狀態(tài)state置為-1,而Worker這個同步組件的state有三個值,其中state=-1表示W(wǎng)ork創(chuàng)建時候的默認(rèn)狀態(tài),創(chuàng)建時候設(shè)置state=-1是為了防止runWorker方法運行前被中斷前面說到過這個結(jié)論,這里置為-1是為了避免當(dāng)前Worker在調(diào)用runWorker方法之前被中斷(當(dāng)其他線程調(diào)用線程池的shutDownNow時候,如果Worker的state>=0則會中斷線程),設(shè)置為-1就不會被中斷了。而Worker實現(xiàn)Runnable接口,那么需要重寫run方法,在run方法中,我們可以看到,實際上執(zhí)行的是runWorker方法,在runWorker方法中,會首先調(diào)用unlock方法,該方法會將state置為0,所以這個時候調(diào)用shutDownNow方法就會中斷當(dāng)前線程,而這個時候已經(jīng)進(jìn)入了runWork方法了,就不會在還沒有執(zhí)行runWorker方法的時候就中斷線程。
4.2、runWorker方法的源碼分析
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // 這個時候調(diào)用unlock方法,將state置為0,就可以被中斷了 boolean completedAbruptly = true; try { //(10)如果當(dāng)前任務(wù)為null,或者從任務(wù)隊列中獲取到的任務(wù)為null,就跳轉(zhuǎn)到(11)處執(zhí)行清理工作 while (task != null || (task = getTask()) != null) { //task不為null,就需要線程執(zhí)行任務(wù),這個時候,需要獲取工作線程內(nèi)部持有的獨占鎖 w.lock(); /**如果線程池已被停止(STOP)(至少大于STOP狀態(tài)),要確保線程都被中斷 * 如果狀態(tài)不對,檢查當(dāng)前線程是否中斷并清除中斷狀態(tài),并且再次檢查線程池狀態(tài)是否大于STOP * 如果上述滿足,檢查該對象是否處于中斷狀態(tài),不清除中斷標(biāo)記 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) //中斷該對象 wt.interrupt(); try { //執(zhí)行任務(wù)之前要做的事情 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); //執(zhí)行任務(wù) } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //執(zhí)行任務(wù)之后的方法 afterExecute(task, thrown); } } finally { task = null; //更新當(dāng)前已完成任務(wù)數(shù)量 w.completedTasks++; //釋放鎖 w.unlock(); } } completedAbruptly = false; } finally { //執(zhí)行清理工作:處理并退出當(dāng)前worker processWorkerExit(w, completedAbruptly); } }
我們梳理一下runWorker方法的執(zhí)行流程
①首先先執(zhí)行unlock方法,將Worker的state置為0,這樣工作線程就可以被中斷了(后續(xù)的操作如果線程池關(guān)閉就需要線程被中斷)
②首先判斷判斷當(dāng)前的任務(wù)(當(dāng)前工作線程中的task,或者從任務(wù)隊列中取出的task)是否為null,如果不為null就往下執(zhí)行,為null就執(zhí)行processWorkerExit方法。
③獲取工作線程內(nèi)部持有的獨占鎖(避免在執(zhí)行任務(wù)期間,其他線程調(diào)用shutdown后正在執(zhí)行的任務(wù)被中斷,shutdown只會中斷當(dāng)前被阻塞掛起的沒有執(zhí)行任務(wù)的線程)
④然后執(zhí)行beforeExecute()方法,該方法為擴(kuò)展接口代碼,表示在具體執(zhí)行任務(wù)之前所做的一些事情,然后執(zhí)行task.run()方法執(zhí)行具體任務(wù),執(zhí)行完之后會調(diào)用afterExecute()方法,用以處理任務(wù)執(zhí)行完畢之后的工作,也是一個擴(kuò)展接口代碼。
⑤更新當(dāng)前線程池完成的任務(wù)數(shù),并釋放鎖
4.3、執(zhí)行清理工作的方法processWorkerExit
下面是方法processWorkerExit的源碼,在下面的代碼中
①首先(1-1)處統(tǒng)計線程池完成的任務(wù)個數(shù),并且在此之前獲取全局鎖,然后更新當(dāng)前的全局計數(shù)器,然后從工作線程集合中移除當(dāng)前工作線程,完成清理工作。
②代碼(1-2)調(diào)用了tryTerminate方法,在該方法中,判斷了當(dāng)前線程池狀態(tài)是SHUTDOWN并且隊列不為空或者當(dāng)前線程池狀態(tài)為STOP并且當(dāng)前線程池中沒有活動線程,則置線程池狀態(tài)為TERMINATED。如果設(shè)置稱為了TERMINATED狀態(tài),還需要調(diào)用全局條件變量termination的signalAll方法喚醒所有因為調(diào)用線程池的awaitTermination方法而被阻塞住的線程,使得線程池中的所有線程都停止,從而使得線程池為TERMINATED狀態(tài)。
③代碼(1-3)處判斷當(dāng)前線程池中的線程個數(shù)是否小于核心線程數(shù),如果是,需要新增一個線程保證有足夠的線程可以執(zhí)行任務(wù)隊列中的任務(wù)或者提交的任務(wù)。
private void processWorkerExit(Worker w, boolean completedAbruptly) { /* *completedAbruptly:是由runWorker傳過來的參數(shù),表示是否突然完成的意思 *當(dāng)在就是在執(zhí)行任務(wù)過程當(dāng)中出現(xiàn)異常,就會突然完成,傳true * *如果是突然完成,需要通過CAS操作,更新workerCount(-1操作) *不是突然完成,則不需要-1,因為getTask方法當(dāng)中已經(jīng)-1(getTask方法中執(zhí)行了decrementWorkerCount()方法) */ if (completedAbruptly) decrementWorkerCount(); //(1-1)在統(tǒng)計完成任務(wù)個數(shù)之前加上全局鎖,然后統(tǒng)計線程池中完成的任務(wù)個數(shù)并更新全局計數(shù)器,并從工作集中刪除當(dāng)前worker final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //獲得全局鎖 try { completedTaskCount += w.completedTasks; //更新已完成的任務(wù)數(shù)量 workers.remove(w); //將完成該任務(wù)的線程worker從工作線程集合中移除 } finally { mainLock.unlock(); //釋放鎖 } /**(1-2) * 這一個方法調(diào)用完成了下面的事情: * 判斷如果當(dāng)前線程池狀態(tài)是SHUTDOWN并且工作隊列為空, * 或者當(dāng)前線程池狀態(tài)是STOP并且當(dāng)前線程池里面沒有活動線程, * 則設(shè)置當(dāng)前線程池狀態(tài)為TERMINATED,如果設(shè)置成了TERMINATED狀態(tài), * 還需要調(diào)用條件變量termination的signAll方法激活所有因為調(diào)用線程池的awaitTermination方法而被阻塞的線程 */ tryTerminate(); //(1-3)如果當(dāng)前線程池中線程數(shù)小于核心線程,則增加核心線程數(shù) int c = ctl.get(); //判斷當(dāng)前線程池的狀態(tài)是否小于STOP(RUNNING或者SHUTDOWN) if (runStateLessThan(c, STOP)) { //如果任務(wù)忽然完成,執(zhí)行后續(xù)的代碼 if (!completedAbruptly) { //allowCoreThreadTimeOut表示是否允許核心線程超時,默認(rèn)為false //min這里當(dāng)默認(rèn)為allowCoreThreadTimeOut默認(rèn)為false的時候,min置為coorPoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //這里說明:如果允許核心線程超時,那么allowCoreThreadTimeOut可為true,那么min值為0,不需要維護(hù)核心線程了 //如果min為0并且任務(wù)隊列不為空 if (min == 0 && ! workQueue.isEmpty()) min = 1; //這里表示如果min為0,且隊列不為空,那么至少需要一個核心線程存活來保證任務(wù)的執(zhí)行 //如果工作線程數(shù)大于min,表示當(dāng)前線程數(shù)滿足,直接返回 if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
在tryTerminate方法中,我們簡單說明了該方法的作用,下面是該方法的源碼,可以看出源碼實現(xiàn)上和上面所總結(jié)的功能是差不多的
final void tryTerminate() { for (;;) { //獲取線程池狀態(tài) int c = ctl.get(); //如果線程池狀態(tài)為RUNNING //或者狀態(tài)大于TIDYING //或者狀態(tài)==SHUTDOWN并未任務(wù)隊列不為空 //直接返回,不能調(diào)用terminated方法 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; //如果線程池中工作線程數(shù)不為0,需要中斷線程 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } //獲得線程池的全局鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //通過CAS操作,將線程池狀態(tài)設(shè)置為TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //private static int ctlOf(int rs, int wc) { return rs | wc; } try { //調(diào)用terminated方法 terminated(); } finally { //最終將線程狀態(tài)設(shè)置為TERMINATED ctl.set(ctlOf(TERMINATED, 0)); //調(diào)用條件變量termination的signaAll方法喚醒所有因為 //調(diào)用線程池的awaitTermination方法而被阻塞的線程 //private final Condition termination = mainLock.newCondition(); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
五、補充(shutdown、shutdownNow、awaitTermination方法)
5.1、shutdown操作
我們在使用線程池的時候知道,調(diào)用shutdown方法之后線程池就不會再接受新的任務(wù)了,但是任務(wù)隊列中的任務(wù)還是需要執(zhí)行完的。調(diào)用該方法會立刻返回,并不是等到線程池的任務(wù)隊列中的所有任務(wù)執(zhí)行完畢在返回的。
public void shutdown() { //獲得線程池的全局鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //進(jìn)行權(quán)限檢查 checkShutdownAccess(); //設(shè)置當(dāng)前線程池的狀態(tài)的SHUTDOWN,如果線程池狀態(tài)已經(jīng)是該狀態(tài)就會直接返回,下面我們會分析這個方法的源碼 advanceRunState(SHUTDOWN); //設(shè)置中斷 標(biāo)志 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } //嘗試將狀態(tài)變?yōu)門ERMINATED,上面已經(jīng)分析過該方法的源碼 tryTerminate(); }
該方法的源碼比較簡短,首先檢查了安全管理器,是查看當(dāng)前調(diào)用shutdown命令的線程是否有關(guān)閉線程的權(quán)限,如果有權(quán)限還需要看調(diào)用線程是否有中斷工作線程的權(quán)限,如果沒有權(quán)限將會拋出SecurityException異常或者空指針異常。下面我們查看一下advanceRunState 方法的源碼。
private void advanceRunState(int targetState) { for (;;) { //下面的方法執(zhí)行的就是: //首先獲取線程的ctl值,然后判斷當(dāng)前線程池的狀態(tài)如果已經(jīng)是SHUTDOWN,那么if條件第一個為真就直接返回 //如果不是SHUTDOWN狀態(tài),就需要CAS的設(shè)置當(dāng)前狀態(tài)為SHUTDOWN int c = ctl.get(); if (runStateAtLeast(c, targetState) || //private static int ctlOf(int rs, int wc) { return rs | wc; } ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } }
我們可以看出advanceRunState 方法實際上就是判斷當(dāng)前線程池的狀態(tài)是否為SHUTDWON,如果是那么就返回,否則就需要設(shè)置當(dāng)前狀態(tài)為SHUTDOWN。
我們再來看看shutdown方法中調(diào)用線程中斷的方法interruptIdleWorkers源碼
private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; //如果工作線程沒有被中斷,并且沒有正在運行設(shè)置中斷標(biāo)志 if (!t.isInterrupted() && w.tryLock()) { try { //需要中斷當(dāng)前線程 t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
上面的代碼中,需要設(shè)置所有空閑線程的中斷標(biāo)志。首先獲取線程池的全局鎖,同時只有一個線程可以調(diào)用shutdown方法設(shè)置中斷標(biāo)志。然后嘗試獲取工作線程Worker自己的鎖,獲取成功則可以設(shè)置中斷標(biāo)志(這是由于正在執(zhí)行任務(wù)的線程需要獲取自己的鎖,并且不可重入,所以正在執(zhí)行的任務(wù)沒有被中斷),這里要中斷的那些線程是阻塞到getTask()方法并嘗試從任務(wù)隊列中獲取任務(wù)的線程即空閑線程。
5.2、shutdownNow操作
在使用線程池的時候,如果我們調(diào)用了shutdownNow方法,線程池不僅不會再接受新的任務(wù),還會將任務(wù)隊列中的任務(wù)丟棄,正在執(zhí)行的任務(wù)也會被中斷,然后立刻返回該方法,不會等待激活的任務(wù)完成,返回值為當(dāng)前任務(wù)隊列中被丟棄的任務(wù)列表
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //還是進(jìn)行權(quán)限檢查 advanceRunState(STOP); //設(shè)置線程池狀態(tài)臺STOP interruptWorkers(); //中斷所有線程 tasks = drainQueue(); //將任務(wù)隊列中的任務(wù)移動到task中 } finally { mainLock.unlock(); } tryTerminate(); return tasks; //返回tasks }
從上面的代碼中,我們可以可以發(fā)現(xiàn),shutdownNow方法也是首先需要檢查調(diào)用該方法的線程的權(quán)限,之后不同于shutdown方法之處在于需要即刻設(shè)置當(dāng)前線程池狀態(tài)為STOP,然后中斷所有線程(空閑線程+正在執(zhí)行任務(wù)的線程),移除任務(wù)隊列中的任務(wù)
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) //不需要判斷當(dāng)前線程是否在執(zhí)行任務(wù)(即不需要調(diào)用w.tryLock方法),中斷所有線程 w.interruptIfStarted(); } finally { mainLock.unlock(); } }
5.3、awaitTermination操作
當(dāng)線程調(diào)用該方法之后,會阻塞調(diào)用者線程,直到線程池狀態(tài)為TERMINATED狀態(tài)才會返回,或者等到超時時間到之后會返回,下面是該方法的源碼。
//調(diào)用該方法之后,會阻塞調(diào)用者線程,直到線程池狀態(tài)為TERMINATED狀態(tài)才會返回,或者等到超時時間到之后會返回 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //阻塞當(dāng)前線程,(獲取了Worker自己的鎖),那么當(dāng)前線程就不會再執(zhí)行任務(wù)(因為獲取不到鎖) for (;;) { //當(dāng)前線程池狀態(tài)為TERMINATED狀態(tài),會返回true if (runStateAtLeast(ctl.get(), TERMINATED)) return true; //超時時間到返回false if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }
在上面的代碼中,調(diào)用者線程需要首先獲取線程Worker自己的獨占鎖,然后在循環(huán)判斷當(dāng)前線程池是否已經(jīng)是TERMINATED狀態(tài),如果是則直接返回,否則說明當(dāng)前線程池中還有線程正在執(zhí)行任務(wù),這時候需要查看當(dāng)前設(shè)置的超時時間是否小于0,小于0說明不需要等待就直接返回,如果大于0就需要調(diào)用條件變量termination的awaitNanos方法等待設(shè)置的時間,并在這段時間之內(nèi)等待線程池的狀態(tài)變?yōu)門ERMINATED。
我們在前面說到清理線程池的方法processWorkerExit的時候,需要調(diào)用tryTerminated方法,在該方法中會查看當(dāng)前線程池狀態(tài)是否為TERMINATED,如果是該狀態(tài)也會調(diào)用termination.signalAll()方法喚醒所有線程池中因調(diào)用awaitTermination而被阻塞住的線程。
如果是設(shè)置了超時時間,那么termination的awaitNanos方法也會返回,這時候需要重新檢查線程池狀態(tài)是否為TERMINATED,如果是則返回,不是就繼續(xù)阻塞自己。
以上就是Java并發(fā)包中線程池ThreadPoolExecutor原理探究的詳細(xì)內(nèi)容,更多關(guān)于Java 并發(fā)包 線程池 ThreadPoolExecutor的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
一個JAVA小項目--Web應(yīng)用自動生成Word
前段時間接到一個Web應(yīng)用自動生成Word的需求,現(xiàn)整理了下一些關(guān)鍵步驟拿來分享一下。2014-05-05Java Socket循環(huán)接收數(shù)據(jù)readLine()阻塞的解決方案
這篇文章主要介紹了Java Socket循環(huán)接收數(shù)據(jù)readLine()阻塞的解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-08-08fastjson全局日期序列化設(shè)置導(dǎo)致JSONField失效問題解決方案
這篇文章主要介紹了fastjson通過代碼指定全局序列化返回時間格式,導(dǎo)致使用JSONField注解標(biāo)注屬性的特殊日期返回格式失效問題的解決方案2023-01-01MyBatis關(guān)閉一級緩存的兩種方式(分注解和xml兩種方式)
這篇文章主要介紹了MyBatis關(guān)閉一級緩存的兩種方式(分注解和xml兩種方式),mybatis默認(rèn)開啟一級緩存,執(zhí)行2次相同sql,但是第一次查詢sql結(jié)果會加工處理這個時候需要關(guān)閉一級緩存,本文給大家詳細(xì)講解需要的朋友可以參考下2022-11-11mybatis-plus常用注解@TableId和@TableField的用法
本文主要介紹了mybatis-plus常用注解@TableId和@TableField的用法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-04-04JAVA JSP頁面技術(shù)之EL表達(dá)式整理歸納總結(jié)
這篇文章主要介紹了java中JSP頁面技術(shù)之EL表達(dá)式概念作用以及語法等的使用,需要的朋友可以參考2017-04-04Spring File Storage文件的對象存儲框架基本使用小結(jié)
在開發(fā)過程當(dāng)中,會使用到存文檔、圖片、視頻、音頻等等,這些都會涉及存儲的問題,文件可以直接存服務(wù)器,但需要考慮帶寬和存儲空間,另外一種方式就是使用云存儲,這篇文章主要介紹了Spring File Storage文件的對象存儲框架基本使用小結(jié),需要的朋友可以參考下2024-08-08