java中常見的6種線程池示例詳解

- 之前我們介紹了線程池的四種拒絕策略,了解了線程池參數(shù)的含義,那么今天我們來聊聊Java 中常見的幾種線程池,以及在jdk7 加入的 ForkJoin 新型線程池
- 首先我們列出Java 中的六種線程池如下
| 線程池名稱 | 描述 |
|---|---|
| FixedThreadPool | 核心線程數(shù)與最大線程數(shù)相同 |
| SingleThreadExecutor | 一個(gè)線程的線程池 |
| CachedThreadPool | 核心線程為0,最大線程數(shù)為Integer. MAX_VALUE |
| ScheduledThreadPool | 指定核心線程數(shù)的定時(shí)線程池 |
| SingleThreadScheduledExecutor | 單例的定時(shí)線程池 |
| ForkJoinPool | JDK 7 新加入的一種線程池 |
在了解集中線程池時(shí)我們先來熟悉一下主要幾個(gè)類的關(guān)系, ThreadPoolExecutor 的類圖,以及 Executors 的主要方法:


上面看到的類圖,方便幫助下面的理解和查看,我們可以看到一個(gè)核心類 ExecutorService , 這是我們線程池都實(shí)現(xiàn)的基類,我們接下來說的都是它的實(shí)現(xiàn)類。
FixedThreadPool
FixedThreadPool 線程池的特點(diǎn)是它的核心線程數(shù)和最大線程數(shù)一樣,我們可以看它的實(shí)現(xiàn)代碼在 Executors#newFixedThreadPool(int) 中,如下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
我們可以看到方法內(nèi)創(chuàng)建線程調(diào)用的實(shí)際是 ThreadPoolExecutor 類,這是線程池的核心執(zhí)行器,傳入的 nThread 參數(shù)作為核心線程數(shù)和最大線程數(shù)傳入,隊(duì)列采用了一個(gè)鏈表結(jié)構(gòu)的有界隊(duì)列。
- 這種線程池我們可以看作是固定線程數(shù)的線程池,它只有在開始初始化的時(shí)候線程數(shù)會(huì)從0開始創(chuàng)建,但是創(chuàng)建好后就不再銷毀,而是全部作為常駐線程池,這里如果對(duì)線程池參數(shù)不理解的可以看之前文章 《解釋線程池各個(gè)參數(shù)的含義》。
- 對(duì)于這種線程池他的第三個(gè)和第四個(gè)參數(shù)是沒意義,它們是空閑線程存活時(shí)間,這里都是常駐不存在銷毀,當(dāng)線程處理不了時(shí)會(huì)加入到阻塞隊(duì)列,這是一個(gè)鏈表結(jié)構(gòu)的有界阻塞隊(duì)列,最大長度是Integer. MAX_VALUE
SingleThreadExecutor
SingleThreadExecutor 線程的特點(diǎn)是它的核心線程數(shù)和最大線程數(shù)均為1,我們也可以將其任務(wù)是一個(gè)單例線程池,它的實(shí)現(xiàn)代碼是 Executors#newSingleThreadExcutor() , 如下:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
- 上述代碼中我們發(fā)現(xiàn)它有一個(gè)重載函數(shù),傳入了一個(gè)ThreadFactory 的參數(shù),一般在我們開發(fā)中會(huì)傳入我們自定義的線程創(chuàng)建工廠,如果不傳入則會(huì)調(diào)用默認(rèn)的線程工廠
- 我們可以看到它與 FixedThreadPool 線程池的區(qū)別僅僅是核心線程數(shù)和最大線程數(shù)改為了1,也就是說不管任務(wù)多少,它只會(huì)有唯一的一個(gè)線程去執(zhí)行
- 如果在執(zhí)行過程中發(fā)生異常等導(dǎo)致線程銷毀,線程池也會(huì)重新創(chuàng)建一個(gè)線程來執(zhí)行后續(xù)的任務(wù)
- 這種線程池非常適合所有任務(wù)都需要按被提交的順序來執(zhí)行的場景,是個(gè)單線程的串行。
CachedThreadPool
cachedThreadPool 線程池的特點(diǎn)是它的常駐核心線程數(shù)為0,正如其名字一樣,它所有的縣城都是臨時(shí)的創(chuàng)建,關(guān)于它的實(shí)現(xiàn)在 Executors#newCachedThreadPool() 中,代碼如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
- 從上述代碼中我們可以看到 CachedThreadPool 線程池中,最大線程數(shù)為 Integer.MAX_VALUE , 意味著他的線程數(shù)幾乎可以無限增加。
- 因?yàn)閯?chuàng)建的線程都是臨時(shí)線程,所以他們都會(huì)被銷毀,這里空閑 線程銷毀時(shí)間是60秒,也就是說當(dāng)線程在60秒內(nèi)沒有任務(wù)執(zhí)行則銷毀
- 這里我們需要注意點(diǎn),它使用了 SynchronousQueue 的一個(gè)阻塞隊(duì)列來存儲(chǔ)任務(wù),這個(gè)隊(duì)列是無法存儲(chǔ)的,因?yàn)樗娜萘繛?,它只負(fù)責(zé)對(duì)任務(wù)的傳遞和中轉(zhuǎn),效率會(huì)更高,因?yàn)楹诵木€程都為0,這個(gè)隊(duì)列如果存儲(chǔ)任務(wù)不存在意義。
ScheduledThreadPool
ScheduledThreadPool 線程池是支持定時(shí)或者周期性執(zhí)行任務(wù),他的創(chuàng)建代碼 Executors.newSchedsuledThreadPool(int) 中,如下所示:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
我們發(fā)現(xiàn)這里調(diào)用了 ScheduledThreadPoolExecutor 這個(gè)類的構(gòu)造函數(shù),進(jìn)一步查看發(fā)現(xiàn) ScheduledThreadPoolExecutor 類是一個(gè)繼承了 ThreadPoolExecutor 的,同時(shí)實(shí)現(xiàn)了 ScheduledExecutorService 接口,我們看到它的幾個(gè)構(gòu)造函數(shù)都是調(diào)用父類 ThreadPoolExecutor 的構(gòu)造函數(shù)
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
從上面代碼我們可以看到和其他線程池創(chuàng)建并沒有差異,只是這里的任務(wù)隊(duì)列是 DelayedWorkQueue 關(guān)于阻塞丟列我們下篇文章專門說,這里我們先創(chuàng)建一個(gè)周期性的線程池來看一下
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
// 1. 延遲一定時(shí)間執(zhí)行一次
service.schedule(() ->{
System.out.println("schedule ==> 云棲簡碼-i-code.online");
},2, TimeUnit.SECONDS);
// 2. 按照固定頻率周期執(zhí)行
service.scheduleAtFixedRate(() ->{
System.out.println("scheduleAtFixedRate ==> 云棲簡碼-i-code.online");
},2,3,TimeUnit.SECONDS);
//3. 按照固定頻率周期執(zhí)行
service.scheduleWithFixedDelay(() -> {
System.out.println("scheduleWithFixedDelay ==> 云棲簡碼-i-code.online");
},2,5,TimeUnit.SECONDS);
}
上面代碼是我們簡單創(chuàng)建了 newScheduledThreadPool ,同時(shí)演示了里面的三個(gè)核心方法,首先看執(zhí)行的結(jié)果:

首先我們看第一個(gè)方法 schedule , 它有三個(gè)參數(shù),第一個(gè)參數(shù)是線程任務(wù),第二個(gè) delay 表示任務(wù)執(zhí)行延遲時(shí)長,第三個(gè) unit 表示延遲時(shí)間的單位,如上面代碼所示就是延遲兩秒后執(zhí)行任務(wù)
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
第二個(gè)方法是 scheduleAtFixedRate 如下, 它有四個(gè)參數(shù), command 參數(shù)表示執(zhí)行的線程任務(wù) , initialDelay 參數(shù)表示第一次執(zhí)行的延遲時(shí)間, period 參數(shù)表示第一次執(zhí)行之后按照多久一次的頻率來執(zhí)行,最后一個(gè)參數(shù)是時(shí)間單位。如上面案例代碼所示,表示兩秒后執(zhí)行第一次,之后按每隔三秒執(zhí)行一次
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
第三個(gè)方法是 scheduleWithFixedDelay 如下,它與上面方法是非常類似的,也是周期性定時(shí)執(zhí)行, 參數(shù)含義和上面方法一致。這個(gè)方法和 scheduleAtFixedRate 的區(qū)別主要在于時(shí)間的起點(diǎn)計(jì)時(shí)不同
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
scheduleAtFixedRate 是以任務(wù)開始的時(shí)間為時(shí)間起點(diǎn)來計(jì)時(shí),時(shí)間到就執(zhí)行第二次任務(wù),與任務(wù)執(zhí)行所花費(fèi)的時(shí)間無關(guān);而 scheduleWithFixedDelay 是以任務(wù)執(zhí)行結(jié)束的時(shí)間點(diǎn)作為計(jì)時(shí)的開始。如下所示

SingleThreadScheduledExecutor 它實(shí)際和 ScheduledThreadPool 線程池非常相似,它只是 ScheduledThreadPool 的一個(gè)特例,內(nèi)部只有一個(gè)線程,它只是將 ScheduledThreadPool 的核心線程數(shù)設(shè)置為了 1。如源碼所示:
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
上面我們介紹了五種常見的線程池,對(duì)于這些線程池我們可以從核心線程數(shù)、最大線程數(shù)、存活時(shí)間三個(gè)維度進(jìn)行一個(gè)簡單的對(duì)比,有利于我們加深對(duì)這幾種線程池的記憶。
| FixedThreadPool | SingleThreadExecutor | CachedThreadPool | ScheduledThreadPool | SingleThreadScheduledExecutor | |
|---|---|---|---|---|---|
| corePoolSize | 構(gòu)造函數(shù)傳入 | 1 | 0 | 構(gòu)造函數(shù)傳入 | 1 |
| maxPoolSize | 同corePoolSize | 1 | Integer. MAX_VALUE | Integer. MAX_VALUE | Integer. MAX_VALUE |
| keepAliveTime | 0 | 0 | 60 | 0 | 0 |
ForkJoinPool ForkJoinPool 這是一個(gè)在 JDK7 引入的新新線程池,它的主要特點(diǎn)是可以充分利用多核 CPU , 可以把一個(gè)任務(wù)拆分為多個(gè)子任務(wù),這些子任務(wù)放在不同的處理器上并行執(zhí)行,當(dāng)這些子任務(wù)執(zhí)行結(jié)束后再把這些結(jié)果合并起來,這是一種分治思想。 ForkJoinPool 也正如它的名字一樣,第一步進(jìn)行 Fork 拆分,第二步進(jìn)行 Join 合并,我們先來看一下它的類圖結(jié)構(gòu)

ForkJoinPool 的使用也是通過調(diào)用 submit(ForkJoinTask<T> task) 或 invoke(ForkJoinTask<T> task) 方法來執(zhí)行指定任務(wù)了。其中任務(wù)的類型是 ForkJoinTask 類,它代表的是一個(gè)可以合并的子任務(wù),他本身是一個(gè)抽象類,同時(shí)還有兩個(gè)常用的抽象子類 RecursiveAction 和 RecursiveTask ,其中 RecursiveTask 表示的是有返回值類型的任務(wù),而 RecursiveAction 則表示無返回值的任務(wù)。下面是它們的類圖:

下面我們通過一個(gè)簡單的代碼先來看一下如何使用 ForkJoinPool 線程池
/**
* @url: i-code.online
* @author: AnonyStar
* @time: 2020/11/2 10:01
*/
public class ForkJoinApp1 {
/**
目標(biāo): 打印0-200以內(nèi)的數(shù)字,進(jìn)行分段每個(gè)間隔為10以上,測試forkjoin
*/
public static void main(String[] args) {
// 創(chuàng)建線程池,
ForkJoinPool joinPool = new ForkJoinPool();
// 創(chuàng)建根任務(wù)
SubTask subTask = new SubTask(0,200);
// 提交任務(wù)
joinPool.submit(subTask);
//讓線程阻塞等待所有任務(wù)完成 在進(jìn)行關(guān)閉
try {
joinPool.awaitTermination(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
joinPool.shutdown();
}
}
class SubTask extends RecursiveAction {
int startNum;
int endNum;
public SubTask(int startNum,int endNum){
super();
this.startNum = startNum;
this.endNum = endNum;
}
@Override
protected void compute() {
if (endNum - startNum < 10){
// 如果分裂的兩者差值小于10 則不再繼續(xù),直接打印
System.out.println(Thread.currentThread().getName()+": [startNum:"+startNum+",endNum:"+endNum+"]");
}else {
// 取中間值
int middle = (startNum + endNum) / 2;
//創(chuàng)建兩個(gè)子任務(wù),以遞歸思想,
SubTask subTask = new SubTask(startNum,middle);
SubTask subTask1 = new SubTask(middle,endNum);
//執(zhí)行任務(wù), fork() 表示異步的開始執(zhí)行
subTask.fork();
subTask1.fork();
}
}
}
結(jié)果:

從上面的案例我們可以看到我們,創(chuàng)建了很多個(gè)線程執(zhí)行,因?yàn)槲覝y試的電腦是12線程的,所以這里實(shí)際是創(chuàng)建了12個(gè)線程,也側(cè)面說明了充分調(diào)用了每個(gè)處理的線程處理能力 上面案例其實(shí)我們發(fā)現(xiàn)很熟悉的味道,那就是以前接觸過的遞歸思想,將上面的案例圖像化如下,更直觀的看到,

上面的例子是無返回值的案例,下面我們來看一個(gè)典型的有返回值的案例,相信大家都聽過及很熟悉斐波那契數(shù)列,這個(gè)數(shù)列有個(gè)特點(diǎn)就是最后一項(xiàng)的結(jié)果等于前兩項(xiàng)的和,如: 0,1,1,2,3,5...f(n-2)+f(n-1) , 即第0項(xiàng)為0 ,第一項(xiàng)為1,則第二項(xiàng)為 0+1=1 ,以此類推。我們最初的解決方法就是使用遞歸來解決,如下計(jì)算第n項(xiàng)的數(shù)值:
private int num(int num){
if (num <= 1){
return num;
}
num = num(num-1) + num(num -2);
return num;
}
從上面簡單代碼中可以看到,當(dāng) n<=1 時(shí)返回 n , 如果 n>1 則計(jì)算前一項(xiàng)的值 f1 ,在計(jì)算前兩項(xiàng)的值 f2 , 再將兩者相加得到結(jié)果,這就是典型的遞歸問題,也是對(duì)應(yīng)我們的 ForkJoin 的工作模式,如下所示,根節(jié)點(diǎn)產(chǎn)生子任務(wù),子任務(wù)再次衍生出子子任務(wù),到最后在進(jìn)行整合匯聚,得到結(jié)果。

我們通過 ForkJoinPool 來實(shí)現(xiàn)斐波那契數(shù)列的計(jì)算,如下展示:
/**
* @url: i-code.online
* @author: AnonyStar
* @time: 2020/11/2 10:01
*/
public class ForkJoinApp3 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool pool = new ForkJoinPool();
//計(jì)算第二是項(xiàng)的數(shù)值
final ForkJoinTask<Integer> submit = pool.submit(new Fibonacci(20));
// 獲取結(jié)果,這里獲取的就是異步任務(wù)的最終結(jié)果
System.out.println(submit.get());
}
}
class Fibonacci extends RecursiveTask<Integer>{
int num;
public Fibonacci(int num){
this.num = num;
}
@Override
protected Integer compute() {
if (num <= 1) return num;
//創(chuàng)建子任務(wù)
Fibonacci subTask1 = new Fibonacci(num - 1);
Fibonacci subTask2 = new Fibonacci(num - 2);
// 執(zhí)行子任務(wù)
subTask1.fork();
subTask2.fork();
//獲取前兩項(xiàng)的結(jié)果來計(jì)算和
return subTask1.join()+subTask2.join();
}
}
通過 ForkJoinPool 可以極大的發(fā)揮多核處理器的優(yōu)勢(shì),尤其非常適合用于遞歸的場景,例如樹的遍歷、最優(yōu)路徑搜索等場景。 上面說的是 ForkJoinPool 的使用上的,下面我們來說一下其內(nèi)部的構(gòu)造,對(duì)于我們前面說的幾種線程池來說,它們都是里面只有一個(gè)隊(duì)列,所有的線程共享一個(gè)。但是在 ForkJoinPool 中,其內(nèi)部有一個(gè)共享的任務(wù)隊(duì)列,除此之外每個(gè)線程都有一個(gè)對(duì)應(yīng)的雙端隊(duì)列 Deque , 當(dāng)一個(gè)線程中任務(wù)被 Fork 分裂了,那么分裂出來的子任務(wù)就會(huì)放入到對(duì)應(yīng)的線程自己的 Deque 中,而不是放入公共隊(duì)列。這樣對(duì)于每個(gè)線程來說成本會(huì)降低很多,可以直接從自己線程的隊(duì)列中獲取任務(wù)而不需要去公共隊(duì)列中爭奪,有效的減少了線程間的資源競爭和切換。

有一種情況,當(dāng)線程有多個(gè)如 t1,t2,t3... ,在某一段時(shí)間線程 t1 的任務(wù)特別繁重,分裂了數(shù)十個(gè)子任務(wù),但是線程 t0 此時(shí)卻無事可做,它自己的 deque 隊(duì)列為空,這時(shí)為了提高效率, t0 就會(huì)想辦法幫助 t1 執(zhí)行任務(wù),這就是“ work-stealing ”的含義。 雙端隊(duì)列 deque 中,線程 t1 獲取任務(wù)的邏輯是后進(jìn)先出,也就是 LIFO(Last In Frist Out) ,而線程 t0 在“ steal ”偷線程 t1 的 deque 中的任務(wù)的邏輯是先進(jìn)先出,也就是 FIFO(Fast In Frist Out) ,如圖所示,圖中很好的描述了兩個(gè)線程使用雙端隊(duì)列分別獲取任務(wù)的情景。你可以看到,使用 “ work-stealing ” 算法和雙端隊(duì)列很好地平衡了各線程的負(fù)載。

總結(jié)
到此這篇關(guān)于java中常見的6種線程池示例詳解的文章就介紹到這了,更多相關(guān)java常見線程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
kafka生產(chǎn)者發(fā)送消息流程深入分析講解
本文將介紹kafka的一條消息的發(fā)送流程,從消息的發(fā)送到服務(wù)端的存儲(chǔ)。上文說到kafak分為客戶端與服務(wù)端,要發(fā)送消息就涉及到了網(wǎng)絡(luò)通訊,kafka采用TCP協(xié)議進(jìn)行客戶端與服務(wù)端的通訊協(xié)議2023-03-03
Java.lang.Long.parseLong()方法詳解及示例
這個(gè)java.lang.Long.parseLong(String s) 方法解析字符串參數(shù)s作為有符號(hào)十進(jìn)制長,下面這篇文章主要給大家介紹了關(guān)于Java.lang.Long.parseLong()方法詳解及示例的相關(guān)資料,需要的朋友可以參考下2023-01-01
springboot~ObjectMapper~dto到entity的自動(dòng)賦值
這篇文章主要介紹了springboot~ObjectMapper~dto到entity的自動(dòng)賦值,本文分三種情況給大家介紹,需要的朋友可以參考下2018-08-08
Springboot項(xiàng)目中kaptcha驗(yàn)證碼的使用方式
這篇文章主要介紹了Springboot項(xiàng)目中kaptcha驗(yàn)證碼的使用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05

