Java線程池?ThreadPoolExecutor?詳解
一 為什么要使用線程池
對(duì)于操作系統(tǒng)而言,創(chuàng)建一個(gè)線程的代價(jià)是十分昂貴的, 需要給它分配內(nèi)存、列入調(diào)度,同時(shí)在線程切換時(shí)要執(zhí)行內(nèi)存換頁,清空 CPU 緩存,切換回來時(shí)還要重新從內(nèi)存中讀取信息,破壞了數(shù)據(jù)的局部性。因此在并發(fā)編程中,當(dāng)線程創(chuàng)建過多時(shí),會(huì)影響程序性能,甚至引起程序崩潰。
而線程池屬于池化管理模式,具有以下優(yōu)點(diǎn):
- 降低資源消耗:通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的性能消耗。
- 提高響應(yīng)速度:當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
- 提高線程的可管理性:能夠?qū)€程進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。
二 線程池原理詳解
2.1 線程池核心組成
線程池包含 3 個(gè)核心部分:
- 線程集合:核心線程和工作線程
- 阻塞隊(duì)列:用于待執(zhí)行任務(wù)排隊(duì)
- 拒絕策略處理器:阻塞隊(duì)列滿后,對(duì)任務(wù)處理進(jìn)行
2.2 Execute 原理
當(dāng)一個(gè)新任務(wù)提交至線程池之后,線程池的處理流程如下:
- 首先判斷當(dāng)前運(yùn)行的線程數(shù)量是否小于 corePoolSize。如果是,則創(chuàng)建一個(gè)工作線程來執(zhí)行任務(wù);如果都在執(zhí)行任務(wù),則進(jìn)入步驟 2。
- 判斷 BlockingQueue 是否已經(jīng)滿了,若沒滿,則將任務(wù)放入 BlockingQueue;若滿了,則進(jìn)入步驟 3。
- 判斷當(dāng)前運(yùn)行的總線程數(shù)量是否小于 maximumPoolSize,如果是則創(chuàng)建一個(gè)新的工作線程來執(zhí)行任務(wù)。
- 否則交給 RejectedExecutionHandler 來處理任務(wù)。
當(dāng) ThreadPoolExecutor 創(chuàng)建新線程時(shí),通過 CAS 來更新線程池的狀態(tài) ctl。
三 線程池的使用
線程池的使用主要分為以下三個(gè)步驟:
3.1 創(chuàng)建線程池
3.1.1 自定義線程池
線程池的真正實(shí)現(xiàn)類是 ThreadPoolExecutor,其構(gòu)造方法有如下 4 種:
public ThreadPoolExecutor(int corePoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? int maximumPoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime, ? ? ? ? ? ? ? ? ? ? ? ? ? TimeUnit unit, ? ? ? ? ? ? ? ? ? ? ? ? ? BlockingQueue<Runnable> workQueue) { ? ? this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, ? ? ? ? ?Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? int maximumPoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime, ? ? ? ? ? ? ? ? ? ? ? ? ? TimeUnit unit, ? ? ? ? ? ? ? ? ? ? ? ? ? BlockingQueue<Runnable> workQueue, ? ? ? ? ? ? ? ? ? ? ? ? ? ThreadFactory threadFactory) { ? ? this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, ? ? ? ? ?threadFactory, defaultHandler); } ? public ThreadPoolExecutor(int corePoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? int maximumPoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime, ? ? ? ? ? ? ? ? ? ? ? ? ? TimeUnit unit, ? ? ? ? ? ? ? ? ? ? ? ? ? BlockingQueue<Runnable> workQueue, ? ? ? ? ? ? ? ? ? ? ? ? ? RejectedExecutionHandler handler) { ? ? this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, ? ? ? ? ?Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? int maximumPoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime, ? ? ? ? ? ? ? ? ? ? ? ? ? TimeUnit unit, ? ? ? ? ? ? ? ? ? ? ? ? ? BlockingQueue<Runnable> workQueue, ? ? ? ? ? ? ? ? ? ? ? ? ? ThreadFactory threadFactory, ? ? ? ? ? ? ? ? ? ? ? ? ? RejectedExecutionHandler handler) { ? ? if (corePoolSize < 0 || ? ? ? ? maximumPoolSize <= 0 || ? ? ? ? maximumPoolSize < corePoolSize || ? ? ? ? keepAliveTime < 0) ? ? ? ? throw new IllegalArgumentException(); ? ? if (workQueue == null || threadFactory == null || handler == null) ? ? ? ? throw new NullPointerException(); ? ? this.corePoolSize = corePoolSize; ? ? this.maximumPoolSize = maximumPoolSize; ? ? this.workQueue = workQueue; ? ? this.keepAliveTime = unit.toNanos(keepAliveTime); ? ? this.threadFactory = threadFactory; ? ? this.handler = handler; }
下面詳細(xì)來看構(gòu)造函數(shù)需要傳入的重點(diǎn)參數(shù):
corePoolSize
(必需)線程池中的核心線程數(shù),當(dāng)提交一個(gè)任務(wù)時(shí),線程池創(chuàng)建一個(gè)新線程執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于 corePoolSize, 即使有其他空閑線程能夠執(zhí)行新來的任務(wù), 也會(huì)繼續(xù)創(chuàng)建線程;如果當(dāng)前線程數(shù)為 corePoolSize,繼續(xù)提交的任務(wù)被保存到阻塞隊(duì)列中,等待被執(zhí)行;如果執(zhí)行了線程池的 **prestartAllCoreThreads()**方法,線程池會(huì)提前創(chuàng)建并啟動(dòng)所有核心線程。workQueue
(必需)用來保存等待被執(zhí)行的任務(wù)的阻塞隊(duì)列。ArrayBlockingQueue
: 基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,按 FIFO 排序任務(wù);LinkedBlockingQueue
: 基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,按 FIFO 排序任務(wù),吞吐量通常要高于 ArrayBlockingQueue;SynchronousQueue
: 一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于 LinkedBlockingQueue;PriorityBlockingQueue
: 具有優(yōu)先級(jí)的無界阻塞隊(duì)列;
maximumPoolSize
(必需)線程池中能容納的最大線程數(shù)。如果當(dāng)前阻塞隊(duì)列滿了,且繼續(xù)提交任務(wù),則創(chuàng)建新的線程執(zhí)行任務(wù),前提是當(dāng)前線程數(shù)小于 maximumPoolSize;當(dāng)阻塞隊(duì)列是無界隊(duì)列, 則 maximumPoolSize 則不起作用, 因?yàn)闊o法提交至核心線程池的線程會(huì)一直持續(xù)地放入 workQueue。keepAliveTime
(必需)線程閑置超時(shí)時(shí)長(zhǎng)。如果超過該時(shí)長(zhǎng),非核心線程就會(huì)被回收。如果將 allowCoreThreadTimeout 設(shè)置為 true 時(shí),核心線程也會(huì)超時(shí)回收。unit
(必需)keepAliveTime 的單位,常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)threadFactory
(可選)創(chuàng)建線程的工廠,通過自定義的線程工廠可以給每個(gè)新建的線程設(shè)置一個(gè)具有識(shí)別度的線程名。默認(rèn)為 DefaultThreadFactoryhandler
(可選)線程池的飽和策略,當(dāng)阻塞隊(duì)列滿了,且沒有空閑的工作線程,如果繼續(xù)提交任務(wù),必須采取一種策略處理該任務(wù),線程池提供了 4 種策略:AbortPolicy
: 直接拋出異常,默認(rèn)策略;CallerRunsPolicy
: 用調(diào)用者所在的線程來執(zhí)行任務(wù);DiscardOldestPolicy
: 丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);DiscardPolicy
: 直接丟棄任務(wù); 也可以根據(jù)應(yīng)用場(chǎng)景實(shí)現(xiàn) RejectedExecutionHandler 接口,自定義飽和策略,如記錄日志或持久化存儲(chǔ)不能處理的任務(wù)。
3.1.2 功能線程池
除了調(diào)用 ThreadPoolExecutor 自定義線程池的方式,其實(shí) Executors 也已經(jīng)為我們封裝好了 4 種常見的功能線程池,如下:
- 定長(zhǎng)線程池(FixedThreadPool)
- 定時(shí)線程池(ScheduledThreadPool)
- 可緩存線程池(CachedThreadPool)
- 單線程化線程池(SingleThreadExecutor)
定長(zhǎng)線程池(FixedThreadPool)
- 特點(diǎn):只有核心線程,線程數(shù)量固定,執(zhí)行完立即回收,任務(wù)隊(duì)列為鏈表結(jié)構(gòu)的有界隊(duì)列。
- 應(yīng)用場(chǎng)景:控制線程最大并發(fā)數(shù)。
- 使用示例:
// 1. 創(chuàng)建定長(zhǎng)線程池對(duì)象 & 設(shè)置線程池線程數(shù)量固定為 3 ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); // 2. 創(chuàng)建好 Runnable 類線程對(duì)象 & 需執(zhí)行的任務(wù) Runnable task = new Runnable(){ ? public void run() { ? ? ?...//待執(zhí)行的耗時(shí)任務(wù) ? } }; // 3. 向線程池提交任務(wù) fixedThreadPool.execute(task);
定時(shí)線程池(ScheduledThreadPool)
- 特點(diǎn):核心線程數(shù)量固定,非核心線程數(shù)量無限,執(zhí)行完閑置 10ms 后回收,任務(wù)隊(duì)列為延時(shí)阻塞隊(duì)列。
- 應(yīng)用場(chǎng)景:執(zhí)行定時(shí)或周期性的任務(wù)。
- 使用示例:
// 1. 創(chuàng)建定時(shí)線程池對(duì)象 & 設(shè)置線程池線程數(shù)量固定為 5 ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); // 2. 創(chuàng)建好 Runnable 類線程對(duì)象 & 需執(zhí)行的任務(wù) Runnable task =new Runnable(){ ? public void run() { ? ? ?...//待執(zhí)行的耗時(shí)任務(wù) ? } }; // 3. 向線程池提交任務(wù) scheduledThreadPool.schedule(task, 1, TimeUnit.SECONDS); // 延遲 1s 后執(zhí)行任務(wù)
可緩存線程池(CachedThreadPool)
- 特點(diǎn):無核心線程,非核心線程數(shù)量無限,執(zhí)行完閑置 60s 后回收,任務(wù)隊(duì)列為不存儲(chǔ)元素的阻塞隊(duì)列。
- 應(yīng)用場(chǎng)景:執(zhí)行大量、耗時(shí)少的任務(wù)。
- 使用示例:
// 1. 創(chuàng)建可緩存線程池對(duì)象 ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); // 2. 創(chuàng)建好 Runnable 類線程對(duì)象 & 需執(zhí)行的任務(wù) Runnable task =new Runnable(){ ? public void run() { ? ? ?...//待執(zhí)行的耗時(shí)任務(wù) ? } }; // 3. 向線程池提交任務(wù) cachedThreadPool.execute(task);
單線程化線程池(SingleThreadExecutor)
- 特點(diǎn):只有 1 個(gè)核心線程,無非核心線程,執(zhí)行完立即回收,任務(wù)隊(duì)列為鏈表結(jié)構(gòu)的有界隊(duì)列。
- 應(yīng)用場(chǎng)景:不適合并發(fā)但可能引起 IO 阻塞性及影響 UI 線程響應(yīng)的操作,如數(shù)據(jù)庫操作、文件操作等。
- 使用示例:
// 1. 創(chuàng)建單線程化線程池 ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); // 2. 創(chuàng)建好 Runnable 類線程對(duì)象 & 需執(zhí)行的任務(wù) Runnable task =new Runnable(){ ? public void run() { ? ? ?...//待執(zhí)行的耗時(shí)任務(wù) ? } }; // 3. 向線程池提交任務(wù) singleThreadExecutor.execute(task);
3.1.3 功能線程池存在的問題
目前已不推薦使用功能線程池,而是通過自定義 ThreadPoolExecutor 的方式。因?yàn)橹苯邮褂霉δ芫€程池具有資源耗盡的風(fēng)險(xiǎn)。
- newFixedThreadPool 和 newSingleThreadExecutor: 主要問題是堆積的請(qǐng)求處理隊(duì)列均采用 LinkedBlockingQueue,可能會(huì)耗費(fèi)非常大的內(nèi)存,甚至 OOM。
- newCachedThreadPool 和 newScheduledThreadPool: 主要問題是線程數(shù)最大數(shù)是 Integer.MAX_VALUE,可能會(huì)創(chuàng)建數(shù)量非常多的線程,甚至 OOM。
3.2 向線程池提交任務(wù)
向線程池提交任務(wù)的流程非常簡(jiǎn)單,只需要向線程池的 execute 方法傳入 Runnable 對(duì)象即可。
// 向線程池提交任務(wù) threadPool.execute(new Runnable() { ? ? @Override ? ? public void run() { ? ? ? ? ... //待執(zhí)行的任務(wù) ? ? } });
3.3 關(guān)閉線程池
當(dāng)線程池不再使用時(shí),需要手動(dòng)關(guān)閉以釋放資源。線程池關(guān)閉的原理是:遍歷線程池中的所有線程,然后逐個(gè)調(diào)用線程的 interrupt 方法來中斷線程。一般通過調(diào)用以下兩個(gè)方法:
- shutdown():將線程池里的線程狀態(tài)設(shè)置成 SHUTDOWN 狀態(tài), 然后中斷所有沒有正在執(zhí)行任務(wù)的線程。
- shutdownNow():將線程池里的線程狀態(tài)設(shè)置成 STOP 狀態(tài), 然后停止所有正在執(zhí)行或暫停任務(wù)的線程. 只要調(diào)用這兩個(gè)關(guān)閉方法中的任意一個(gè), isShutDown() 返回 true. 當(dāng)所有任務(wù)都成功關(guān)閉了, isTerminated()返回 true。
3.4 自定義線程池需要考慮因素
使用 ThreadPoolExecutor 自定義線程池時(shí),需要從任務(wù)的優(yōu)先級(jí),任務(wù)的執(zhí)行時(shí)間長(zhǎng)短,任務(wù)的性質(zhì)(CPU 密集/ IO 密集),任務(wù)的依賴關(guān)系這四個(gè)角度來分析。并且近可能地使用有界的工作隊(duì)列。
性質(zhì)不同的任務(wù)可用使用不同規(guī)模的線程池分開處理:
- CPU 密集型: 盡可能少的線程,核心線程數(shù) = CPU 核心數(shù) + 1
- IO 密集型: 盡可能多的線程, 核心線程數(shù) = CPU 核心數(shù) * 2
- 混合型: CPU 密集型的任務(wù)與 IO 密集型任務(wù)的執(zhí)行時(shí)間差別較小,拆分為兩個(gè)線程池;否則沒有必要拆分。
到此這篇關(guān)于Java線程池 ThreadPoolExecutor 詳解的文章就介紹到這了,更多相關(guān)JavaThreadPoolExecutor內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Java多線程ThreadPoolExecutor詳解
- Java線程池ThreadPoolExecutor源碼深入分析
- java高并發(fā)ThreadPoolExecutor類解析線程池執(zhí)行流程
- java高并發(fā)ScheduledThreadPoolExecutor與Timer區(qū)別
- 徹底搞懂java并發(fā)ThreadPoolExecutor使用
- Java多線程編程基石ThreadPoolExecutor示例詳解
- 源碼分析Java中ThreadPoolExecutor的底層原理
- 一文搞懂Java的ThreadPoolExecutor原理
- 一文弄懂Java中ThreadPoolExecutor
相關(guān)文章
springBoot啟動(dòng)輸出三行日志控制臺(tái)自動(dòng)停止操作
這篇文章主要介紹了springBoot啟動(dòng)輸出三行日志控制臺(tái)自動(dòng)停止操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08

Spring boot 默認(rèn)靜態(tài)資源路徑與手動(dòng)配置訪問路徑的方法