深入了解Java線程池:從設計思想到源碼解讀
為什么需要線程池
我們知道創(chuàng)建線程的常用方式就是 new Thread() ,而每一次 new Thread() 都會重新創(chuàng)建一個線程,而線程的創(chuàng)建和銷毀都需要耗時的,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性。在 jdk1.5 的 JUC 包中有一個 Executors,他能使我們創(chuàng)建的線程得到復用,不會頻繁的創(chuàng)建和銷毀線程。
線程池首先創(chuàng)建一些線程,它們的集合稱為線程池。使用線程池可以很好地提高性能,線程池在系統(tǒng)啟動時即創(chuàng)建大量空閑的線程,程序將一個任務傳給線程池,線程池就會啟動一條線程來執(zhí)行這個任務,執(zhí)行結束以后,該線程并不會死亡,而是再次返回線程池中成為空閑狀態(tài),等待執(zhí)行下一個任務。
先不管它到底是個啥,先看看使用線程池和 new Thread() 的耗時情況:
public class ThreadPoolTest { static CountDownLatch latch = new CountDownLatch(100000); static ExecutorService es = Executors.newFixedThreadPool(4); public static void main(String[] args) throws InterruptedException { long timeStart = System.currentTimeMillis(); for (int i = 0; i < 100000; i++) { newThread(); //executors(); } latch.await(); System.out.println(System.currentTimeMillis() - timeStart); es.shutdown(); } /** * 使用線程池 */ public static void executors() { es.submit(() -> { latch.countDown(); }); } /** * 直接new */ public static void newThread() { new Thread(() -> { latch.countDown(); }).start(); } }
對于 10 萬個線程同時跑,如果使用 new 的方式耗時:
使用線程池耗時:
總得來說,合理的使用線程池可以帶來以下幾個好處:
1.降低資源消耗。通過重復利用已創(chuàng)建的線程,降低線程創(chuàng)建和銷毀造成的消耗。
2.提高響應速度。當任務到達時,任務可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
3.增加線程的可管理性。線程是稀缺資源,使用線程池可以進行統(tǒng)一分配,調(diào)優(yōu)和監(jiān)控。
線程池設計思路
我們先了解線程池的思路,哪怕你重來沒了解過什么是線程池,所以不會一上來就給你講一堆線程池的參數(shù)。我嘗試多種想法來解釋它的設計思路,但都過于官方,但在查找資料的時候在博客上看到了非常通俗易懂的描述,它是這樣描述的,先假想一個工廠的生產(chǎn)流程:
工廠中有固定的一批工人,稱為正式工人,工廠接收的訂單由這些工人去完成。當訂單增加,正式工人已經(jīng)忙不過來了,工廠會將生產(chǎn)原料暫時堆積在倉庫中,等有空閑的工人時再處理(因為工人空閑了也不會主動處理倉庫中的生產(chǎn)任務,所以需要調(diào)度員實時調(diào)度)。倉庫堆積滿了后,訂單還在增加怎么辦?工廠只能臨時擴招一批工人來應對生產(chǎn)高峰,而這批工人高峰結束后是要清退的,所以稱為臨時工。當時臨時工也以招滿后(受限于工位限制,臨時工數(shù)量有上限),后面的訂單只能忍痛拒絕了。
和線程池的映射如下:
- 工廠——線程池
- 訂單——任務(Runnable)
- 正式工人——核心線程
- 臨時工——普通線程
- 倉庫——任務隊列
- 調(diào)度員——getTask()
getTask()是一個方法,將任務隊列中的任務調(diào)度給空閑線程,源碼分析再去了解。
映射后,形成線程池流程圖如下:
線程池的工作機制
了解了線程池設計思路,我們可以總結一下線程池的工作機制:
在線程池的編程模式下,任務是提交給整個線程池,而不是直接提交給某個線程,線程池在拿到任務后, 在內(nèi)部尋找是否有空閑的線程 ,如果有,則將任務交給某個空閑的線程。如果不存在空閑線程,即線程池中的線程數(shù)大于核心線程 corePoolSize ,則將任務添加到任務隊列中 workQueue ,如果任務隊列有界且滿了之后則會判斷線程池中的線程數(shù)是否大于最大線程數(shù) maximumPoolSize ,如果小于則會創(chuàng)建新的線程來執(zhí)行任務,否則在沒有空閑線程的情況下就會執(zhí)行決絕策略 handler 。
注意:線程池中剛開始沒有線程,當一個任務提交給線程池后,線程池會創(chuàng)建一個新線程來執(zhí)行任務。一個線程同時只能執(zhí)行一個任務,但可以同時向一個線程池提交多個任務。
線程池的參數(shù)及使用
線程池的真正實現(xiàn)類是 ThreadPoolExecutor ,類的集成關系如下:
ThreadPoolExecutor的構造方法有幾個,掌握最主要的即可,其中包含 7 個參數(shù):
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize(必需),線程池中的核心線程數(shù)。
當提交一個任務時,線程池創(chuàng)建一個新線程執(zhí)行任務,直到當前線程數(shù)等于 corePoolSize。
如果當前線程數(shù)小于 corePoolSize,此時存在 空閑線程 ,提交的任務會創(chuàng)建一個新線程來執(zhí)行該任務。
如果當前線程數(shù)等于 corePoolSize,則繼續(xù)提交的任務被保存到阻塞隊列中,等待被執(zhí)行。
如果執(zhí)行了線程池 prestartAllCoreThreads() 方法,線程池會提前創(chuàng)建并啟動所有核心線程。
maximumPoolSize(必需),線程池中允許的最大線程數(shù)。
當隊列滿了,且 已創(chuàng)建的線程數(shù)小于 maximumPoolSize ,則線程池會創(chuàng)建新的線程來執(zhí)行任務。另外,對于無界隊列,可忽略該參數(shù)。
keepAliveTime(必需),線程存活保持時間。
當線程沒有任務執(zhí)行時,繼續(xù)存活的時間。默認情況下,該參數(shù)只在線程數(shù)大于 corePoolSize 時才有用,即當非核心線程處于空閑狀態(tài)的時間超過這個時間后,該線程將被回收。將 allowCoreThreadTimeOut 參數(shù)設置為 true 后,核心線程也會被回收。
unit(必需),keepAliveTime 的時間單位。
workQueue(必需),任務隊列。
用于保存等待執(zhí)行的任務的阻塞隊列。workQueue 必須是 BlockingQueue 阻塞隊列。當線程池中的線程數(shù)超過它的 corePoolSize 的時候,線程會進入阻塞隊列進行阻塞等待。
一般來說,我們應該盡量使用有界隊列,因為使用無界隊列作為工作隊列會對線程池帶來如下影響。
當線程池中的線程數(shù)達到 corePoolSize 后,新任務將在無界隊列中等待,因此線程池中的線程數(shù)不會超過 corePoolSize。
由于 1,使用無界隊列時 maximumPoolSize 將是一個無效參數(shù)。
由于 1 和 2,使用無界隊列時 keepAliveTime 將是一個無效參數(shù)。
更重要的,使用無界 queue 可能會耗盡系統(tǒng)資源,有界隊列則有助于防止資源耗盡,同時即使使用有界隊列,也要盡量控制隊列的大小在一個合適的范圍。一般使用, ArrayBlockingQueue 、 LinkedBlockingQueue 、 SynchronousQueue 、 PriorityBlockingQueue 等。
threadFactory(可選),創(chuàng)建線程的工廠。
通過自定義的線程工廠可以給每個新建的線程設置一個具有識別度的 線程名 ,threadFactory 創(chuàng)建的線程也是采用 new Thread() 方式,threadFactory 創(chuàng)建的線程名都具有統(tǒng)一的風格: pool-m-thread-n (m 為線程池的編號,n 為線程池內(nèi)的線程編號)。
handler(可選),線程飽和策略。
當阻塞隊列滿了,且沒有空閑的工作線程,如果繼續(xù)提交任務,必須采取一種策略處理該任務,線程池提供了 四種策略:
AbortPolicy,直接拋出異常,默認策略。
CallerRunsPolicy,用調(diào)用者所在的線程來執(zhí)行任務。
DiscardOldestPolicy,丟棄阻塞隊列中靠最前的任務,并執(zhí)行當前任務。
DiscardPolicy,直接丟棄任務。
當然也可以根據(jù)應用場景實現(xiàn) RejectedExecutionHandler 接口,自定義飽和策略,如記錄日志或持久化存儲不能處理的任務。
線程池的狀態(tài)
ThreadPoolExecutor 使用 int 的高 3 位來表示線程池狀態(tài),低 29 位表示線程數(shù)量:
源碼如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3;//29 private static final int CAPACITY = (1 << COUNT_BITS) - 1;//約5億 // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
至于為什么這么設計,我覺得主要原因是為了避免額外的開銷,如果使用 2 個變量來分別表示狀態(tài)和線程數(shù)量,為了保證原子性必須進行額外的加鎖操作,而 ctl 則通過原子類就解決了該問題,在通過位運算就能得到狀態(tài)和線程數(shù)量。
提交任務
可以使用兩個方法向線程池提交任務,分別為 execute() 和 submit() 方法。
- execute(),用于提交不需要返回值的任務,所以無法判斷任務是否被線程池執(zhí)行成功。
- submit(),用于提交需要返回值的任務。線程池會返回一個 future 類型的對象,通過這個 future 對象可以判斷任務是否執(zhí)行成功,并且可以通過 future 的 get() 方法來獲取返回值, get() 方法會阻塞當前線程直到任務完成,而使用 get(long timeout,TimeUnit unit) 方法則會阻塞當前線程一段時間后立即返回,這 時候有可能任務沒有執(zhí)行完。
此外, ExecutorService 還提供了兩個提交任務的方法, invokeAny() 和 invokeAll() 。
- invokeAny(),提交所有任務,哪個任務先成功執(zhí)行完畢,返回此任務執(zhí)行結果,其它任務取消。
- invokeAll(),提交所有的任務且必須全部執(zhí)行完成。
corePoolSize 和 maximumPoolSize
測試核心線程數(shù)為 1 ,最大線程數(shù)為 2,任務隊列為 1。
@Slf4j(topic = "ayue") public class ThreadExecutorPoolTest1 { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); for (int i = 1; i < 4; i++) { //執(zhí)行任務 executor.execute(new MyTask(i)); } } //任務 static class MyTask implements Runnable { private int taskNum; public MyTask(int num) { this.taskNum = num; } @Override public void run() { log.debug("線程名稱:{},正在執(zhí)行task:{}", Thread.currentThread().getName(), taskNum); try { //模擬其他操作 Thread.currentThread().sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("task{}執(zhí)行完畢", taskNum); } } }
輸出:
<code data-type="codeline">11:07:04.377 [pool-1-thread-2] DEBUG ayue - 線程名稱:pool-1-thread-2,正在執(zhí)行task:3</code><code data-type="codeline">11:07:04.377 [pool-1-thread-1] DEBUG ayue - 線程名稱:pool-1-thread-1,正在執(zhí)行task:1</code><code data-type="codeline">11:07:05.384 [pool-1-thread-2] DEBUG ayue - task3執(zhí)行完畢</code><code data-type="codeline">11:07:05.384 [pool-1-thread-1] DEBUG ayue - task1執(zhí)行完畢</code><code data-type="codeline">11:07:05.384 [pool-1-thread-2] DEBUG ayue - 線程名稱:pool-1-thread-2,正在執(zhí)行task:2</code><code data-type="codeline">11:07:06.397 [pool-1-thread-2] DEBUG ayue - task2執(zhí)行完畢</code>
當有 3 個線程通過線程池執(zhí)行任務時,由于核心線程只有一個,且任務隊列為 1,所以當?shù)?3 個線程到來的時候, 會重新開啟一個新的線程 pool-1-thread-2 來執(zhí)行任務。
當然,這里可能有人問核心線程會不會大于最大線程?當然不會,如果 corePoolSize > maximumPoolSize ,則程序啟動會直接報錯。
任務隊列
任務隊列是基于阻塞隊列實現(xiàn)的,即采用生產(chǎn)者消費者模式,在 Java 中需要實現(xiàn) BlockingQueue 接口。但 Java 已經(jīng)為我們提供了 7 種阻塞隊列的實現(xiàn):
1.ArrayBlockingQueue:一個由數(shù)組結構組成的有界阻塞隊列。
2.LinkedBlockingQueue: 一個由鏈表結構組成的有界阻塞隊列,在未指明容量時,容量默認為 Integer.MAX_VALUE 。
3.PriorityBlockingQueue: 一個支持優(yōu)先級排序的無界阻塞隊列,對元素沒有要求,可以實現(xiàn) Comparable 接口也可以提供 Comparator 來對隊列中的元素進行比較。跟時間沒有任何關系,僅僅是 按照優(yōu)先級取任務 。
4.DelayQueue:類似于 PriorityBlockingQueue,是二叉堆實現(xiàn)的無界優(yōu)先級阻塞隊列。要求元素都實現(xiàn) Delayed 接口,通過執(zhí)行時延從隊列中提取任務,時間沒到任務取不出來。
5.SynchronousQueue: 一個不存儲元素的阻塞隊列,消費者線程調(diào)用 take() 方法的時候就會發(fā)生阻塞,直到有一個生產(chǎn)者線程生產(chǎn)了一個元素,消費者線程就可以拿到這個元素并返回;生產(chǎn)者線程調(diào)用 put() 方法的時候也會發(fā)生阻塞,直到有一個消費者線程消費了一個元素,生產(chǎn)者才會返回。
6.LinkedBlockingDeque: 使用雙向隊列實現(xiàn)的有界雙端阻塞隊列。雙端意味著可以像普通隊列一樣 FIFO(先進先出),也可以像棧一樣 FILO(先進后出)。
7.LinkedTransferQueue: 它是 ConcurrentLinkedQueue、LinkedBlockingQueue 和 SynchronousQueue 的結合體,但是把它用在 ThreadPoolExecutor 中,和 LinkedBlockingQueue 行為一致,但是是無界的阻塞隊列。
線程工廠
線程工廠默認創(chuàng)建的線程名: pool-m-thread-n ,在 Executors.defaultThreadFactory() 可以看到:
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { //線程名:namePrefix + threadNumber.getAndIncrement() Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(),0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
我們也可以通過 ThreadPoolExecutor 自定義線程名:
@Slf4j(topic = "ayue") public class ThreadExecutorPoolTest1 { public static void main(String[] args) { //自增線程id AtomicInteger threadNumber = new AtomicInteger(1); ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "javatv-" + threadNumber.getAndIncrement()); } }); for (int i = 1; i < 4; i++) { executor.execute(new MyTask(i)); } } static class MyTask implements Runnable { private int taskNum; public MyTask(int num) { this.taskNum = num; } @Override public void run() { log.debug("線程名稱:{},正在執(zhí)行task:{}", Thread.currentThread().getName(), taskNum); try { //模擬其他操作 Thread.currentThread().sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("task{}執(zhí)行完畢", taskNum); } } }
輸出:
<code data-type="codeline">14:08:07.166 [javatv-1] DEBUG ayue - 線程名稱:javatv-1,正在執(zhí)行task:1</code><code data-type="codeline">14:08:07.166 [javatv-2] DEBUG ayue - 線程名稱:javatv-2,正在執(zhí)行task:3</code><code data-type="codeline">14:08:08.170 [javatv-1] DEBUG ayue - task1執(zhí)行完畢</code><code data-type="codeline">14:08:08.170 [javatv-2] DEBUG ayue - task3執(zhí)行完畢</code><code data-type="codeline">14:08:08.170 [javatv-1] DEBUG ayue - 線程名稱:javatv-1,正在執(zhí)行task:2</code><code data-type="codeline">14:08:09.172 [javatv-1] DEBUG ayue - task2執(zhí)行完畢</code>
拒絕策略
線程池提供了 四種策略:
1.AbortPolicy,直接拋出異常,默認策略。
2.CallerRunsPolicy,用調(diào)用者所在的線程來執(zhí)行任務。
3.DiscardOldestPolicy,丟棄阻塞隊列中靠最前的任務,并執(zhí)行當前任務。
4.DiscardPolicy,直接丟棄任務。、
把上面代碼的循環(huán)次數(shù)改為 4 次,則會拋出 java.util.concurrent.RejectedExecutionException 異常。
for (int i = 1; i < 5; i++) { executor.execute(new MyTask(i)); }
關閉線程池
可以通過調(diào)用線程池的 shutdown 或 shutdownNow 方法來關閉線程池。它們的原理是遍歷線程池中的工作線程,然后逐個調(diào)用線程的 interrupt 方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。但是它們存在一定的區(qū)別, shutdownNow 首先將線程池的狀態(tài)設置成 STOP ,然后嘗試停止所有的正在執(zhí)行或暫停任務的線程,并返回等待執(zhí)行任務的列表,而 shutdown 只是將線程池的狀態(tài)設置成 SHUTDOWN 狀態(tài),然后中斷所有沒有正在執(zhí)行任務的線程。 簡單來說:
- shutdown():線程池狀態(tài)變?yōu)?SHUTDOWN,不會接收新任務,但已提交任務會執(zhí)行完,不會阻塞調(diào)用線程的執(zhí)行 。
- shutdownNow():線程池狀態(tài)變?yōu)?STOP,會接收新任務,會將隊列中的任務返回,并用 interrupt 的方式中斷正在執(zhí)行的任務。
只要調(diào)用了這兩個關閉方法中的任意一個, isShutdown 方法就會返回 true。當所有的任務都已關閉后,才表示線程池關閉成功,這時調(diào)用 isTerminaed 方法會返回 true。至于應該調(diào)用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調(diào)用 shutdown 方法來關閉線程池,如果任務不一定要執(zhí)行完,則可以調(diào)用 shutdownNow 方法。
Executors 靜態(tài)工廠
Executors,提供了一系列靜態(tài)工廠方法用于創(chuàng)建各種類型的線程池,基于 ThreadPoolExecutor。
1.FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
特點:核心線程數(shù)等于最大線程數(shù),因此也無需超時時間,執(zhí)行完立即回收,阻塞隊列是無界的,可以放任意數(shù)量的任務。
場景:適用于任務量已知,相對耗時的任務。
2.newCachedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
可根據(jù)需要創(chuàng)建新線程的線程池,如果現(xiàn)有線程沒有可用的,則創(chuàng)建一個新線程并添加到池中,如果有被使用完但是還沒銷毀的線程,就復用該線程。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。因此,長時間保持空閑的線程池不會使用任何資源。這種線程池比較靈活, 對于執(zhí)行很多短期異步任務的程序而言,這些線程池通??商岣叱绦蛐阅?。
特點:核心線程數(shù)是 0, 最大線程數(shù)是 Integer.MAX_VALUE ,全部都是空閑線程 60s 后回收。
場景:執(zhí)行大量、耗時少的任務。
3.newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
特點:單線程線程池。希望多個任務排隊執(zhí)行,線程數(shù)固定為 1,任務數(shù)多于 1 時,會放入無界隊列排隊,任務執(zhí)行完畢,這唯一的線程也不會被釋放。
場景:區(qū)別于自己創(chuàng)建一個單線程串行執(zhí)行任務,如果使用 new Thread 任務執(zhí)行失敗而終止那么沒有任何補救措施,而線程池還會新建一個線程,保證池的正常工作。
4.ScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
ScheduledThreadPoolExecutor 繼承自 ThreadPoolExecutor。它主要用來在給定的延遲之后運行任務,或者定期執(zhí)行任務。ScheduledThreadPoolExecuto 的功能與 Timer 類似,但 ScheduledThreadPoolExecutor 功能更強大、更靈活。Timer 對應的是單個后臺線程,而 ScheduledThreadPoolExecutor 可以在構造函數(shù)中指定多個對應的后臺線程數(shù)。
特點:核心線程數(shù)量固定,非核心線程數(shù)量無限,執(zhí)行完閑置 10ms 后回收,任務隊列為延時阻塞隊列。
場景:執(zhí)行定時或周期性的任務。
合理地配置線程池
需要針對具體情況而具體處理,不同的任務類別應采用不同規(guī)模的線程池,任務類別可劃分為 CPU 密集型任務、IO 密集型任務和混合型任務。
- CPU 密集型任務:線程池中線程個數(shù)應盡量少,不應大于 CPU 核心數(shù);
- IO 密集型任務:由于 IO 操作速度遠低于 CPU 速度,那么在運行這類任務時,CPU 絕大多數(shù)時間處于空閑狀態(tài),那么線程池可以配置盡量多些的線程,以提高 CPU 利用率;
- 混合型任務:可以拆分為 CPU 密集型任務和 IO 密集型任務,當這兩類任務執(zhí)行時間相差無幾時,通過拆分再執(zhí)行的吞吐率高于串行執(zhí)行的吞吐率,但若這兩類任務執(zhí)行時間有數(shù)據(jù)級的差距,那么沒有拆分的意義。
線程池的監(jiān)控
如果在系統(tǒng)中大量使用線程池,則有必要對線程池進行監(jiān)控,方便在出現(xiàn)問題時,可以根據(jù)線程池的使用狀況快速定位問題。利用線程池提供的參數(shù)進行監(jiān)控,參數(shù)如下:
- taskCount:線程池需要執(zhí)行的任務數(shù)量。
- completedTaskCount:線程池在運行過程中已完成的任務數(shù)量,小于或等于 taskCount。
- largestPoolSize:線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量,通過這個數(shù)據(jù)可以知道線程池是否滿過。如等于線程池的最大大小,則表示線程池曾經(jīng)滿了。
- getPoolSize:線程池的線程數(shù)量。如果線程池不銷毀的話,池里的線程不會自動銷毀,所以這個大小只增不減。
- getActiveCount:獲取活動的線程數(shù)。
通過擴展線程池進行監(jiān)控:繼承線程池并重寫線程池的 beforeExecute() , afterExecute() 和 terminated() 方法,可以在任務執(zhí)行前、后和線程池關閉前自定義行為。如監(jiān)控任務的平均執(zhí)行時間,最大執(zhí)行時間和最小執(zhí)行時間等。
源碼分析
在使用線程池的時候,我其實有一些問題也隨之而來,比如線程池的線程怎么創(chuàng)建?任務怎么執(zhí)行?任務怎么分配?線程執(zhí)行完后怎么辦?是存活還是死亡?什么時候死亡?為什么要使用阻塞隊列等等問題。帶著這些問題,我們?nèi)プx讀源碼,讀源碼怎么入手?通過 ThreadPoolExecutor 的 execute() 方法。submit 底層也是調(diào)用了 execute() 。
execute
public void execute(Runnable command) { //如果沒有任務直接拋出異常 if (command == null) throw new NullPointerException(); //獲取當前線程的狀態(tài)+線程個數(shù) int c = ctl.get(); /** * workerCountOf,線程池當前線程數(shù),并判斷是否小于核心線程數(shù) */ if (workerCountOf(c) < corePoolSize) {//如果小于 if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { // 這里是向任務隊列投放任務成功,對線程池的運行中狀態(tài)做二次檢查 // 如果線程池二次檢查狀態(tài)是非運行中狀態(tài),則從任務隊列移除當前的任務調(diào)用拒絕策略處理(也就是移除前面成功入隊的任務實例) int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); /* 走到下面的else if分支,說明有以下的前提: * 1、待執(zhí)行的任務已經(jīng)成功加入任務隊列 * 2、線程池可能是RUNNING狀態(tài) * 3、傳入的任務可能從任務隊列中移除失?。ㄒ瞥〉奈ㄒ豢赡芫褪侨蝿找呀?jīng)被執(zhí)行了) * * 如果當前工作線程數(shù)量為0,則創(chuàng)建一個非核心線程并且傳入的任務對象為null - 返回 * 也就是創(chuàng)建的非核心線程不會馬上運行,而是等待獲取任務隊列的任務去執(zhí)行 * 如果前工作線程數(shù)量不為0,原來應該是最后的else分支,但是可以什么也不做, * 因為任務已經(jīng)成功入隊列,總會有合適的時機分配其他空閑線程去執(zhí)行它。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } /* 走到這里說明有以下的前提: * 1、線程池中的工作線程總數(shù)已經(jīng)大于等于corePoolSize(簡單來說就是核心線程已經(jīng)全部懶創(chuàng)建完畢) * 2、線程池可能不是RUNNING狀態(tài) * 3、線程池可能是RUNNING狀態(tài)同時任務隊列已經(jīng)滿了 * * 如果向任務隊列投放任務失敗,則會嘗試創(chuàng)建非核心線程傳入任務執(zhí)行 * 創(chuàng)建非核心線程失敗,此時需要拒絕執(zhí)行任務 */ else if (!addWorker(command, false)) reject(command); }
addWorker
第一個 if 判斷線程池當前線程數(shù)是否小于核心線程數(shù)。
if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); }
如果小于,則進入 addWorker 方法:
private boolean addWorker(Runnable firstTask, boolean core) { retry: //外層循環(huán):判斷線程池狀態(tài) for (;;) { int c = ctl.get(); //獲取線程池狀態(tài) int rs = runStateOf(c); // 檢查線程池的狀態(tài)是否存活. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; //內(nèi)層循環(huán):線程池添加核心線程并返回是否添加成功的結果 for (;;) { //線程數(shù)量 int wc = workerCountOf(c); //線程數(shù)量超過容量,返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //CAS增加線程數(shù)量,若成功跳出外層循環(huán) if (compareAndIncrementWorkerCount(c)) break retry; //否則失敗,并更新c c = ctl.get(); // Re-read ctl //如果這時的線程池狀態(tài)發(fā)生變化,重新對外層循環(huán)進行自旋 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //如果CAS成功了,則繼續(xù)往下走 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //創(chuàng)建一個Worker,這個Worker實現(xiàn)了Runable,把它看成一個任務單元 w = new Worker(firstTask); //這個Thread就是當前的任務單元Worker,即this final Thread t = w.thread; if (t != null) { //加鎖,因為可能有多個線程來調(diào)用 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 再次檢查線程池的狀態(tài),避免在獲取鎖前調(diào)用shutdown方法 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //如果t線程已經(jīng)啟動尚未終止,則拋出異常 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //否則,加入線程池 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //加入線程池后,啟動該線程,上面已經(jīng)設置為true if (workerAdded) { t.start(); workerStarted = true; } } } finally { //如果線程啟動失敗,則調(diào)用addWorkerFailed,回滾操作 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
Worker
Worker 是 ThreadPoolExecutor 的內(nèi)部類,繼承了 AQS 并且實現(xiàn)了 Runnable。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; //構造方法 Worker(Runnable firstTask) { //在調(diào)用runWorker前禁止中斷 //當其它線程調(diào)用了線程池的 shutdownNow 時候,如果 worker 狀態(tài) >= 0 則會中斷該線程 //具體方法在 interruptIfStarted() 中可以看到 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } //省略其他代碼... }
可以看到,在 Worker 的構造方法可以知道,其中的 thread 屬性就是通過 this 去創(chuàng)建的,所以線程池核心線程的創(chuàng)建主要是 run 方法中的 runWorker 方法:
runWorker
runWorker 核心線程執(zhí)行邏輯。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 調(diào)用unlock()是為了讓外部可以中斷 w.unlock(); // allow interrupts // 線程退出的原因,true是任務導致,false是線程正常退出 boolean completedAbruptly = true; try { // 1. 如果firstTask不為null,則執(zhí)行firstTask // 2. 如果firstTask為null,則調(diào)用getTask()從隊列獲取任務 // 3. 阻塞隊列的特性就是:當隊列為空時,當前線程會被阻塞等待 while (task != null || (task = getTask()) != null) { w.lock(); // 判斷線程池的狀態(tài),如果線程池正在停止,則對當前線程進行中斷操作 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt();//中斷 try { //該方法里面沒有內(nèi)容,可以自己擴展實現(xiàn),比如上面提到的線程池的監(jiān)控 beforeExecute(wt, task); Throwable thrown = null; try { //執(zhí)行具體的任務 task.run(); } catch (RuntimeException x) {//線程異常后操作 thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //同 beforeExecute() afterExecute(task, thrown); } } finally { task = null;//help gc //統(tǒng)計當前worker完成了多少個任務 w.completedTasks++; //釋放鎖 w.unlock(); } } completedAbruptly = false; } finally { // 處理線程退出,completedAbruptly為true說明由于任務異常導致線程非正常退出 processWorkerExit(w, completedAbruptly); } }
getTask
而對于其中的 getTask() 方法,任務隊列中的任務調(diào)度給空閑線程,該方法是非常重要的,為什么重要?其中就涉及到面試官常問的 線程池如何保證核心線程不會被銷毀,而空閑線程會被銷毀?
private Runnable getTask() { //判斷最新一次的poll是否超時 //poll:取走BlockingQueue里排在首位的對象 boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /** * 條件1:線程池狀態(tài)SHUTDOWN、STOP、TERMINATED狀態(tài) * 條件2:線程池STOP、TERMINATED狀態(tài)或workQueue為空 * 條件1與條件2同時為true,則workerCount-1,并且返回null * 注:條件2是考慮到SHUTDOWN狀態(tài)的線程池不會接受任務,但仍會處理任務(前面也講到了) */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? /* * 該屬性的作用是判斷當前線程是否允許超時: * 1.allowCoreThreadTimeOut * 如果為 false(默認),核心線程即使在空閑時也保持活動狀態(tài)。 * 如果為 true,則核心線程使用 keepAliveTime 超時等待工作。 * 2.wc > corePoolSize * 當前線程是否已經(jīng)超過核心線程數(shù)量。 */ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /* * 判斷當前線程是否可以退出: * 1.wc > maximumPoolSize || (timed && timedOut) * wc > maximumPoolSize = true,說明當前的工作線程總數(shù)大于線程池最大線程數(shù)。 * timed && timedOut = true,說明當前線程允許超時并且已經(jīng)超時。 * 2.wc > 1 || workQueue.isEmpty() * 工作線程總數(shù)大于1或者任務隊列為空,則通過CAS把線程數(shù)減去1,同時返回null */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { /* * 1.poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象, * 如果在指定時間內(nèi),隊列一旦有數(shù)據(jù)可取,則立即返回隊列中的數(shù)據(jù)。否則直到時間超時還沒有數(shù)據(jù)可取,返回失敗。 * * 2.take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態(tài)直到BlockingQueue有新的數(shù)據(jù)被加入。 * * * 如果timed為true,通過poll()方法做超時拉取,keepAliveTime時間內(nèi)沒有等待到有效的任務,則返回null。 * * 如果timed為false,通過take()做阻塞拉取,會阻塞到有下一個有效的任務時候再返回(一般不會是null)。 */ Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //通過poll()方法從任務隊列中拉取任務為null timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
① 對于 getTask() 下面的這段代碼,這段邏輯大多數(shù)情況下是針對非核心線程:
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; }
② 我們這樣來閱讀這段代碼,當工作線程數(shù)大于核心線程 corePoolSize ,此時進入 execute() 方法中的第二個 if 語句:
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); }
此時線程池總數(shù)已經(jīng)超過了 corePoolSize 但小于 maximumPoolSize ,當任務隊列已經(jīng)滿了的時候,會通過 addWorker(task,false) 添加非核心線程。
而在高并發(fā)的情況下,肯定會產(chǎn)生多余的線程,也就是出現(xiàn) ① 中的情況 wc > maximumPoolSize ,而這些多余的線程怎么辦,是不是會被回收?如果 workQueue.poll 沒有獲取到有效的任務,那么①中的邏輯剛好與 addWorker(task,false) 相反,通過 CAS 減少非核心線程,使得工作線程總數(shù)趨向于 corePoolSize 。
如果對于非核心線程,上一輪循環(huán)獲取任務對象為 null ,在默認情況下 allowCoreThreadTimeOut = false ,因此, getTask() 中 timed = true ,如果沒有獲取到任務,此時 timedOut = true ,這一輪循環(huán)很容易滿足 timed && timedOut 為 true,這個時候 getTask() 返回 null 會導致 Worker#runWorker() 方法跳出死循環(huán),之后執(zhí)行 processWorkerExit() 方法處理后續(xù)工作,而該非核心線程對應的 Worker 則變成 游離對象 ,等待被 JVM 回收。
當 allowCoreThreadTimeOut 設置為 true 的時候,這里分析的非核心線程的生命周期終結邏輯同時會適用于核心線程。
由此推出一個面試題: 線程池有多個線程同時沒取到任務,會全部回收嗎?
舉個例子:線程池核心線程數(shù)是 5,最大線程數(shù)為 5,當前工作線程數(shù)為 6(6>5,意味著當前可以觸發(fā)線程回收),如果此時有 3 個線程同時超時沒有獲取到任務,這 3 個線程會都被回收銷毀嗎?
思路:這道題的核心點在于有多個線程同時超時獲取不到任務。正常情況下,此時會觸發(fā)線程回收的流程。但是我們知道,正常不設置 allowCoreThreadTimeOut 變量時,線程池即使沒有任務處理,也會保持核心線程數(shù)的線程。如果這邊 3 個線程被全部回收,那此時線程數(shù)就變成了 3 個,不符合核心線程數(shù) 5 個,所以這邊我們可以首先得出答案:不會被全部回收。這個時候面試官肯定會問為什么?
根據(jù)答案不難推測,為了防止本題的這種并發(fā)回收問題的出現(xiàn),線程回收的流程必然會有并發(fā)控制。compareAndDecrementWorkerCount(c) 用的是 CAS 方法,如果 CAS 失敗就 continue,進入下一輪循環(huán),重新判斷。
像上述例子,其中一條線程會 CAS 失敗,然后重新進入循環(huán),發(fā)現(xiàn)工作線程數(shù)已經(jīng)只有 5 了, timed = false , 這條線程就不會被銷毀,可以一直阻塞了,此時就會調(diào)用 workQueue.take() 阻塞等待下一次的任務,也就是說核心線程并不會死亡。
從這里也可以看出,雖然有核心線程數(shù),但線程并沒有區(qū)分是核心還是非核心,并不是先創(chuàng)建的就是核心,超過核心線程數(shù)后創(chuàng)建的就是非核心,最終保留哪些線程,完全隨機。
然后可以回答出前面的問題,線程池如何保證核心線程不會被銷毀,而空閑線程會被銷毀?
核心線程是因為調(diào)用了阻塞方法而不會被銷毀,空閑線程調(diào)用了超時方法在下次執(zhí)行時獲取不到任務而死亡。
這樣回答其實是可以的,但是這可能顯示出你是背得八股文,所以你應該回答核心線程不僅僅是因為調(diào)用了阻塞方法而不會被銷毀,同時利用了 CAS 來保證。
還可以得出 getTask() 返回 null 的情況 :
1.線程池的狀態(tài)已經(jīng)為 STOP,TIDYING, TERMINATED,或者是 SHUTDOWN 且工作隊列為空。
2.工作線程數(shù)大于最大線程數(shù)或當前工作線程已超時,且,其存在工作線程或任務隊列為空。
runWorker 的流程:
processWorkerExit
在 runWorker 的 finally 塊中,當任務執(zhí)行之后,要對其做處理,作線程在執(zhí)行完 processWorkerExit() 方法才算真正的終結,該方法如下:
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 因為拋出用戶異常導致線程終結,直接使工作線程數(shù)減1即可 // 如果沒有任何異常拋出的情況下是通過getTask()返回null引導線程正常跳出runWorker()方法的while死循環(huán)從而正常終結,這種情況下,在getTask()中已經(jīng)把線程數(shù)減1 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 全局的已完成任務記錄數(shù)加上此將要終結的Worker中的已完成任務數(shù) completedTaskCount += w.completedTasks; // 工作線程集合中移除此將要終結的Worker workers.remove(w); } finally { mainLock.unlock(); } // 見下一小節(jié)分析,用于根據(jù)當前線程池的狀態(tài)判斷是否需要進行線程池terminate處理 tryTerminate(); int c = ctl.get(); // 如果線程池的狀態(tài)小于STOP,也就是處于RUNNING或者SHUTDOWN狀態(tài)的前提下: // 1.如果線程不是由于拋出用戶異常終結,如果允許核心線程超時,則保持線程池中至少存在一個工作線程 // 2.如果線程由于拋出用戶異常終結,或者當前工作線程數(shù),那么直接添加一個新的非核心線程 if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { // 如果允許核心線程超時,最小值為0,否則為corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果最小值為0,同時任務隊列不空,則更新最小值為1 if (min == 0 && ! workQueue.isEmpty()) min = 1; // 工作線程數(shù)大于等于最小值,直接返回不新增非核心線程 if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
代碼的后面部分區(qū)域,會判斷線程池的狀態(tài),如果線程池是 RUNNING 或者 SHUTDOWN 狀態(tài)的前提下,如果當前的工作線程由于拋出異常被終結,那么會新創(chuàng)建一個非核心線程。如果當前的工作線程并不是拋出用戶異常被終結(正常情況下的終結),那么會這樣處理:
allowCoreThreadTimeOut 為 true,也就是允許核心線程超時的前提下,如果任務隊列空,則會通過創(chuàng)建一個非核心線程保持線程池中至少有一個工作線程。
allowCoreThreadTimeOut 為 false,如果工作線程總數(shù)大于 corePoolSize 則直接返回,否則創(chuàng)建一個非核心線程,也就是會趨向于保持線程池中的工作線程數(shù)量趨向于 corePoolSize 。
processWorkerExit() 執(zhí)行完畢之后,意味著該工作線程的生命周期已經(jīng)完結。
面試題
1、線程池的線程怎么創(chuàng)建?任務怎么執(zhí)行?
主要在于 Worker 類以及 Worker#runWorker() 方法。
2、任務怎么分配?
參考 getTask() 方法。
3、線程池如何保證核心線程不會被銷毀,而空閑線程會被銷毀?
參考上文。
4、線程池有多個線程同時沒取到任務,會全部回收嗎?
參考上文。
5、為什么要使用阻塞隊列?
線程池是采用生產(chǎn)者-消費者模式設計的。線程池為消費者。
在線程池中活躍線程數(shù)達到 corePoolSize 時,線程池將會將后續(xù)的任務提交到 BlockingQueue 中, (每個 task 都是單獨的生產(chǎn)者線程)進入到堵塞對列中的 task 線程會 wait() 從而釋放 cpu,從而提高 cpu 利用率。?
以上就是深入了解Java線程池:從設計思想到源碼解讀的詳細內(nèi)容,更多關于Java線程池的資料請關注腳本之家其它相關文章!
相關文章
java ConcurrentHashMap鎖分段技術及原理詳解
這篇文章主要介紹了java ConcurrentHashMap鎖分段技術詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2019-07-07使用jvisualvm配合Visual GC插件監(jiān)控Java程序詳細總結
本節(jié)將會介紹一下jvisualvm的特性及作用、各個功能是如何使用的、最后會介紹jvisualvm的插件Visual GC的安裝及使用2021-09-09Java函數(shù)式編程(十二):監(jiān)控文件修改
這篇文章主要介紹了Java函數(shù)式編程(十二):監(jiān)控文件修改,本文是系列文章的第12篇,其它文章請參閱本文底部的相關文章,需要的朋友可以參考下2014-09-09Java BufferWriter寫文件寫不進去或缺失數(shù)據(jù)的解決
這篇文章主要介紹了Java BufferWriter寫文件寫不進去或缺失數(shù)據(jù)的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07