Java線程池?ThreadPoolExecutor?詳解
一 為什么要使用線程池
對于操作系統(tǒng)而言,創(chuàng)建一個線程的代價是十分昂貴的, 需要給它分配內存、列入調度,同時在線程切換時要執(zhí)行內存換頁,清空 CPU 緩存,切換回來時還要重新從內存中讀取信息,破壞了數(shù)據(jù)的局部性。因此在并發(fā)編程中,當線程創(chuàng)建過多時,會影響程序性能,甚至引起程序崩潰。
而線程池屬于池化管理模式,具有以下優(yōu)點:
- 降低資源消耗:通過重復利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的性能消耗。
- 提高響應速度:當任務到達時,任務可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
- 提高線程的可管理性:能夠對線程進行統(tǒng)一分配、調優(yōu)和監(jiān)控。
二 線程池原理詳解
2.1 線程池核心組成
線程池包含 3 個核心部分:
- 線程集合:核心線程和工作線程
- 阻塞隊列:用于待執(zhí)行任務排隊
- 拒絕策略處理器:阻塞隊列滿后,對任務處理進行
2.2 Execute 原理
當一個新任務提交至線程池之后,線程池的處理流程如下:
- 首先判斷當前運行的線程數(shù)量是否小于 corePoolSize。如果是,則創(chuàng)建一個工作線程來執(zhí)行任務;如果都在執(zhí)行任務,則進入步驟 2。
- 判斷 BlockingQueue 是否已經(jīng)滿了,若沒滿,則將任務放入 BlockingQueue;若滿了,則進入步驟 3。
- 判斷當前運行的總線程數(shù)量是否小于 maximumPoolSize,如果是則創(chuàng)建一個新的工作線程來執(zhí)行任務。
- 否則交給 RejectedExecutionHandler 來處理任務。
當 ThreadPoolExecutor 創(chuàng)建新線程時,通過 CAS 來更新線程池的狀態(tài) ctl。
三 線程池的使用
線程池的使用主要分為以下三個步驟:
3.1 創(chuàng)建線程池
3.1.1 自定義線程池
線程池的真正實現(xiàn)類是 ThreadPoolExecutor,其構造方法有如下 4 種:
public ThreadPoolExecutor(int corePoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? int maximumPoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime, ? ? ? ? ? ? ? ? ? ? ? ? ? TimeUnit unit, ? ? ? ? ? ? ? ? ? ? ? ? ? BlockingQueue<Runnable> workQueue) { ? ? this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, ? ? ? ? ?Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? int maximumPoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime, ? ? ? ? ? ? ? ? ? ? ? ? ? TimeUnit unit, ? ? ? ? ? ? ? ? ? ? ? ? ? BlockingQueue<Runnable> workQueue, ? ? ? ? ? ? ? ? ? ? ? ? ? ThreadFactory threadFactory) { ? ? this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, ? ? ? ? ?threadFactory, defaultHandler); } ? public ThreadPoolExecutor(int corePoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? int maximumPoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime, ? ? ? ? ? ? ? ? ? ? ? ? ? TimeUnit unit, ? ? ? ? ? ? ? ? ? ? ? ? ? BlockingQueue<Runnable> workQueue, ? ? ? ? ? ? ? ? ? ? ? ? ? RejectedExecutionHandler handler) { ? ? this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, ? ? ? ? ?Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? int maximumPoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime, ? ? ? ? ? ? ? ? ? ? ? ? ? TimeUnit unit, ? ? ? ? ? ? ? ? ? ? ? ? ? BlockingQueue<Runnable> workQueue, ? ? ? ? ? ? ? ? ? ? ? ? ? ThreadFactory threadFactory, ? ? ? ? ? ? ? ? ? ? ? ? ? RejectedExecutionHandler handler) { ? ? if (corePoolSize < 0 || ? ? ? ? maximumPoolSize <= 0 || ? ? ? ? maximumPoolSize < corePoolSize || ? ? ? ? keepAliveTime < 0) ? ? ? ? throw new IllegalArgumentException(); ? ? if (workQueue == null || threadFactory == null || handler == null) ? ? ? ? throw new NullPointerException(); ? ? this.corePoolSize = corePoolSize; ? ? this.maximumPoolSize = maximumPoolSize; ? ? this.workQueue = workQueue; ? ? this.keepAliveTime = unit.toNanos(keepAliveTime); ? ? this.threadFactory = threadFactory; ? ? this.handler = handler; }
下面詳細來看構造函數(shù)需要傳入的重點參數(shù):
corePoolSize
(必需)線程池中的核心線程數(shù),當提交一個任務時,線程池創(chuàng)建一個新線程執(zhí)行任務,直到當前線程數(shù)等于 corePoolSize, 即使有其他空閑線程能夠執(zhí)行新來的任務, 也會繼續(xù)創(chuàng)建線程;如果當前線程數(shù)為 corePoolSize,繼續(xù)提交的任務被保存到阻塞隊列中,等待被執(zhí)行;如果執(zhí)行了線程池的 **prestartAllCoreThreads()**方法,線程池會提前創(chuàng)建并啟動所有核心線程。workQueue
(必需)用來保存等待被執(zhí)行的任務的阻塞隊列。ArrayBlockingQueue
: 基于數(shù)組結構的有界阻塞隊列,按 FIFO 排序任務;LinkedBlockingQueue
: 基于鏈表結構的阻塞隊列,按 FIFO 排序任務,吞吐量通常要高于 ArrayBlockingQueue;SynchronousQueue
: 一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于 LinkedBlockingQueue;PriorityBlockingQueue
: 具有優(yōu)先級的無界阻塞隊列;
maximumPoolSize
(必需)線程池中能容納的最大線程數(shù)。如果當前阻塞隊列滿了,且繼續(xù)提交任務,則創(chuàng)建新的線程執(zhí)行任務,前提是當前線程數(shù)小于 maximumPoolSize;當阻塞隊列是無界隊列, 則 maximumPoolSize 則不起作用, 因為無法提交至核心線程池的線程會一直持續(xù)地放入 workQueue。keepAliveTime
(必需)線程閑置超時時長。如果超過該時長,非核心線程就會被回收。如果將 allowCoreThreadTimeout 設置為 true 時,核心線程也會超時回收。unit
(必需)keepAliveTime 的單位,常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)threadFactory
(可選)創(chuàng)建線程的工廠,通過自定義的線程工廠可以給每個新建的線程設置一個具有識別度的線程名。默認為 DefaultThreadFactoryhandler
(可選)線程池的飽和策略,當阻塞隊列滿了,且沒有空閑的工作線程,如果繼續(xù)提交任務,必須采取一種策略處理該任務,線程池提供了 4 種策略:AbortPolicy
: 直接拋出異常,默認策略;CallerRunsPolicy
: 用調用者所在的線程來執(zhí)行任務;DiscardOldestPolicy
: 丟棄阻塞隊列中靠最前的任務,并執(zhí)行當前任務;DiscardPolicy
: 直接丟棄任務; 也可以根據(jù)應用場景實現(xiàn) RejectedExecutionHandler 接口,自定義飽和策略,如記錄日志或持久化存儲不能處理的任務。
3.1.2 功能線程池
除了調用 ThreadPoolExecutor 自定義線程池的方式,其實 Executors 也已經(jīng)為我們封裝好了 4 種常見的功能線程池,如下:
- 定長線程池(FixedThreadPool)
- 定時線程池(ScheduledThreadPool)
- 可緩存線程池(CachedThreadPool)
- 單線程化線程池(SingleThreadExecutor)
定長線程池(FixedThreadPool)
- 特點:只有核心線程,線程數(shù)量固定,執(zhí)行完立即回收,任務隊列為鏈表結構的有界隊列。
- 應用場景:控制線程最大并發(fā)數(shù)。
- 使用示例:
// 1. 創(chuàng)建定長線程池對象 & 設置線程池線程數(shù)量固定為 3 ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); // 2. 創(chuàng)建好 Runnable 類線程對象 & 需執(zhí)行的任務 Runnable task = new Runnable(){ ? public void run() { ? ? ?...//待執(zhí)行的耗時任務 ? } }; // 3. 向線程池提交任務 fixedThreadPool.execute(task);
定時線程池(ScheduledThreadPool)
- 特點:核心線程數(shù)量固定,非核心線程數(shù)量無限,執(zhí)行完閑置 10ms 后回收,任務隊列為延時阻塞隊列。
- 應用場景:執(zhí)行定時或周期性的任務。
- 使用示例:
// 1. 創(chuàng)建定時線程池對象 & 設置線程池線程數(shù)量固定為 5 ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); // 2. 創(chuàng)建好 Runnable 類線程對象 & 需執(zhí)行的任務 Runnable task =new Runnable(){ ? public void run() { ? ? ?...//待執(zhí)行的耗時任務 ? } }; // 3. 向線程池提交任務 scheduledThreadPool.schedule(task, 1, TimeUnit.SECONDS); // 延遲 1s 后執(zhí)行任務
可緩存線程池(CachedThreadPool)
- 特點:無核心線程,非核心線程數(shù)量無限,執(zhí)行完閑置 60s 后回收,任務隊列為不存儲元素的阻塞隊列。
- 應用場景:執(zhí)行大量、耗時少的任務。
- 使用示例:
// 1. 創(chuàng)建可緩存線程池對象 ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); // 2. 創(chuàng)建好 Runnable 類線程對象 & 需執(zhí)行的任務 Runnable task =new Runnable(){ ? public void run() { ? ? ?...//待執(zhí)行的耗時任務 ? } }; // 3. 向線程池提交任務 cachedThreadPool.execute(task);
單線程化線程池(SingleThreadExecutor)
- 特點:只有 1 個核心線程,無非核心線程,執(zhí)行完立即回收,任務隊列為鏈表結構的有界隊列。
- 應用場景:不適合并發(fā)但可能引起 IO 阻塞性及影響 UI 線程響應的操作,如數(shù)據(jù)庫操作、文件操作等。
- 使用示例:
// 1. 創(chuàng)建單線程化線程池 ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); // 2. 創(chuàng)建好 Runnable 類線程對象 & 需執(zhí)行的任務 Runnable task =new Runnable(){ ? public void run() { ? ? ?...//待執(zhí)行的耗時任務 ? } }; // 3. 向線程池提交任務 singleThreadExecutor.execute(task);
3.1.3 功能線程池存在的問題
目前已不推薦使用功能線程池,而是通過自定義 ThreadPoolExecutor 的方式。因為直接使用功能線程池具有資源耗盡的風險。
- newFixedThreadPool 和 newSingleThreadExecutor: 主要問題是堆積的請求處理隊列均采用 LinkedBlockingQueue,可能會耗費非常大的內存,甚至 OOM。
- newCachedThreadPool 和 newScheduledThreadPool: 主要問題是線程數(shù)最大數(shù)是 Integer.MAX_VALUE,可能會創(chuàng)建數(shù)量非常多的線程,甚至 OOM。
3.2 向線程池提交任務
向線程池提交任務的流程非常簡單,只需要向線程池的 execute 方法傳入 Runnable 對象即可。
// 向線程池提交任務 threadPool.execute(new Runnable() { ? ? @Override ? ? public void run() { ? ? ? ? ... //待執(zhí)行的任務 ? ? } });
3.3 關閉線程池
當線程池不再使用時,需要手動關閉以釋放資源。線程池關閉的原理是:遍歷線程池中的所有線程,然后逐個調用線程的 interrupt 方法來中斷線程。一般通過調用以下兩個方法:
- shutdown():將線程池里的線程狀態(tài)設置成 SHUTDOWN 狀態(tài), 然后中斷所有沒有正在執(zhí)行任務的線程。
- shutdownNow():將線程池里的線程狀態(tài)設置成 STOP 狀態(tài), 然后停止所有正在執(zhí)行或暫停任務的線程. 只要調用這兩個關閉方法中的任意一個, isShutDown() 返回 true. 當所有任務都成功關閉了, isTerminated()返回 true。
3.4 自定義線程池需要考慮因素
使用 ThreadPoolExecutor 自定義線程池時,需要從任務的優(yōu)先級,任務的執(zhí)行時間長短,任務的性質(CPU 密集/ IO 密集),任務的依賴關系這四個角度來分析。并且近可能地使用有界的工作隊列。
性質不同的任務可用使用不同規(guī)模的線程池分開處理:
- CPU 密集型: 盡可能少的線程,核心線程數(shù) = CPU 核心數(shù) + 1
- IO 密集型: 盡可能多的線程, 核心線程數(shù) = CPU 核心數(shù) * 2
- 混合型: CPU 密集型的任務與 IO 密集型任務的執(zhí)行時間差別較小,拆分為兩個線程池;否則沒有必要拆分。
到此這篇關于Java線程池 ThreadPoolExecutor 詳解的文章就介紹到這了,更多相關JavaThreadPoolExecutor內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
- Java多線程ThreadPoolExecutor詳解
- Java線程池ThreadPoolExecutor源碼深入分析
- java高并發(fā)ThreadPoolExecutor類解析線程池執(zhí)行流程
- java高并發(fā)ScheduledThreadPoolExecutor與Timer區(qū)別
- 徹底搞懂java并發(fā)ThreadPoolExecutor使用
- Java多線程編程基石ThreadPoolExecutor示例詳解
- 源碼分析Java中ThreadPoolExecutor的底層原理
- 一文搞懂Java的ThreadPoolExecutor原理
- 一文弄懂Java中ThreadPoolExecutor
相關文章
Spring boot 默認靜態(tài)資源路徑與手動配置訪問路徑的方法
這篇文章主要介紹了Spring boot 默認靜態(tài)資源路徑與手動配置訪問路徑的方法,非常不錯,具有參考借鑒價值,需要的朋友可以參考下2017-05-05