超詳細(xì)講解Java線程池
帶著問題閱讀
1、什么是池化,池化能帶來(lái)什么好處
2、如何設(shè)計(jì)一個(gè)資源池
3、Java的線程池如何使用,Java提供了哪些內(nèi)置線程池
4、線程池使用有哪些注意事項(xiàng)
池化技術(shù)
池化思想介紹
池化思想是將重量級(jí)資源預(yù)先準(zhǔn)備好,在使用時(shí)可重復(fù)使用這些預(yù)先準(zhǔn)備好的資源。
池化思想的核心概念有:
- 資源創(chuàng)建/銷毀開銷大
- 提前創(chuàng)建,集中管理
- 重復(fù)利用,資源可回收
例如大街上的共享單車,用戶掃碼開鎖,使用完后歸還到停放點(diǎn),下一個(gè)用戶可以繼續(xù)使用,共享單車由廠商統(tǒng)一管理,為用戶節(jié)省了購(gòu)買單車的開銷。
池化技術(shù)的應(yīng)用
常見的池化技術(shù)應(yīng)用有:資源池、連接池、線程池等。
- 資源池
在各種電商平臺(tái)大促活動(dòng)時(shí),平臺(tái)需要支撐平時(shí)幾十倍的流量,因此各大平臺(tái)在需要提前準(zhǔn)備大量服務(wù)器進(jìn)行擴(kuò)容,在活動(dòng)完畢以后,擴(kuò)容的服務(wù)器資源又白白浪費(fèi)。將計(jì)算資源池化,在業(yè)務(wù)高峰前進(jìn)行分配,高峰結(jié)束后提供給其他業(yè)務(wù)或用戶使用,即可節(jié)省大量消耗,資源池化也是云計(jì)算的核心技術(shù)之一。
- 連接池
網(wǎng)絡(luò)連接的建立和釋放也是一個(gè)開銷較大的過程,提前在服務(wù)器之間建立好連接,在需要使用的時(shí)候從連接池中獲取,使用完畢后歸還連接池,以供其他請(qǐng)求使用,以此可節(jié)省掉大量的網(wǎng)絡(luò)連接時(shí)間,如數(shù)據(jù)庫(kù)連接池、HttpClient連接池。
- 線程池
線程的建立銷毀都涉及到內(nèi)核態(tài)切換,提前創(chuàng)建若干數(shù)量的線程提供給客戶端復(fù)用,可節(jié)約大量的CPU消耗以便處理業(yè)務(wù)邏輯。線程池也是接下來(lái)重點(diǎn)要講的內(nèi)容。
如何設(shè)計(jì)一個(gè)線程池
設(shè)計(jì)一個(gè)線程池,至少需要提供的核心能力有:
- 線程池容器:用于容納初始化時(shí)預(yù)先創(chuàng)建的線程。
- 線程狀態(tài)管理:管理池內(nèi)線程的生命周期,記錄每個(gè)線程當(dāng)前的可服務(wù)狀態(tài)。
- 線程請(qǐng)求管理:對(duì)調(diào)用端提供獲取和歸還線程的接口。
- 線程耗盡策略:提供策略以處理線程耗盡問題,如拒絕服務(wù)、擴(kuò)容線程池、排隊(duì)等待等。
基于以上角度,我們來(lái)分析Java是如何設(shè)計(jì)線程池功能的。
Java線程池解析
ThreadPoolExecutor使用介紹
大象裝冰箱總共分幾步
// 1.創(chuàng)建線程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<>()); // 2.提交任務(wù) threadPool.execute(new Runnable() { @Override public void run() { System.out.println("task running"); } }}); // 3.關(guān)閉線程池 threadPool.shutDown();
Java通過ThreadPoolExecutor
提供線程池的實(shí)現(xiàn),如示例代碼,初始化一個(gè)容量為1的線程池、然后提交任務(wù)、最后關(guān)閉線程池。
ThreadPoolExecutor
的核心方法主要有
- 構(gòu)造函數(shù):
ThreadPoolExecutor
提供了多個(gè)構(gòu)造函數(shù),以下對(duì)基礎(chǔ)構(gòu)造函數(shù)進(jìn)行說明。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
- corePoolSize:線程池的核心線程數(shù)。池內(nèi)線程數(shù)小于
corePoolSize
時(shí),線程池會(huì)創(chuàng)建新線程執(zhí)行任務(wù)。 - maximumPoolSize:線程池的最大線程數(shù)。池內(nèi)線程數(shù)大于
corePoolSize
且workQueue
任務(wù)等待隊(duì)列已滿時(shí),線程池會(huì)創(chuàng)建新線程執(zhí)行隊(duì)列中的任務(wù),直到線程數(shù)達(dá)到maximumPoolSize
為止。 - keepAliveTime:非核心線程的存活時(shí)長(zhǎng)。池內(nèi)超過
corePoolSize
數(shù)量的線程可存活的時(shí)長(zhǎng)。 - unit:非核心線程存活時(shí)長(zhǎng)單位。與
keepAliveTime
取值配合,如示例代碼表示1分鐘。 - workQueue:任務(wù)提交隊(duì)列。當(dāng)無(wú)空閑核心線程時(shí),存儲(chǔ)待執(zhí)行任務(wù)。
類型 | 作用 |
---|---|
ArrayBlockingQueue | 數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列 |
LinkedBlockingQueue | 鏈表結(jié)構(gòu)的阻塞隊(duì)列,可設(shè)定是否有界 |
SynchronousQueue | 不存儲(chǔ)元素的阻塞隊(duì)列,直接將任務(wù)提交給線程池執(zhí)行 |
PriorityBlockingQueue | 支持優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列 |
DelayQueue | 支持延時(shí)執(zhí)行的無(wú)界阻塞隊(duì)列 |
- threadFactory:線程工廠。用于創(chuàng)建線程對(duì)象。
- handler:拒絕策略。線程池線程數(shù)量達(dá)到
maximumPoolSize
且workQueue
已滿時(shí)的處理策略。
類型 | 作用 |
---|---|
AbortPolicy | 拒絕并拋出異常。默認(rèn) |
CallerRunsPolicy | 由提交任務(wù)的線程執(zhí)行任務(wù) |
DiscardOldestPolicy | 拋棄隊(duì)列頭部任務(wù) |
DiscardPolicy | 拋棄該任務(wù) |
- 執(zhí)行函數(shù):
execute
和submit
,主要分別用于執(zhí)行Runnable
和Callable
。
// 提交Runnable void execute(Runnable command); // 提交Callable并返回Future <T> Future<T> submit(Callable<T> task); // 提交Runnable,執(zhí)行結(jié)束后Future.get會(huì)返回result <T> Future<T> submit(Runnable task, T result); // 提交Runnable,執(zhí)行結(jié)束后Future.get會(huì)返回null Future<?> submit(Runnable task);
- 停止函數(shù):
shutDown
和shutDownNow
。
// 不再接收新任務(wù),等待剩余任務(wù)執(zhí)行完畢后停止線程池 void shutdown(); // 不再接收新任務(wù),并嘗試中斷執(zhí)行中的任務(wù),返回還在等待隊(duì)列中的任務(wù)列表 List<Runnable> shutdownNow();
內(nèi)置線程池使用
To be useful across a wide range of contexts, this class provieds many adjustable parameters and extensibility hooks. However, programmers are urged to use the more convenient {@link Executors} factory methods {@link Executors#newCachedThreadPool} (unbounded thread poll, with automatic thread reclamation), {@link Executors#newFixedThreadPool} (fixed size thread pool) and {@link Executors#newSingleThreadExecutor}(single background thread), that preconfigure settings for the most common usage scenarios.
由于ThreadPoolExecutor
參數(shù)復(fù)雜,Java提供了三種內(nèi)置線程池newCachedThreadPool
、newFixedThreadPool
和newSingleThreadExecutor
應(yīng)對(duì)大多數(shù)場(chǎng)景。
Executors.newCachedThreadPool()
無(wú)界線程池,核心線程池大小為0,最大為Integer.MAX_VALUE
,因此嚴(yán)格來(lái)講并不算無(wú)界。采用SynchronousQueue
作workQueue
,意味著任務(wù)不會(huì)被阻塞保存在隊(duì)列,而是直接遞交到線程池,如線程池?zé)o可用線程,則創(chuàng)建新線程執(zhí)行。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
Executors.newFixedThreadPool(int nThreads)
固定大小線程池,其中coreSize
和maxSize
相等,且過期時(shí)間為0,表示經(jīng)過一定數(shù)量任務(wù)提交后,線程池將始終維持在nThreads
數(shù)量大小,不會(huì)新增也不會(huì)回收線程。
public static ExecutorService new FixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
Executors.newSingleThreadExecutor()
單線程池,參數(shù)與fixedThreadPool
類似,只是將數(shù)量限制在1,單線程池主要避免重復(fù)創(chuàng)建銷毀線程對(duì)象,也可用于串行化執(zhí)行任務(wù)。不同與其他線程池,單線程池采用FinallizableDelegatedExecutorService
對(duì)ThreadPoolExecutor
對(duì)象進(jìn)行包裝,感興趣的同學(xué)可以看下源碼,其方法實(shí)現(xiàn)僅僅是對(duì)被包裝對(duì)象方法的直接調(diào)用。包裝對(duì)象主要用于避免用戶將線程池強(qiáng)制轉(zhuǎn)換為ThreadPoolExecutor
來(lái)修改線程池大小。
public static ExecutorService newSingleThreadExecutor() { return new FinallizableDelegatedExecutorService( (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockQueue<Runnable>())) ); }
ThreadPoolExecutor解析
整體設(shè)計(jì)
ThreadPoolExecutor
基于ExecutorService
接口實(shí)現(xiàn)提交任務(wù),未采取常規(guī)資源池獲取/歸還資源的形式,整個(gè)線程池和線程的生命周期都由ThreadPoolExecutor
進(jìn)行管理,線程對(duì)象不對(duì)外暴露;ThreadPoolExecutor
的任務(wù)管理機(jī)制類似于生產(chǎn)者消費(fèi)者模型,其內(nèi)部維護(hù)一個(gè)任務(wù)隊(duì)列和消費(fèi)者,一般情況下,任務(wù)被提交到隊(duì)列中,消費(fèi)線程從隊(duì)列中拉取任務(wù)并將其執(zhí)行。
線程池生命周期
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; private static int runStateOf(int c) { return c & ~CAPACITY; } //計(jì)算當(dāng)前運(yùn)行狀態(tài) private static int workerCountOf(int c) { return c & CAPACITY; } //計(jì)算當(dāng)前線程數(shù)量 private static int ctlOf(int rs, int wc) { return rs | wc; } //通過狀態(tài)和線程數(shù)生成ctl
TreadPoolExecutor
通過ctl
維護(hù)線程池的狀態(tài)和線程數(shù)量,其中高3位存儲(chǔ)運(yùn)行狀態(tài),低29位存儲(chǔ)線程數(shù)量。
線程池設(shè)定了RUNNING
、SHUTDOWN
、STOP
、TIDYING
和TERMINATED
五種狀態(tài),其轉(zhuǎn)移圖如下:
在這5種狀態(tài)中,只有RUNNING
時(shí)線程池可接收新任務(wù),其余4種狀態(tài)在調(diào)用shutDown
或shutDownNow
后觸發(fā)轉(zhuǎn)換,且在這4種狀態(tài)時(shí),線程池均不再接收新任務(wù)。
任務(wù)管理解析
// 用于存放提交任務(wù)的隊(duì)列 private final BlockingQueue<Runnable> workQueue; // 用于保存池內(nèi)的工作線程,Java將Thread包裝成Worker存儲(chǔ) private final HashSet<Worder> workers = new HashSet<Worker>();
ThreadPoolExecutor
主要通過workQueue
和workers
兩個(gè)字段用于管理和執(zhí)行任務(wù)。
線程池任務(wù)執(zhí)行流程如圖,結(jié)合ThreadPoolExecutor.execute
源碼,對(duì)任務(wù)執(zhí)行流程進(jìn)行說明:
- 當(dāng)任務(wù)提交到線程池時(shí),如果當(dāng)前線程數(shù)量小于核心線程數(shù),則會(huì)將為該任務(wù)直接創(chuàng)建一個(gè)
worker
并將任務(wù)交由worker
執(zhí)行。
if (workerCountOf(c) < corePoolSize) { // 創(chuàng)建新worker執(zhí)行任務(wù),true表示核心線程 if (addWorker(command, true)) return; c = ctl.get(); }
- 當(dāng)已經(jīng)達(dá)到核心線程數(shù)后,任務(wù)會(huì)提交到隊(duì)列保存;
// 放入workQueue隊(duì)列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 這里采用double check再次檢測(cè)線程池狀態(tài) if (! isRunning(recheck) && remove(command)) reject(command); // 避免加入隊(duì)列后,所有worker都已被回收無(wú)可用線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); }
- 如果隊(duì)列已滿,則依據(jù)最大線程數(shù)量創(chuàng)建新
worker
執(zhí)行。如果新增worker
失敗,則依據(jù)設(shè)定策略拒絕任務(wù)。
// 接上,放入隊(duì)列失敗 // 添加新worker執(zhí)行任務(wù),false表示非核心線程 else if (!addWorker(command, false)) // 如添加失敗,執(zhí)行拒絕策略 reject(command);
woker對(duì)象
ThreadPoolExecutor
沒有直接使用Thread
記錄線程,而是定義了worker
用于包裝線程對(duì)象。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { ... final Thread thread; Runnable firstTask; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } // worker對(duì)象被創(chuàng)建后就會(huì)執(zhí)行 public void run() { runWorker(this); } }
worker
對(duì)象通過addWorker
方法創(chuàng)建,一般會(huì)為其指定一個(gè)初始任務(wù)firstTask
,當(dāng)worker
執(zhí)行完畢以后,worker
會(huì)從阻塞隊(duì)列中讀取任務(wù),如果沒有任務(wù),則該worker
會(huì)陷入阻塞狀態(tài)給出worker
的核心邏輯代碼:
private boolean addWorker(Runnable firstTask, boolean core) { ... // 指定firstTask,可能為null w = new Worker(firstTask); ... if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); workerAdded = true; } ... // 執(zhí)行新添加的worker if (workerAdded) { t.start(); workerStarted = true; } } final void runWorker(Worker w) { // 等待workQueue的任務(wù) while (task != null || (task = getTask()) != null) { ... } } private Runnable getTask() { ... for (;;) { ... // 如果是普通工作線程,則根據(jù)線程存活時(shí)間讀取阻塞隊(duì)列 // 如果是核心工作線程,則直接陷入阻塞狀態(tài),等待workQueue獲取任務(wù) Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); ... } }
如下圖,任務(wù)提交后觸發(fā)addWorker
創(chuàng)建worker
對(duì)象,該對(duì)象執(zhí)行任務(wù)完畢后,則循環(huán)獲取隊(duì)列中任務(wù)等待執(zhí)行。
Java線程池實(shí)踐建議
不建議使用Exectuors
線程池不允許使用Executors
去創(chuàng)建,而是通過ThreadPoolExecutor
的方式,這樣的處理方式讓寫的同學(xué)更加明確線程池的運(yùn)行規(guī)則,規(guī)避資源耗盡的風(fēng)險(xiǎn)?!栋⒗锇桶烷_發(fā)手冊(cè)》
雖然Java推薦開發(fā)者直接使用Executors
提供的線程池,但實(shí)際開發(fā)中通常不使用。主要考慮問題有:
- 潛在的OOM問題
CachedThreadPool
將最大數(shù)量設(shè)置為Integer.MAX_VALUE
,如果一直提交任務(wù),可能造成Thread
對(duì)象過多引起OOM
。FixedThreadPool
和SingleThreadPoo
的隊(duì)列LinkedBlockingQueue
無(wú)容量限制,阻塞任務(wù)過多也可能造成OOM
。
- 線程問題定位不便
由于未指定ThreadFactory
,線程名稱默認(rèn)為pool-poolNumber-thread-thredNumber
,線程出現(xiàn)問題后不便定位具體線程池。
- 線程池分散
通常在完善的項(xiàng)目中,由于線程是重量資源,因此線程池由統(tǒng)一模塊管理,重復(fù)創(chuàng)建線程池容易造成資源分散,難以管理。
線程池大小設(shè)置
通常按照IO繁忙型和CPU繁忙型任務(wù)分別采用以下兩個(gè)普遍公式。
在理論場(chǎng)景中,如一個(gè)任務(wù)IO耗時(shí)40ms,CPU耗時(shí)10ms,那么在IO處理期間,CPU是空閑的,此時(shí)還可以處理4個(gè)任務(wù)(40/10),因此理論上可以按照IO和CPU的時(shí)間消耗比設(shè)定線程池大小。
《JAVA并發(fā)編程實(shí)踐》中還考慮數(shù)量乘以目標(biāo)CPU的利用率
在實(shí)際場(chǎng)景中,我們通常無(wú)法準(zhǔn)確測(cè)算IO和CPU的耗時(shí)占比,并且隨著流量變化,任務(wù)的耗時(shí)占比也不能固定。因此可根據(jù)業(yè)務(wù)需求,開設(shè)線程池運(yùn)維接口,根據(jù)線上指標(biāo)動(dòng)態(tài)調(diào)整線程池參數(shù)。
推薦參考第二篇美團(tuán)線程池應(yīng)用
線程池監(jiān)控
ThreadPoolExecutor
提供以下方法監(jiān)控線程池:
getTaskCount()
返回被調(diào)度過的任務(wù)數(shù)量getCompletedTaskCount()
返回完成的任務(wù)數(shù)量getPoolSize()
返回當(dāng)前線程池線程數(shù)量getActiveCount()
返回活躍線程數(shù)量getQueue()
獲取隊(duì)列,一般用于監(jiān)控阻塞任務(wù)數(shù)量和隊(duì)列空間大小
到此這篇關(guān)于超詳細(xì)講解Java線程池的文章就介紹到這了,更多相關(guān)Java 線程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
基于springboot創(chuàng)建mybatis的完整步驟
MyBatis是一款優(yōu)秀的數(shù)據(jù)庫(kù)持久層框架,相比Hibernate我更喜歡使用MyBatis,看的到SQL還是讓人更安心點(diǎn),這篇文章主要給大家介紹了關(guān)于基于springboot創(chuàng)建mybatis的完整步驟,需要的朋友可以參考下2024-03-03Spring中@Configuration注解的Full模式和Lite模式詳解
這篇文章主要介紹了Spring中@Configuration注解的Full模式和Lite模式詳解,準(zhǔn)確來(lái)說,Full?模式和?Lite?模式其實(shí)?Spring?容器在處理?Bean?時(shí)的兩種不同行為,這兩種不同的模式在使用時(shí)候的表現(xiàn)完全不同,今天就來(lái)和各位小伙伴捋一捋這兩種模式,需要的朋友可以參考下2023-09-09淺談一個(gè)基礎(chǔ)的SpringBoot項(xiàng)目該包含哪些
這篇文章主要介紹了淺談一個(gè)基礎(chǔ)的SpringBoot項(xiàng)目該包含哪些,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10java9版本特性資源自動(dòng)關(guān)閉的語(yǔ)法增強(qiáng)
這篇文章主要為大家介紹了java9版本特性資源自動(dòng)關(guān)閉的語(yǔ)法增強(qiáng)的詳細(xì)使用說明,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-03-03使用jpa的實(shí)體對(duì)象轉(zhuǎn)json符串時(shí)懶加載的問題及解決
這篇文章主要介紹了使用jpa的實(shí)體對(duì)象轉(zhuǎn)json符串時(shí)懶加載的問題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02SpringBoot優(yōu)化接口響應(yīng)時(shí)間的九個(gè)技巧
在實(shí)際開發(fā)中,提升接口響應(yīng)速度是一件挺重要的事,特別是在面臨大量用戶請(qǐng)求的時(shí)候,本文為大家整理了9個(gè)SpringBoot優(yōu)化接口響應(yīng)時(shí)間的技巧,希望對(duì)大家有所幫助2024-01-01