java?ThreadPoolExecutor線程池內(nèi)部處理流程解析
ThreadPoolExecutor線程池內(nèi)部處理淺析
我們知道如果程序中并發(fā)的線程數(shù)量很多,并且每個(gè)線程都是執(zhí)行一個(gè)時(shí)間很短的任務(wù)就結(jié)束時(shí),會(huì)因?yàn)轭l繁創(chuàng)建線程而大大降低系統(tǒng)的效率,因此出現(xiàn)了線程池的使用方式,它可以提前創(chuàng)建好線程來執(zhí)行任務(wù)。本文主要通過java的ThreadPoolExecutor來查看線程池的內(nèi)部處理過程。
ThreadPoolExecutor
java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個(gè)類,下面我們來看一下ThreadPoolExecutor類的部分實(shí)現(xiàn)源碼。
構(gòu)造方法
ThreadPoolExecutor類提供了如下4個(gè)構(gòu)造方法
// 設(shè)置線程池時(shí)指定核心線程數(shù)、最大線程數(shù)、線程存活時(shí)間及等待隊(duì)列。
// 線程創(chuàng)建工廠和拒絕策略使用默認(rèn)的(AbortPolicy)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
// 設(shè)置線程池時(shí)指定核心線程數(shù)、最大線程數(shù)、線程存活時(shí)間、等待隊(duì)列及線程創(chuàng)建工廠
// 拒絕策略使用默認(rèn)的(AbortPolicy)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
// 設(shè)置線程池時(shí)指定核心線程數(shù)、最大線程數(shù)、線程存活時(shí)間、等待隊(duì)列及拒絕策略
// 線程創(chuàng)建工廠使用默認(rèn)的
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
// 設(shè)置線程池時(shí)指定核心線程數(shù)、最大線程數(shù)、線程存活時(shí)間、等待隊(duì)列、線程創(chuàng)建工廠及拒絕策略
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}通過觀察上述每個(gè)構(gòu)造器的源碼實(shí)現(xiàn),我們可以發(fā)現(xiàn)前面三個(gè)構(gòu)造器都是調(diào)用的第四個(gè)構(gòu)造器進(jìn)行的初始化工作。
下面解釋一下構(gòu)造器中各個(gè)參數(shù)的含義:
- corePoolSize:核心池的線程個(gè)數(shù)上線,在創(chuàng)建了線程池后,默認(rèn)情況下,線程池中并沒有任何線程,而是等待有任務(wù)到來才創(chuàng)建線程去執(zhí)行任務(wù)。默認(rèn)情況下,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0,當(dāng)有任務(wù)來之后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中。
- maximumPoolSize:線程池最大線程數(shù),這個(gè)參數(shù)也是一個(gè)非常重要的參數(shù),它表示在線程池中最多能創(chuàng)建多少個(gè)線程。
- keepAliveTime:表示線程沒有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會(huì)終止。默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),keepAliveTime才會(huì)起作用,直到線程池中的線程數(shù)不大于corePoolSize,即當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),如果一個(gè)線程空閑的時(shí)間達(dá)到keepAliveTime,則會(huì)終止,直到線程池中的線程數(shù)不超過corePoolSize。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時(shí),keepAliveTime參數(shù)也會(huì)起作用,直到線程池中的線程數(shù)為0。
- unit:參數(shù)keepAliveTime的時(shí)間單位。
- workQueue:一個(gè)阻塞隊(duì)列,用來存儲(chǔ)等待執(zhí)行的任務(wù),這個(gè)參數(shù)的選擇也很重要,會(huì)對線程池的運(yùn)行過程產(chǎn)生重大影響;
- threadFactory:線程工廠,主要用來創(chuàng)建線程;
- handler:表示當(dāng)拒絕處理任務(wù)時(shí)的策略。有以下四種取值:ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)。
核心方法
在ThreadPoolExecutor類中,最核心的任務(wù)提交方法是execute()方法,雖然通過submit也可以提交任務(wù),但是實(shí)際上submit方法里面最終調(diào)用的還是execute()方法。
public void execute(Runnable command) {
// 判斷提交的任務(wù)command是否為null,若是null,則拋出空指針異常;
if (command == null)
throw new NullPointerException();
// 獲取線程池中當(dāng)前線程數(shù)
int c = ctl.get();
// 如果線程池中當(dāng)前線程數(shù)小于核心池大小,進(jìn)入if語句塊
if (workerCountOf(c) < corePoolSize) {
// 如果以給定的命令啟動(dòng)一個(gè)核心線程執(zhí)行任務(wù)成功,直接返回
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果當(dāng)前線程池處于RUNNING狀態(tài),則將任務(wù)放入任務(wù)緩存隊(duì)列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果線程池不處于運(yùn)行狀態(tài)并且移除剛加入的任務(wù)成功則執(zhí)行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果當(dāng)前線程數(shù)為0,則在線程池里增加一個(gè)線程,保證隊(duì)列里的任務(wù)不會(huì)沒有線程執(zhí)行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 嘗試啟動(dòng)核心線程之外的線程,如果不滿足,則執(zhí)行對應(yīng)的拒絕策略
else if (!addWorker(command, false))
reject(command);
}主要方法addWorker。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果線程池狀態(tài)大于SHUTDOWN或者線程池狀態(tài)等于SHUTDOWN,firstTask不等于null
// 或者線程池狀態(tài)等于SHUTDOWN,任務(wù)隊(duì)列等于空時(shí),直接返回false結(jié)束。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 如果線程數(shù)量大于等于最大數(shù)量或者大于等于上限
//(入?yún)ore傳true,取核心線程數(shù),否則取最大線程數(shù)),直接返回false結(jié)束。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false
// CAS操作給工作線程數(shù)加1,成功則跳到retry處,不再進(jìn)入循環(huán)。
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 如果線程池狀態(tài)與剛進(jìn)入時(shí)不一致,則跳到retry處,再次進(jìn)入循環(huán)
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 新建一個(gè)線程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 如果線程池狀態(tài)在SHUTDOWN之前或者
// 線程池狀態(tài)等于SHUTDOWN并且firstTask等于null時(shí),進(jìn)入處理。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果要執(zhí)行的線程正在運(yùn)行,則拋異常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 啟動(dòng)線程
t.start();
workerStarted = true;
}
}
} finally {
// 如果線程添加失敗,則將新增的對應(yīng)信息刪除
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}任務(wù)執(zhí)行run方法
在上述addWorker中,當(dāng)調(diào)用線程的start方法啟動(dòng)線程后,會(huì)執(zhí)行其中的run方法。
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果任務(wù)不為空或者新獲取到的任務(wù)不為空
while (task != null || (task = getTask()) != null) {
w.lock();
// 當(dāng)線程池狀態(tài),大于等于 STOP 時(shí),保證工作線程都有中斷標(biāo)志。
// 當(dāng)線程池狀態(tài),小于STOP時(shí),保證工作線程都沒有中斷標(biāo)志。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}整體處理過程
通過上述源碼分析,我們可以得出線程池處理任務(wù)的過程如下:

總結(jié)
本文從源碼層面主要分析了線程池的創(chuàng)建、運(yùn)行過程,通過上述的分析,可以看出當(dāng)線程池中的線程數(shù)量超過核心線程數(shù)后,會(huì)先將任務(wù)放入等待隊(duì)列,隊(duì)列放滿后當(dāng)最大線程數(shù)大于核心線程數(shù)時(shí),才會(huì)創(chuàng)建新的線程執(zhí)行。
以上就是java ThreadPoolExecutor線程池內(nèi)部處理流程解析的詳細(xì)內(nèi)容,更多關(guān)于java ThreadPoolExecutor線程池的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Netty網(wǎng)絡(luò)編程零基礎(chǔ)入門
Netty是一個(gè)異步的、基于事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用框架,用于快速開發(fā)可維護(hù)、高性能的網(wǎng)絡(luò)服務(wù)器和客戶端,如果你還不了解它的使用,就趕快繼續(xù)往下看吧2022-08-08
springboot整合sentinel接口熔斷的實(shí)現(xiàn)示例
為了防止慢接口導(dǎo)致的服務(wù)阻塞,可以通過添加熔斷處理來避免應(yīng)用的大量工作線程陷入阻塞,保證其他接口的正常運(yùn)行,本文介紹了如何使用Spring Boot與Sentinel進(jìn)行接口熔斷的配置與實(shí)現(xiàn),感興趣的可以了解一下2024-09-09
關(guān)于Idea卡在Resolving Maven dependencies的解決方案
本文詳細(xì)介紹了關(guān)于Idea卡在Resolving Maven dependencies的解決方案,文中通過圖文結(jié)合的形式給大家介紹的非常詳細(xì),對大家解決問題有一定的幫助,需要的朋友可以參考下2024-02-02
Springboot mybatis-plus配置及用法詳解
這篇文章主要介紹了Springboot mybatis-plus配置及用法詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-09-09
Intellij IDEA 斷點(diǎn)不可用報(bào)錯(cuò) No executable 
這篇文章主要介紹了Intellij IDEA 斷點(diǎn)不可用報(bào)錯(cuò) No executable code found問題及解決方案,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-10-10

