超詳細講解Java線程池
帶著問題閱讀
1、什么是池化,池化能帶來什么好處
2、如何設(shè)計一個資源池
3、Java的線程池如何使用,Java提供了哪些內(nèi)置線程池
4、線程池使用有哪些注意事項
池化技術(shù)
池化思想介紹
池化思想是將重量級資源預(yù)先準備好,在使用時可重復(fù)使用這些預(yù)先準備好的資源。
池化思想的核心概念有:
- 資源創(chuàng)建/銷毀開銷大
- 提前創(chuàng)建,集中管理
- 重復(fù)利用,資源可回收
例如大街上的共享單車,用戶掃碼開鎖,使用完后歸還到停放點,下一個用戶可以繼續(xù)使用,共享單車由廠商統(tǒng)一管理,為用戶節(jié)省了購買單車的開銷。
池化技術(shù)的應(yīng)用
常見的池化技術(shù)應(yīng)用有:資源池、連接池、線程池等。
- 資源池
在各種電商平臺大促活動時,平臺需要支撐平時幾十倍的流量,因此各大平臺在需要提前準備大量服務(wù)器進行擴容,在活動完畢以后,擴容的服務(wù)器資源又白白浪費。將計算資源池化,在業(yè)務(wù)高峰前進行分配,高峰結(jié)束后提供給其他業(yè)務(wù)或用戶使用,即可節(jié)省大量消耗,資源池化也是云計算的核心技術(shù)之一。
- 連接池
網(wǎng)絡(luò)連接的建立和釋放也是一個開銷較大的過程,提前在服務(wù)器之間建立好連接,在需要使用的時候從連接池中獲取,使用完畢后歸還連接池,以供其他請求使用,以此可節(jié)省掉大量的網(wǎng)絡(luò)連接時間,如數(shù)據(jù)庫連接池、HttpClient連接池。
- 線程池
線程的建立銷毀都涉及到內(nèi)核態(tài)切換,提前創(chuàng)建若干數(shù)量的線程提供給客戶端復(fù)用,可節(jié)約大量的CPU消耗以便處理業(yè)務(wù)邏輯。線程池也是接下來重點要講的內(nèi)容。
如何設(shè)計一個線程池
設(shè)計一個線程池,至少需要提供的核心能力有:
- 線程池容器:用于容納初始化時預(yù)先創(chuàng)建的線程。
- 線程狀態(tài)管理:管理池內(nèi)線程的生命周期,記錄每個線程當前的可服務(wù)狀態(tài)。
- 線程請求管理:對調(diào)用端提供獲取和歸還線程的接口。
- 線程耗盡策略:提供策略以處理線程耗盡問題,如拒絕服務(wù)、擴容線程池、排隊等待等。
基于以上角度,我們來分析Java是如何設(shè)計線程池功能的。
Java線程池解析
ThreadPoolExecutor使用介紹
大象裝冰箱總共分幾步
// 1.創(chuàng)建線程池
ThreadPoolExecutor threadPool =
new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
// 2.提交任務(wù)
threadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println("task running");
}
}});
// 3.關(guān)閉線程池
threadPool.shutDown();
Java通過ThreadPoolExecutor提供線程池的實現(xiàn),如示例代碼,初始化一個容量為1的線程池、然后提交任務(wù)、最后關(guān)閉線程池。
ThreadPoolExecutor的核心方法主要有
- 構(gòu)造函數(shù):
ThreadPoolExecutor提供了多個構(gòu)造函數(shù),以下對基礎(chǔ)構(gòu)造函數(shù)進行說明。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:線程池的核心線程數(shù)。池內(nèi)線程數(shù)小于
corePoolSize時,線程池會創(chuàng)建新線程執(zhí)行任務(wù)。 - maximumPoolSize:線程池的最大線程數(shù)。池內(nèi)線程數(shù)大于
corePoolSize且workQueue任務(wù)等待隊列已滿時,線程池會創(chuàng)建新線程執(zhí)行隊列中的任務(wù),直到線程數(shù)達到maximumPoolSize為止。 - keepAliveTime:非核心線程的存活時長。池內(nèi)超過
corePoolSize數(shù)量的線程可存活的時長。 - unit:非核心線程存活時長單位。與
keepAliveTime取值配合,如示例代碼表示1分鐘。 - workQueue:任務(wù)提交隊列。當無空閑核心線程時,存儲待執(zhí)行任務(wù)。
| 類型 | 作用 |
|---|---|
| ArrayBlockingQueue | 數(shù)組結(jié)構(gòu)的有界阻塞隊列 |
| LinkedBlockingQueue | 鏈表結(jié)構(gòu)的阻塞隊列,可設(shè)定是否有界 |
| SynchronousQueue | 不存儲元素的阻塞隊列,直接將任務(wù)提交給線程池執(zhí)行 |
| PriorityBlockingQueue | 支持優(yōu)先級的無界阻塞隊列 |
| DelayQueue | 支持延時執(zhí)行的無界阻塞隊列 |
- threadFactory:線程工廠。用于創(chuàng)建線程對象。
- handler:拒絕策略。線程池線程數(shù)量達到
maximumPoolSize且workQueue已滿時的處理策略。
| 類型 | 作用 |
|---|---|
| AbortPolicy | 拒絕并拋出異常。默認 |
| CallerRunsPolicy | 由提交任務(wù)的線程執(zhí)行任務(wù) |
| DiscardOldestPolicy | 拋棄隊列頭部任務(wù) |
| DiscardPolicy | 拋棄該任務(wù) |
- 執(zhí)行函數(shù):
execute和submit,主要分別用于執(zhí)行Runnable和Callable。
// 提交Runnable void execute(Runnable command); // 提交Callable并返回Future <T> Future<T> submit(Callable<T> task); // 提交Runnable,執(zhí)行結(jié)束后Future.get會返回result <T> Future<T> submit(Runnable task, T result); // 提交Runnable,執(zhí)行結(jié)束后Future.get會返回null Future<?> submit(Runnable task);
- 停止函數(shù):
shutDown和shutDownNow。
// 不再接收新任務(wù),等待剩余任務(wù)執(zhí)行完畢后停止線程池 void shutdown(); // 不再接收新任務(wù),并嘗試中斷執(zhí)行中的任務(wù),返回還在等待隊列中的任務(wù)列表 List<Runnable> shutdownNow();
內(nèi)置線程池使用
To be useful across a wide range of contexts, this class provieds many adjustable parameters and extensibility hooks. However, programmers are urged to use the more convenient {@link Executors} factory methods {@link Executors#newCachedThreadPool} (unbounded thread poll, with automatic thread reclamation), {@link Executors#newFixedThreadPool} (fixed size thread pool) and {@link Executors#newSingleThreadExecutor}(single background thread), that preconfigure settings for the most common usage scenarios.
由于ThreadPoolExecutor參數(shù)復(fù)雜,Java提供了三種內(nèi)置線程池newCachedThreadPool、newFixedThreadPool和newSingleThreadExecutor應(yīng)對大多數(shù)場景。
Executors.newCachedThreadPool()無界線程池,核心線程池大小為0,最大為Integer.MAX_VALUE,因此嚴格來講并不算無界。采用SynchronousQueue作workQueue,意味著任務(wù)不會被阻塞保存在隊列,而是直接遞交到線程池,如線程池無可用線程,則創(chuàng)建新線程執(zhí)行。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Executors.newFixedThreadPool(int nThreads)固定大小線程池,其中coreSize和maxSize相等,且過期時間為0,表示經(jīng)過一定數(shù)量任務(wù)提交后,線程池將始終維持在nThreads數(shù)量大小,不會新增也不會回收線程。
public static ExecutorService new FixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
Executors.newSingleThreadExecutor()單線程池,參數(shù)與fixedThreadPool類似,只是將數(shù)量限制在1,單線程池主要避免重復(fù)創(chuàng)建銷毀線程對象,也可用于串行化執(zhí)行任務(wù)。不同與其他線程池,單線程池采用FinallizableDelegatedExecutorService對ThreadPoolExecutor對象進行包裝,感興趣的同學可以看下源碼,其方法實現(xiàn)僅僅是對被包裝對象方法的直接調(diào)用。包裝對象主要用于避免用戶將線程池強制轉(zhuǎn)換為ThreadPoolExecutor來修改線程池大小。
public static ExecutorService newSingleThreadExecutor() {
return new FinallizableDelegatedExecutorService(
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockQueue<Runnable>()))
);
}
ThreadPoolExecutor解析
整體設(shè)計

ThreadPoolExecutor基于ExecutorService接口實現(xiàn)提交任務(wù),未采取常規(guī)資源池獲取/歸還資源的形式,整個線程池和線程的生命周期都由ThreadPoolExecutor進行管理,線程對象不對外暴露;ThreadPoolExecutor的任務(wù)管理機制類似于生產(chǎn)者消費者模型,其內(nèi)部維護一個任務(wù)隊列和消費者,一般情況下,任務(wù)被提交到隊列中,消費線程從隊列中拉取任務(wù)并將其執(zhí)行。
線程池生命周期
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static int runStateOf(int c) { return c & ~CAPACITY; } //計算當前運行狀態(tài)
private static int workerCountOf(int c) { return c & CAPACITY; } //計算當前線程數(shù)量
private static int ctlOf(int rs, int wc) { return rs | wc; } //通過狀態(tài)和線程數(shù)生成ctl
TreadPoolExecutor通過ctl維護線程池的狀態(tài)和線程數(shù)量,其中高3位存儲運行狀態(tài),低29位存儲線程數(shù)量。
線程池設(shè)定了RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED五種狀態(tài),其轉(zhuǎn)移圖如下:

在這5種狀態(tài)中,只有RUNNING時線程池可接收新任務(wù),其余4種狀態(tài)在調(diào)用shutDown或shutDownNow后觸發(fā)轉(zhuǎn)換,且在這4種狀態(tài)時,線程池均不再接收新任務(wù)。
任務(wù)管理解析
// 用于存放提交任務(wù)的隊列 private final BlockingQueue<Runnable> workQueue; // 用于保存池內(nèi)的工作線程,Java將Thread包裝成Worker存儲 private final HashSet<Worder> workers = new HashSet<Worker>();
ThreadPoolExecutor主要通過workQueue和workers兩個字段用于管理和執(zhí)行任務(wù)。

線程池任務(wù)執(zhí)行流程如圖,結(jié)合ThreadPoolExecutor.execute源碼,對任務(wù)執(zhí)行流程進行說明:
- 當任務(wù)提交到線程池時,如果當前線程數(shù)量小于核心線程數(shù),則會將為該任務(wù)直接創(chuàng)建一個
worker并將任務(wù)交由worker執(zhí)行。
if (workerCountOf(c) < corePoolSize) {
// 創(chuàng)建新worker執(zhí)行任務(wù),true表示核心線程
if (addWorker(command, true))
return;
c = ctl.get();
}
- 當已經(jīng)達到核心線程數(shù)后,任務(wù)會提交到隊列保存;
// 放入workQueue隊列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 這里采用double check再次檢測線程池狀態(tài)
if (! isRunning(recheck) && remove(command))
reject(command);
// 避免加入隊列后,所有worker都已被回收無可用線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
- 如果隊列已滿,則依據(jù)最大線程數(shù)量創(chuàng)建新
worker執(zhí)行。如果新增worker失敗,則依據(jù)設(shè)定策略拒絕任務(wù)。
// 接上,放入隊列失敗
// 添加新worker執(zhí)行任務(wù),false表示非核心線程
else if (!addWorker(command, false))
// 如添加失敗,執(zhí)行拒絕策略
reject(command);
woker對象
ThreadPoolExecutor沒有直接使用Thread記錄線程,而是定義了worker用于包裝線程對象。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
...
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// worker對象被創(chuàng)建后就會執(zhí)行
public void run() {
runWorker(this);
}
}
worker對象通過addWorker方法創(chuàng)建,一般會為其指定一個初始任務(wù)firstTask,當worker執(zhí)行完畢以后,worker會從阻塞隊列中讀取任務(wù),如果沒有任務(wù),則該worker會陷入阻塞狀態(tài)給出worker的核心邏輯代碼:
private boolean addWorker(Runnable firstTask, boolean core) {
...
// 指定firstTask,可能為null
w = new Worker(firstTask);
...
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
}
...
// 執(zhí)行新添加的worker
if (workerAdded) {
t.start();
workerStarted = true;
}
}
final void runWorker(Worker w) {
// 等待workQueue的任務(wù)
while (task != null || (task = getTask()) != null) {
...
}
}
private Runnable getTask() {
...
for (;;) {
...
// 如果是普通工作線程,則根據(jù)線程存活時間讀取阻塞隊列
// 如果是核心工作線程,則直接陷入阻塞狀態(tài),等待workQueue獲取任務(wù)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
...
}
}
如下圖,任務(wù)提交后觸發(fā)addWorker創(chuàng)建worker對象,該對象執(zhí)行任務(wù)完畢后,則循環(huán)獲取隊列中任務(wù)等待執(zhí)行。

Java線程池實踐建議
不建議使用Exectuors
線程池不允許使用Executors去創(chuàng)建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規(guī)則,規(guī)避資源耗盡的風險?!栋⒗锇桶烷_發(fā)手冊》
雖然Java推薦開發(fā)者直接使用Executors提供的線程池,但實際開發(fā)中通常不使用。主要考慮問題有:
- 潛在的OOM問題
CachedThreadPool將最大數(shù)量設(shè)置為Integer.MAX_VALUE,如果一直提交任務(wù),可能造成Thread對象過多引起OOM。FixedThreadPool和SingleThreadPoo的隊列LinkedBlockingQueue無容量限制,阻塞任務(wù)過多也可能造成OOM。
- 線程問題定位不便
由于未指定ThreadFactory,線程名稱默認為pool-poolNumber-thread-thredNumber,線程出現(xiàn)問題后不便定位具體線程池。
- 線程池分散
通常在完善的項目中,由于線程是重量資源,因此線程池由統(tǒng)一模塊管理,重復(fù)創(chuàng)建線程池容易造成資源分散,難以管理。
線程池大小設(shè)置
通常按照IO繁忙型和CPU繁忙型任務(wù)分別采用以下兩個普遍公式。

在理論場景中,如一個任務(wù)IO耗時40ms,CPU耗時10ms,那么在IO處理期間,CPU是空閑的,此時還可以處理4個任務(wù)(40/10),因此理論上可以按照IO和CPU的時間消耗比設(shè)定線程池大小。

《JAVA并發(fā)編程實踐》中還考慮數(shù)量乘以目標CPU的利用率
在實際場景中,我們通常無法準確測算IO和CPU的耗時占比,并且隨著流量變化,任務(wù)的耗時占比也不能固定。因此可根據(jù)業(yè)務(wù)需求,開設(shè)線程池運維接口,根據(jù)線上指標動態(tài)調(diào)整線程池參數(shù)。
推薦參考第二篇美團線程池應(yīng)用
線程池監(jiān)控
ThreadPoolExecutor提供以下方法監(jiān)控線程池:
getTaskCount()返回被調(diào)度過的任務(wù)數(shù)量getCompletedTaskCount()返回完成的任務(wù)數(shù)量getPoolSize()返回當前線程池線程數(shù)量getActiveCount()返回活躍線程數(shù)量getQueue()獲取隊列,一般用于監(jiān)控阻塞任務(wù)數(shù)量和隊列空間大小
到此這篇關(guān)于超詳細講解Java線程池的文章就介紹到這了,更多相關(guān)Java 線程池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
基于springboot創(chuàng)建mybatis的完整步驟
MyBatis是一款優(yōu)秀的數(shù)據(jù)庫持久層框架,相比Hibernate我更喜歡使用MyBatis,看的到SQL還是讓人更安心點,這篇文章主要給大家介紹了關(guān)于基于springboot創(chuàng)建mybatis的完整步驟,需要的朋友可以參考下2024-03-03
Spring中@Configuration注解的Full模式和Lite模式詳解
這篇文章主要介紹了Spring中@Configuration注解的Full模式和Lite模式詳解,準確來說,Full?模式和?Lite?模式其實?Spring?容器在處理?Bean?時的兩種不同行為,這兩種不同的模式在使用時候的表現(xiàn)完全不同,今天就來和各位小伙伴捋一捋這兩種模式,需要的朋友可以參考下2023-09-09
使用jpa的實體對象轉(zhuǎn)json符串時懶加載的問題及解決
這篇文章主要介紹了使用jpa的實體對象轉(zhuǎn)json符串時懶加載的問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-02-02
SpringBoot優(yōu)化接口響應(yīng)時間的九個技巧
在實際開發(fā)中,提升接口響應(yīng)速度是一件挺重要的事,特別是在面臨大量用戶請求的時候,本文為大家整理了9個SpringBoot優(yōu)化接口響應(yīng)時間的技巧,希望對大家有所幫助2024-01-01

