徹底搞懂java并發(fā)ThreadPoolExecutor使用
前言
線程池是Java中使用較多的并發(fā)框架,合理使用線程池,可以:降低資源消耗,提高響應(yīng)速度,提高線程的可管理性。
本篇文章將從線程池簡(jiǎn)單原理,線程池的創(chuàng)建,線程池執(zhí)行任務(wù)和關(guān)閉線程池進(jìn)行使用學(xué)習(xí)。
正文
一. 線程池的簡(jiǎn)單原理
當(dāng)一個(gè)任務(wù)提交到線程池ThreadPoolExecutor時(shí),該任務(wù)的執(zhí)行如下圖所示。

- 如果當(dāng)前運(yùn)行的線程數(shù)小于corePoolSzie(核心線程數(shù)),則創(chuàng)建新線程來(lái)執(zhí)行任務(wù)(需要獲取全局鎖);
- 如果當(dāng)前運(yùn)行的線程數(shù)等于或大于corePoolSzie,則將任務(wù)加入BlockingQueue(任務(wù)阻塞隊(duì)列);
- 如果BlockingQueue已滿(mǎn),則創(chuàng)建新的線程來(lái)執(zhí)行任務(wù)(需要獲取全局鎖);
- 如果創(chuàng)建新線程會(huì)使當(dāng)前線程數(shù)大于maximumPoolSize(最大線程數(shù)),則拒絕任務(wù)并調(diào)用RejectedExecutionHandler的rejectedExecution() 方法。
由于ThreadPoolExecutor存儲(chǔ)工作線程使用的集合是HashSet,因此執(zhí)行上述步驟1和步驟3時(shí)需要獲取全局鎖來(lái)保證線程安全,而獲取全局鎖會(huì)導(dǎo)致線程池性能瓶頸,因此通常情況下,線程池完成預(yù)熱后(當(dāng)前線程數(shù)大于等于corePoolSize),線程池的execute() 方法都是執(zhí)行步驟2。
二. 線程池的創(chuàng)建
通過(guò)ThreadPoolExecutor能夠創(chuàng)建一個(gè)線程池,ThreadPoolExecutor的構(gòu)造函數(shù)簽名如下。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
通過(guò)ThreadPoolExecutor創(chuàng)建線程池時(shí),需要指定線程池的核心線程數(shù),最大線程數(shù),線程保活時(shí)間,線程?;顣r(shí)間單位和任務(wù)阻塞隊(duì)列,并按需指定線程工廠和飽和拒絕策略,如果不指定線程工廠和飽和拒絕策略,則ThreadPoolExecutor會(huì)使用默認(rèn)的線程工廠和飽和拒絕策略。下面分別介紹這些參數(shù)的含義。
| 參數(shù) | 含義 |
|---|---|
| corePoolSize | 核心線程數(shù),即線程池的基本大小。當(dāng)一個(gè)任務(wù)被提交到線程池時(shí),如果線程池的線程數(shù)小于corePoolSize,那么無(wú)論其余線程是否空閑,也需創(chuàng)建一個(gè)新線程來(lái)執(zhí)行任務(wù)。 |
| maximumPoolSize | 最大線程數(shù)。當(dāng)線程池中線程數(shù)大于等于corePoolSize時(shí),新提交的任務(wù)會(huì)加入任務(wù)阻塞隊(duì)列,但是如果任務(wù)阻塞隊(duì)列已滿(mǎn)且線程數(shù)小于maximumPoolSize,此時(shí)會(huì)繼續(xù)創(chuàng)建新的線程來(lái)執(zhí)行任務(wù)。該參數(shù)規(guī)定了線程池允許創(chuàng)建的最大線程數(shù) |
| keepAliveTime | 線程?;顣r(shí)間。當(dāng)線程池的線程數(shù)大于核心線程數(shù)時(shí),多余的空閑線程會(huì)最大存活keepAliveTime的時(shí)間,如果超過(guò)這個(gè)時(shí)間且空閑線程還沒(méi)有獲取到任務(wù)來(lái)執(zhí)行,則該空閑線程會(huì)被回收掉。 |
| unit | 線程保活時(shí)間單位。通過(guò)TimeUnit指定線程?;顣r(shí)間的時(shí)間單位,可選單位有DAYS(天),HOURS(時(shí)),MINUTES(分),SECONDS(秒),MILLISECONDS(毫秒),MICROSECONDS(微秒)和NANOSECONDS(納秒),但無(wú)論指定什么時(shí)間單位,ThreadPoolExecutor統(tǒng)一會(huì)將其轉(zhuǎn)換為NANOSECONDS。 |
| workQueue | 任務(wù)阻塞隊(duì)列。線程池的線程數(shù)大于等于corePoolSize時(shí),新提交的任務(wù)會(huì)添加到workQueue中,所有線程執(zhí)行完上一個(gè)任務(wù)后,會(huì)循環(huán)從workQueue中獲取任務(wù)來(lái)執(zhí)行。 |
| threadFactory | 創(chuàng)建線程的工廠。可以通過(guò)線程工廠給每個(gè)創(chuàng)建出來(lái)的線程設(shè)置更有意義的名字。 |
| handler | 飽和拒絕策略。如果任務(wù)阻塞隊(duì)列已滿(mǎn)且線程池中的線程數(shù)等于maximumPoolSize,說(shuō)明線程池此時(shí)處于飽和狀態(tài),應(yīng)該執(zhí)行一種拒絕策略來(lái)處理新提交的任務(wù)。 |
三. 線程池執(zhí)行任務(wù)
1. 執(zhí)行無(wú)返回值任務(wù)
通過(guò)ThreadPoolExecutor的execute() 方法,能執(zhí)行Runnable任務(wù),示例如下。
public class ThreadPoolExecutorTest {
@Test
public void ThreadPoolExecutor執(zhí)行簡(jiǎn)單無(wú)返回值任務(wù)() throws Exception {
// 創(chuàng)建一個(gè)線程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
// 創(chuàng)建兩個(gè)任務(wù)
Runnable firstRunnable = new Runnable() {
@Override
public void run() {
System.out.println("第一個(gè)任務(wù)執(zhí)行");
}
};
Runnable secondRunnable = new Runnable() {
@Override
public void run() {
System.out.println("第二個(gè)任務(wù)執(zhí)行");
}
};
// 讓線程池執(zhí)行任務(wù)
threadPoolExecutor.execute(firstRunnable);
threadPoolExecutor.execute(secondRunnable);
// 讓主線程睡眠1秒,等待線程池中的任務(wù)被執(zhí)行完畢
Thread.sleep(1000);
}
}
運(yùn)行測(cè)試程序,結(jié)果如下。

2. 執(zhí)行有返回值任務(wù)
通過(guò)ThreadPoolExecutor的submit() 方法,能夠執(zhí)行Callable任務(wù),通過(guò)submit() 方法返回的RunnableFuture能夠拿到異步執(zhí)行的結(jié)果。示例如下。
public class ThreadPoolExecutorTest {
@Test
public void ThreadPoolExecutor執(zhí)行簡(jiǎn)單有返回值任務(wù)() throws Exception {
// 創(chuàng)建一個(gè)線程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
// 創(chuàng)建兩個(gè)任務(wù),任務(wù)執(zhí)行完有返回值
Callable<String> firstCallable = new Callable<String>() {
@Override
public String call() throws Exception {
return "第一個(gè)任務(wù)返回值";
}
};
Callable<String> secondCallable = new Callable<String>() {
@Override
public String call() throws Exception {
return "第二個(gè)任務(wù)返回值";
}
};
// 讓線程池執(zhí)行任務(wù)
Future<String> firstFuture = threadPoolExecutor.submit(firstCallable);
Future<String> secondFuture = threadPoolExecutor.submit(secondCallable);
// 獲取執(zhí)行結(jié)果,拿不到結(jié)果會(huì)阻塞在get()方法上
System.out.println(firstFuture.get());
System.out.println(secondFuture.get());
}
}
運(yùn)行測(cè)試程序,結(jié)果如下。

3. 執(zhí)行有返回值任務(wù)時(shí)拋出錯(cuò)誤
如果ThreadPoolExecutor在執(zhí)行Callable任務(wù)時(shí),在Callable任務(wù)中拋出了異常并且沒(méi)有捕獲,那么這個(gè)異常是可以通過(guò)Future的get() 方法感知到的。示例如下。
public class ThreadPoolExecutorTest {
@Test
public void ThreadPoolExecutor執(zhí)行簡(jiǎn)單有返回值任務(wù)時(shí)拋出錯(cuò)誤() {
// 創(chuàng)建一個(gè)線程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
// 創(chuàng)建一個(gè)任務(wù),任務(wù)有返回值,但是執(zhí)行過(guò)程中拋出異常
Callable<String> exceptionCallable = new Callable<String>() {
@Override
public String call() throws Exception {
throw new RuntimeException("發(fā)生了異常");
}
};
// 讓線程池執(zhí)行任務(wù)
Future<String> exceptionFuture = threadPoolExecutor.submit(exceptionCallable);
try {
System.out.println(exceptionFuture.get());
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
運(yùn)行測(cè)試程序,結(jié)果如下。

4. ThreadPoolExecutor通過(guò)submit方式執(zhí)行Runnable
ThreadPoolExecutor可以通過(guò)submit() 方法來(lái)運(yùn)行Runnable任務(wù),并且還可以異步獲取執(zhí)行結(jié)果。示例如下。
public class ThreadPoolExecutorTest {
@Test
public void ThreadPoolExecutor通過(guò)submit的方式來(lái)提交并執(zhí)行Runnable() throws Exception {
// 創(chuàng)建一個(gè)線程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
// 創(chuàng)建結(jié)果對(duì)象
MyResult myResult = new MyResult();
// 創(chuàng)建Runnable對(duì)象
Runnable runnable = new Runnable() {
@Override
public void run() {
myResult.setResult("任務(wù)執(zhí)行了");
}
};
// 通過(guò)ThreadPoolExecutor的submit()方法提交Runnable
Future<MyResult> resultFuture = threadPoolExecutor.submit(runnable, myResult);
// 獲取執(zhí)行結(jié)果
MyResult finalResult = resultFuture.get();
// myResult和finalResult的地址實(shí)際相同
Assert.assertEquals(myResult, finalResult);
// 打印執(zhí)行結(jié)果
System.out.println(resultFuture.get().getResult());
}
private static class MyResult {
String result;
public MyResult() {}
public MyResult(String result) {
this.result = result;
}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
}
}
運(yùn)行測(cè)試程序,結(jié)果如下。

實(shí)際上ThreadPoolExecutor的submit() 方法無(wú)論是提交Runnable任務(wù)還是Callable任務(wù),都是將任務(wù)封裝成了RunnableFuture接口的子類(lèi)FutureTask,然后調(diào)用ThreadPoolExecutor的execute() 方法來(lái)執(zhí)行FutureTask。
四. 關(guān)閉線程池
關(guān)閉線程池可以通過(guò)ThreadPoolExecutor的shutdown() 方法,但是shutdown() 方法不會(huì)去中斷正在執(zhí)行任務(wù)的線程,所以如果線程池里有Worker正在執(zhí)行一個(gè)永遠(yuǎn)不會(huì)結(jié)束的任務(wù),那么shutdown() 方法是無(wú)法關(guān)閉線程池的。示例如下。
public class ThreadPoolExecutorTest {
@Test
public void 通過(guò)shutdown關(guān)閉線程池() {
// 創(chuàng)建一個(gè)線程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
// 創(chuàng)建Runnable對(duì)象
Runnable runnable = new Runnable() {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
LockSupport.parkNanos(1000 * 1000 * 1000);
}
System.out.println(Thread.currentThread().getName() + " 被中斷");
}
};
// 讓線程池執(zhí)行任務(wù)
threadPoolExecutor.execute(runnable);
threadPoolExecutor.execute(runnable);
// 調(diào)用shutdown方法關(guān)閉線程池
threadPoolExecutor.shutdown();
// 等待3秒觀察現(xiàn)象
LockSupport.parkNanos(1000 * 1000 * 1000 * 3L);
}
}
運(yùn)行測(cè)試程序,會(huì)發(fā)現(xiàn)在主線程中等待3秒后,也沒(méi)有得到預(yù)期的打印結(jié)果。如果上述測(cè)試程序中使用shutdownNow,則是可以得到預(yù)期打印結(jié)果的,示例如下。
public class ThreadPoolExecutorTest {
@Test
public void 通過(guò)shutdownNow關(guān)閉線程池() {
// 創(chuàng)建一個(gè)線程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
// 創(chuàng)建Runnable對(duì)象
Runnable runnable = new Runnable() {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
LockSupport.parkNanos(1000 * 1000 * 1000);
}
System.out.println(Thread.currentThread().getName() + " 被中斷");
}
};
// 讓線程池執(zhí)行任務(wù)
threadPoolExecutor.execute(runnable);
threadPoolExecutor.execute(runnable);
// 調(diào)用shutdown方法關(guān)閉線程池
threadPoolExecutor.shutdownNow();
// 等待3秒觀察現(xiàn)象
LockSupport.parkNanos(1000 * 1000 * 1000 * 3L);
}
}
運(yùn)行測(cè)試程序,打印如下。

因?yàn)闇y(cè)試程序中的任務(wù)是響應(yīng)中斷的,而ThreadPoolExecutor的shutdownNow() 方法會(huì)中斷所有Worker,所以執(zhí)行shutdownNow() 方法后,正在運(yùn)行的任務(wù)會(huì)響應(yīng)中斷并結(jié)束運(yùn)行,最終線程池關(guān)閉。
假如線程池中運(yùn)行著一個(gè)永遠(yuǎn)不會(huì)結(jié)束的任務(wù),且這個(gè)任務(wù)不響應(yīng)中斷,那么無(wú)論是shutdown() 方法還是shutdownNow() 方法,都是無(wú)法關(guān)閉線程池的。
總結(jié)
ThreadPoolExecutor的使用總結(jié)如下。
- 通過(guò)ThreadPoolExecutor的execute() 方法能夠執(zhí)行Runnable任務(wù);
- 通過(guò)ThreadPoolExecutor的submit() 方法能夠執(zhí)行Runnable任務(wù)和Callable任務(wù),并且能夠獲取異步的執(zhí)行結(jié)果;
- ThreadPoolExecutor的submit() 方法會(huì)返回一個(gè)Future對(duì)象(實(shí)際就是FutureTask),如果任務(wù)執(zhí)行過(guò)程中發(fā)生了異常且未捕獲,那么可以通過(guò)Future的get() 方法感知到異常;
- ThreadPoolExecutor的submit() 方法無(wú)論是提交Runnable任務(wù)還是Callable任務(wù),都是將任務(wù)封裝成了RunnableFuture接口的子類(lèi)FutureTask,然后調(diào)用ThreadPoolExecutor的execute() 方法來(lái)執(zhí)行FutureTask;
- 關(guān)閉線程池時(shí),如果運(yùn)行的任務(wù)可以在有限時(shí)間內(nèi)運(yùn)行完畢,那么可以使用shutdown() 方法來(lái)關(guān)閉線程池,這能夠保證在關(guān)閉線程池時(shí),正在運(yùn)行的任務(wù)會(huì)順利運(yùn)行完畢;
- 關(guān)閉線程池時(shí),如果運(yùn)行的任務(wù)永遠(yuǎn)不會(huì)結(jié)束但是響應(yīng)中斷,那么可以使用shutdownNow() 方法來(lái)關(guān)閉線程池,這種方式不保證任務(wù)順利運(yùn)行完畢;
- 如果任務(wù)永遠(yuǎn)不會(huì)結(jié)束且不響應(yīng)中斷,那么無(wú)論是shutdown() 方法還是shutdownNow() 方法,都無(wú)法關(guān)閉線程池。
以上就是徹底搞懂java并發(fā)ThreadPoolExecutor使用的詳細(xì)內(nèi)容,更多關(guān)于java并發(fā)ThreadPoolExecutor的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Java線程池?ThreadPoolExecutor?詳解
- Java多線程ThreadPoolExecutor詳解
- Java線程池ThreadPoolExecutor源碼深入分析
- java高并發(fā)ThreadPoolExecutor類(lèi)解析線程池執(zhí)行流程
- java高并發(fā)ScheduledThreadPoolExecutor與Timer區(qū)別
- Java多線程編程基石ThreadPoolExecutor示例詳解
- 源碼分析Java中ThreadPoolExecutor的底層原理
- 一文搞懂Java的ThreadPoolExecutor原理
- 一文弄懂Java中ThreadPoolExecutor
相關(guān)文章
Spring?Boot?配置?Hikari?數(shù)據(jù)庫(kù)連接池的操作代碼
數(shù)據(jù)庫(kù)連接池是一個(gè)提高程序與數(shù)據(jù)庫(kù)的連接的優(yōu)化,連接池它主要作用是提高性能、節(jié)省資源、控制連接數(shù)、連接管理等操作,這篇文章主要介紹了SpringBoot配置Hikari數(shù)據(jù)庫(kù)連接池,需要的朋友可以參考下2023-09-09
java以json格式向后臺(tái)服務(wù)器接口發(fā)送請(qǐng)求的實(shí)例
下面小編就為大家分享一篇java以json格式向后臺(tái)服務(wù)器接口發(fā)送請(qǐng)求的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-01-01
詳解Spring Security的Web應(yīng)用和指紋登錄實(shí)踐
這篇文章主要介紹了詳解Spring Security的Web應(yīng)用和指紋登錄實(shí)踐,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-03-03
淺談基于Token的WEB后臺(tái)認(rèn)證機(jī)制
這篇文章主要介紹了淺談基于Token的WEB后臺(tái)認(rèn)證機(jī)制,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-12-12
SpringBoot之配置logging日志及在控制臺(tái)中輸出過(guò)程
這篇文章主要介紹了SpringBoot之配置logging日志及在控制臺(tái)中輸出過(guò)程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-06-06
centos7如何通過(guò)systemctl啟動(dòng)springboot服務(wù)代替java -jar方式啟動(dòng)
這篇文章主要介紹了centos7如何通過(guò)systemctl啟動(dòng)springboot服務(wù)代替java -jar方式啟動(dòng),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2024-01-01
java搭建一個(gè)Socket服務(wù)器響應(yīng)多用戶(hù)訪問(wèn)
本篇文章主要介紹了java搭建一個(gè)Socket服務(wù)器響應(yīng)多用戶(hù)訪問(wèn),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-02-02
Java集合之Map接口與實(shí)現(xiàn)類(lèi)詳解
這篇文章主要為大家詳細(xì)介紹了Java集合中的Map接口與實(shí)現(xiàn)類(lèi),文中的示例代碼講解詳細(xì),對(duì)我們學(xué)習(xí)Java有一定的幫助,感興趣的可以了解一下2022-12-12

