Java線程池ThreadPoolExecutor的使用及其原理詳細(xì)解讀
什么是線程池
線程池是一種多線程處理形式,處理過程中將任務(wù)添加到隊列,然后在創(chuàng)建線程后自動啟動這些任務(wù)。線程池線程都是后臺線程。每個線程都使用默認(rèn)的堆棧大小,以默認(rèn)的優(yōu)先級運行,并處于多線程單元中。如果某個線程在托管代碼中空閑(如正在等待某個事件),則線程池將插入另一個輔助線程來使所有處理器保持繁忙。如果所有線程池線程都始終保持繁忙,但隊列中包含掛起的工作,則線程池將在一段時間后創(chuàng)建另一個輔助線程但線程的數(shù)目永遠(yuǎn)不會超過最大值。超過最大值的線程可以排隊,但他們要等到其他線程完成后才啟動。
線程池的作用
- 提高程序運行效率:創(chuàng)建一個新線程需要消耗一定的時間,而線程中的線程在使用時已經(jīng)創(chuàng)建完成,可節(jié)省此部分開銷。
- 解耦作用;線程的創(chuàng)建與執(zhí)行完全分開,方便維護。
- 資源復(fù)用:每次線程執(zhí)行完畢后會重新放入線程池中供其它調(diào)用者使用,可以減少線程銷毀的開銷。
- 提高cpu利用率:線程數(shù)過多會導(dǎo)致系統(tǒng)內(nèi)核額外的線程調(diào)度開銷,線程池可以控制創(chuàng)建線程數(shù)量以實現(xiàn)性能最大化。線程數(shù)量應(yīng)該取決于可用的并發(fā)處理器、處理器內(nèi)核、內(nèi)存、網(wǎng)絡(luò)sockets等的數(shù)量,默認(rèn)情況下線程數(shù)一般取cpu數(shù)量+2比較合適。
- 系統(tǒng)健壯性:接受突發(fā)性的大量請求,不至于使服務(wù)器因此產(chǎn)生大量線程去處理請求而導(dǎo)致崩潰。
關(guān)于Java線程池(ThreadPool)
jdk提供的線程池有哪些
在 JDK 1.5 之后推出了相關(guān)的 api,其提供的線程池有以下四種:
- Executors.newCachedThreadPool():創(chuàng)建一個可緩存的線程池。如果線程池的大小超過了處理任務(wù)所需要的線程,那么就會回收部分空閑(60秒不執(zhí)行任務(wù))的線程,當(dāng)任務(wù)數(shù)增加時,此線程池又可以智能的添加新線程來處理任務(wù)。
- Executors.newFixedThreadPool(nThreads):創(chuàng)建固定大小的線程池。每次提交一個任務(wù)就創(chuàng)建一個線程,直到線程達(dá)到線程池的最大大小。線程池的大小一旦達(dá)到最大值就會保持不變,如果某個線程因為執(zhí)行異常而結(jié)束,那么線程池會補充一個新線程。
- Executors.newSingleThreadExecutor():創(chuàng)建單個線程的線程池。如果這個唯一的線程因為異常結(jié)束,那么會有一個新的線程來替代它。此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行。
- Executors.newScheduledThreadPool(corePoolSize):創(chuàng)建一個大小無限的線程池。此線程池支持定時以及周期性執(zhí)行任務(wù)的需求。
查看Executor源碼會發(fā)現(xiàn),Executor只是一個創(chuàng)建線程池的工具類,這四種方式創(chuàng)建的源碼就會發(fā)現(xiàn),都是利用 ThreadPoolExecutor 類實現(xiàn)的,真正的線程池接口是ExecutorService,其實現(xiàn)類為ThreadPoolExecutor。
Executors 部分源碼:
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}ThreadPoolExecutor 最終的構(gòu)造器源碼:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}- corePoolSize:線程池的大小。線程池創(chuàng)建之后不會立即去創(chuàng)建線程,而是等待線程的到來。當(dāng)前執(zhí)行的線程數(shù)大于該值時,線程會加入到緩沖隊列。
- maximumPoolSize:線程池中創(chuàng)建的最大線程數(shù)。
- keepAliveTime:空閑的線程多久時間后被銷毀。默認(rèn)情況下,該值在線程數(shù)大于corePoolSize時,對超出- corePoolSize值的這些線程起作用。
- unit:TimeUnit枚舉類型的值,代表keepAliveTime時間單位。
- workQueue: 用于在執(zhí)行任務(wù)之前保存任務(wù)的隊列。此隊列將僅包含execute方法提交的可運行任務(wù)。
- threadFactory:執(zhí)行程序創(chuàng)建新線程時要使用的工廠
- handler:線程拒絕策略。 當(dāng)隊列和最大線程池都滿了之后的飽和策略。
其它構(gòu)造器如下:
// 構(gòu)造器一
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
// 構(gòu)造器二
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
// 構(gòu)造器三
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}關(guān)于創(chuàng)建線程池
有一點是肯定的,線程池肯定是不是越大越好。 通常我們是需要根據(jù)這批任務(wù)執(zhí)行的性質(zhì)來確定的。
- IO 密集型任務(wù):由于線程并不是一直在運行,所以可以盡可能的多配置線程,比如 CPU 個數(shù) * 2
- CPU 密集型任務(wù)(大量復(fù)雜的運算)應(yīng)當(dāng)分配較少的線程,比如 CPU 個數(shù)相當(dāng)?shù)拇笮 ?/li>
當(dāng)然這些都是經(jīng)驗值,最好的方式還是根據(jù)實際情況測試得出最佳配置。
通常我們使用線程池可以通過如下方式去使用:
使用jdk提供的線程池
//創(chuàng)建一個可緩存的線程池
ExecutorService cachedThreadPool= Executors.newCachedThreadPool();
//通過execute方法執(zhí)行接口
cachedThreadPool.execute(()->{
//Runnable to do something.
});創(chuàng)建自定義線程池
ExecutorService threadPool = new ThreadPoolExecutor(// 自定義一個線程池
1, // 核心線程數(shù)
2, // 最大線程數(shù)
60, // 超過核心線程數(shù)的額外線程存活時間
TimeUnit.SECONDS, // 線程存活時間的時間單位
new ArrayBlockingQueue<>(3) // 有界隊列,容量是3個
, Executors.defaultThreadFactory() // 線程工廠
, new ThreadPoolExecutor.AbortPolicy() //線程的拒絕策略
);
//執(zhí)行一個任務(wù)
threadPool.execute(()->{
//線程執(zhí)行的具體邏輯
//Runnable to do something.
});線程池的任務(wù)隊列
隊列有三種通用策略:
- 直接提交。工作隊列的默認(rèn)選項是 SynchronousQueue,它將任務(wù)直接提交給線程而不保持它們。在此,如果不存在可用于立即運行任務(wù)的線程,則試圖把任務(wù)加入隊列將失敗,因此會構(gòu)造一個新的線程。此策略可以避免在處理可能具有內(nèi)部依賴性的請求集時出現(xiàn)鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務(wù)。當(dāng)命令以超過隊列所能處理的平均數(shù)連續(xù)到達(dá)時,此策略允許無界線程具有增長的可能性。
- 無界隊列。使用無界隊列(例如,不具有預(yù)定義容量的 LinkedBlockingQueue)將導(dǎo)致在所有corePoolSize 線程都忙時新任務(wù)在隊列中等待。這樣,創(chuàng)建的線程就不會超過 corePoolSize。(因此,maximumPoolSize的值也就無效了。)當(dāng)每個任務(wù)完全獨立于其他任務(wù),即任務(wù)執(zhí)行互不影響時,適合于使用無界隊列;例如,在 Web頁服務(wù)器中。這種排隊可用于處理瞬態(tài)突發(fā)請求,當(dāng)命令以超過隊列所能處理的平均數(shù)連續(xù)到達(dá)時,此策略允許無界線程具有增長的可能性。
- 有界隊列。當(dāng)使用有限的 maximumPoolSizes時,有界隊列(如 ArrayBlockingQueue)有助于防止資源耗盡,但是可能較難調(diào)整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統(tǒng)資源和上下文切換開銷,但是可能導(dǎo)致人工降低吞吐量。如果任務(wù)頻繁阻塞(例如,如果它們是 I/O邊界),則系統(tǒng)可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU使用率較高,但是可能遇到不可接受的調(diào)度開銷,這樣也會降低吞吐量。
線程池的任務(wù)拒絕策略
jdk提供了四種拒絕策略:
- AbortPolicy: 丟棄任務(wù),并拋出拒絕執(zhí)行 RejectedExecutionException 異常信息。必須處理好拋出的異常,否則會打斷當(dāng)前的執(zhí)行流程,影響后續(xù)的任務(wù)執(zhí)行。同時它也是線程池默認(rèn)的拒絕策略。
- CallerRunsPolicy: 當(dāng)觸發(fā)拒絕策略,只要線程池沒有關(guān)閉的話,則使用調(diào)用線程直接運行任務(wù)。一般并發(fā)比較小,性能要求不高,不允許失敗。但是,由于調(diào)用者自己運行任務(wù),如果任務(wù)提交速度過快,可能導(dǎo)致程序阻塞,性能效率上必然的損失較大
- DiscardPolicy: 直接丟棄,其他啥都沒有
- DiscardOldestPolicy: 當(dāng)觸發(fā)拒絕策略,只要線程池沒有關(guān)閉的話,丟棄阻塞隊列 workQueue 中最老的一個任務(wù),并將新任務(wù)加入
線程池如何執(zhí)行

所有任務(wù)的調(diào)度都是由execute方法完成的,這部分完成的工作是:檢查現(xiàn)在線程池的運行狀態(tài)、運行線程數(shù)、運行策略,決定接下來執(zhí)行的流程,是直接申請線程執(zhí)行,或是緩沖到隊列中執(zhí)行,亦或是直接拒絕該任務(wù)。
主要分三步進行:
- 如果運行的線程少于corePoolSize,請嘗試以給定的命令作為第一個線程開始一個新線程任務(wù)。對addWorke的調(diào)用以原子方式檢查運行狀態(tài)和workerCount,從而防止會增加當(dāng)它不應(yīng)該的時候,返回false。
- 如果任務(wù)可以成功排隊,那么我們?nèi)匀恍枰屑?xì)檢查是否應(yīng)該添加線程(因為自從上次檢查以來已有的已經(jīng)死了)或者自進入此方法后,池已關(guān)閉。所以我們重新檢查狀態(tài),必要時回滾排隊已停止,如果沒有,則啟動新線程。
- 如果無法將任務(wù)排隊,則嘗試添加新的線程。如果失敗了,我們就知道我們已經(jīng)關(guān)閉或者飽和了,所以拒絕這個任務(wù)。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}線程池的任務(wù)添加流程
在excute方法中調(diào)用了addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) {
// 外層循環(huán),負(fù)責(zé)判斷線程池狀態(tài),處理線程池狀態(tài)變量加1操作
retry:
for (;;) {
// 狀態(tài)總體相關(guān)值:運行狀態(tài) + 執(zhí)行線程任務(wù)數(shù)量
int c = ctl.get();
// 讀取狀態(tài)值 - 運行狀態(tài)
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 滿足下面兩大條件的,說明線程池不能接受任務(wù)了,直接返回false處理
// 主要目的就是想說,只有線程池的狀態(tài)為 RUNNING 狀態(tài)時,線程池才會接收
// 新的任務(wù),增加新的Worker工作線程
// 線程池的狀態(tài)已經(jīng)至少已經(jīng)處于不能接收任務(wù)的狀態(tài)了
if (rs >= SHUTDOWN &&
//目的是檢查線 程池是否處于關(guān)閉狀態(tài)
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
// 內(nèi)層循環(huán),負(fù)責(zé)worker數(shù)量加1操作
for (;;) {
// 獲取當(dāng)前worker線程數(shù)量
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
// 如果線程池數(shù)量達(dá)到最大上限值CAPACITY
// core為true時判斷是否大于corePoolSize核心線程數(shù)量
// core為false時判斷是否大于maximumPoolSize最大設(shè)置的線程數(shù)量
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 調(diào)用CAS原子操作,目的是worker線程數(shù)量加1
if (compareAndIncrementWorkerCount(c)) //
break retry;
c = ctl.get(); // Re-read ctl
// CAS原子操作失敗的話,則再次讀取ctl值
if (runStateOf(c) != rs)
// 如果剛剛讀取的c狀態(tài)不等于先前讀取的rs狀態(tài),則繼續(xù)外層循環(huán)判斷
continue retry;
// else CAS failed due to workerCount change; retry inner loop
// 之所以會CAS操作失敗,主要是由于多線程并發(fā)操作,導(dǎo)致workerCount
// 工作線程數(shù)量改變而導(dǎo)致的,因此繼續(xù)內(nèi)層循環(huán)嘗試操作
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 創(chuàng)建一個Worker工作線程對象,將任務(wù)firstTask,
// 新創(chuàng)建的線程thread都封裝到了Worker對象里面
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 由于對工作線程集合workers的添加或者刪除,
// 涉及到線程安全問題,所以才加上鎖且該鎖為非公平鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 獲取鎖成功后,執(zhí)行臨界區(qū)代碼,首先檢查獲取當(dāng)前線程池的狀態(tài)rs
int rs = runStateOf(ctl.get());
// 當(dāng)線程池處于可接收任務(wù)狀態(tài)
// 或者是不可接收任務(wù)狀態(tài),但是有可能該任務(wù)等待隊列中的任務(wù)
// 滿足這兩種條件時,都可以添加新的工作線程
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
// 添加新的工作線程到工作線程集合workers,workers是set集合
int s = workers.size();
if (s > largestPoolSize)
// 變量記錄了線程池在整個生命周期中曾經(jīng)出現(xiàn)的最大線程個數(shù)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 往workers工作線程集合中添加成功后,則立馬調(diào)用線程start方法啟動起來
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 如果啟動線程失敗的話,還得將剛剛添加成功的線程共集合中移除并且做線 // 程數(shù)量做減1操作
addWorkerFailed(w);
}
return workerStarted;
}線程池中任務(wù)的執(zhí)行流程
runWorker 通過調(diào)用t.start()啟動了線程,線程池真正核心執(zhí)行任務(wù)的地方就在此
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// allow interrupts 允許中斷
w.unlock();
boolean completedAbruptly = true;
try {
// 不斷從等待隊列blockingQueue中獲取任務(wù)
// 之前addWorker(null, false)這樣的線程執(zhí)行時,
// 會通過getTask中再次獲取任務(wù)并執(zhí)行
while (task != null || (task = getTask()) != null) {
w.lock();
// 上鎖,并不是防止并發(fā)執(zhí)行任務(wù),
// 而是為了防止shutdown()被調(diào)用時不終止正在運行的worker線程
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// task.run()執(zhí)行前,由子類實現(xiàn)
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); // 執(zhí)行線程Runable的run方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// task.run()執(zhí)行后,由子類實現(xiàn)
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}如何關(guān)閉線程池
有運行任務(wù)自然也有關(guān)閉任務(wù)。 通過查看ExecutorService接口,其實無非就是兩個方法 shutdown()/shutdownNow()。
但他們有著重要的區(qū)別:
- shutdown() 執(zhí)行后停止接受新任務(wù),會把隊列的任務(wù)執(zhí)行完畢。
- shutdownNow() 也是停止接受新任務(wù),但會中斷所有的任務(wù),將線程池狀態(tài)變?yōu)?stop。
threadPool.shutdown();
threadPool.shutdownNow();到此這篇關(guān)于Java線程池ThreadPoolExecutor的使用及其原理詳細(xì)解讀的文章就介紹到這了,更多相關(guān)Java線程池ThreadPoolExecutor內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
淺談springboot項目中定時任務(wù)如何優(yōu)雅退出
這篇文章主要介紹了淺談springboot項目中定時任務(wù)如何優(yōu)雅退出?具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09
詳解使用Mybatis-plus + velocity模板生成自定義的代碼
這篇文章主要介紹了詳解使用Mybatis-plus + velocity模板生成自定義的代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03
Java基礎(chǔ)之刪除文本文件中特定行的內(nèi)容
這篇文章主要介紹了Java基礎(chǔ)之刪除文本文件中特定行的內(nèi)容,文中有非常詳細(xì)的代碼示例,對正在學(xué)習(xí)java基礎(chǔ)的小伙伴們有非常好的幫助,需要的朋友可以參考下2021-04-04

