Java線(xiàn)程池Executor用法詳解
線(xiàn)程池類(lèi)圖

我們最常使用的Executors實(shí)現(xiàn)創(chuàng)建線(xiàn)程池使用線(xiàn)程主要是用上述類(lèi)圖中提供的類(lèi)。在上邊的類(lèi)圖中,包含了一個(gè)Executor框架,它是一個(gè)根據(jù)一組執(zhí)行策略的調(diào)用調(diào)度執(zhí)行和控制異步任務(wù)的框架,目的是提供一種將任務(wù)提交與任務(wù)如何運(yùn)行分離開(kāi)的機(jī)制。它包含了三個(gè)executor接口:
- Executor:運(yùn)行新任務(wù)的簡(jiǎn)單接口
- ExecutorService:擴(kuò)展了Executor,添加了用來(lái)管理執(zhí)行器生命周期和任務(wù)生命周期的方法
- ScheduleExcutorService:擴(kuò)展了ExecutorService,支持Future和定期執(zhí)行任務(wù)
線(xiàn)程池的好處
- 降低資源消耗-重用存在的線(xiàn)程,減少對(duì)象創(chuàng)建、消亡的開(kāi)銷(xiāo),性能好
- 提高響應(yīng)速度 -可有效控制最大并發(fā)線(xiàn)程數(shù),提高系統(tǒng)資源利用率,同時(shí)可以避免過(guò)多資源競(jìng)爭(zhēng),避免阻塞。當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可不用等待線(xiàn)程創(chuàng)建就能立即執(zhí)行
- 提高線(xiàn)程的可管理性-提供定時(shí)執(zhí)行、定期執(zhí)行、單線(xiàn)程、并發(fā)數(shù)控制等功能。
new Thread的弊端
- 每次new Thread 新建對(duì)象,性能差
- 線(xiàn)程缺乏統(tǒng)一管理,可能無(wú)限制的新建線(xiàn)程,相互競(jìng)爭(zhēng),可能占用過(guò)多的系統(tǒng)資源導(dǎo)致死機(jī)或者OOM(out of memory 內(nèi)存溢出),這種問(wèn)題的原因不是因?yàn)閱渭兊膎ew一個(gè)Thread,而是可能因?yàn)槌绦虻腷ug或者設(shè)計(jì)上的缺陷導(dǎo)致不斷new Thread造成的。
- 缺少更多功能,如更多執(zhí)行、定期執(zhí)行、線(xiàn)程中斷。
線(xiàn)程池核心類(lèi)-ThreadPoolExecutor
參數(shù)說(shuō)明:ThreadPoolExecutor一共有七個(gè)參數(shù),這七個(gè)參數(shù)配合起來(lái),構(gòu)成了線(xiàn)程池強(qiáng)大的功能。
corePoolSize:核心線(xiàn)程數(shù)量
maximumPoolSize:線(xiàn)程最大線(xiàn)程數(shù)
workQueue:阻塞隊(duì)列,存儲(chǔ)等待執(zhí)行的任務(wù),很重要,會(huì)對(duì)線(xiàn)程池運(yùn)行過(guò)程產(chǎn)生重大影響
當(dāng)我們提交一個(gè)新的任務(wù)到線(xiàn)程池,線(xiàn)程池會(huì)根據(jù)當(dāng)前池中正在運(yùn)行的線(xiàn)程數(shù)量來(lái)決定該任務(wù)的處理方式。處理方式有三種:
1、直接切換(SynchronusQueue)
2、無(wú)界隊(duì)列(LinkedBlockingQueue)能夠創(chuàng)建的最大線(xiàn)程數(shù)為corePoolSize,這時(shí)maximumPoolSize就不會(huì)起作用了。當(dāng)線(xiàn)程池中所有的核心線(xiàn)程都是運(yùn)行狀態(tài)的時(shí)候,新的任務(wù)提交就會(huì)放入等待隊(duì)列中。
3、有界隊(duì)列(ArrayBlockingQueue)最大maximumPoolSize,能夠降低資源消耗,但是這種方式使得線(xiàn)程池對(duì)線(xiàn)程調(diào)度變的更困難。因?yàn)榫€(xiàn)程池與隊(duì)列容量都是有限的。所以想讓線(xiàn)程池的吞吐率和處理任務(wù)達(dá)到一個(gè)合理的范圍,又想使我們的線(xiàn)程調(diào)度相對(duì)簡(jiǎn)單,并且還盡可能降低資源的消耗,我們就需要合理的限制這兩個(gè)數(shù)量 分配技巧: [如果想降低資源的消耗包括降低cpu使用率、操作系統(tǒng)資源的消耗、上下文切換的開(kāi)銷(xiāo)等等,可以設(shè)置一個(gè)較大的隊(duì)列容量和較小的線(xiàn)程池容量,這樣會(huì)降低線(xiàn)程池的吞吐量。如果我們提交的任務(wù)經(jīng)常發(fā)生阻塞,我們可以調(diào)整maximumPoolSize。如果我們的隊(duì)列容量較小,我們需要把線(xiàn)程池大小設(shè)置的大一些,這樣cpu的使用率相對(duì)來(lái)說(shuō)會(huì)高一些。但是如果線(xiàn)程池的容量設(shè)置的過(guò)大,提高任務(wù)的數(shù)量過(guò)多的時(shí)候,并發(fā)量會(huì)增加,那么線(xiàn)程之間的調(diào)度就是一個(gè)需要考慮的問(wèn)題。這樣反而可能會(huì)降低處理任務(wù)的吞吐量。]
keepAliveTime:線(xiàn)程沒(méi)有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間終止(當(dāng)線(xiàn)程中的線(xiàn)程數(shù)量大于corePoolSize的時(shí)候,如果這時(shí)沒(méi)有新的任務(wù)提交核心線(xiàn)程外的線(xiàn)程不會(huì)立即銷(xiāo)毀,而是等待,直到超過(guò)keepAliveTime)
unit:keepAliveTime的時(shí)間單位
threadFactory:線(xiàn)程工廠,用來(lái)創(chuàng)建線(xiàn)程,有一個(gè)默認(rèn)的工場(chǎng)來(lái)創(chuàng)建線(xiàn)程,這樣新創(chuàng)建出來(lái)的線(xiàn)程有相同的優(yōu)先級(jí),是非守護(hù)線(xiàn)程、設(shè)置好了名稱(chēng))
rejectHandler:當(dāng)拒絕處理任務(wù)時(shí)(阻塞隊(duì)列滿(mǎn))的策略(AbortPolicy默認(rèn)策略直接拋出異常、CallerRunsPolicy用調(diào)用者所在的線(xiàn)程執(zhí)行任務(wù)、DiscardOldestPolicy丟棄隊(duì)列中最靠前的任務(wù)并執(zhí)行當(dāng)前任務(wù)、DiscardPolicy直接丟棄當(dāng)前任務(wù))

corePoolSize、maximumPoolSize、workQueue 三者關(guān)系:如果運(yùn)行的線(xiàn)程數(shù)小于corePoolSize的時(shí)候,直接創(chuàng)建新線(xiàn)程來(lái)處理任務(wù)。即使線(xiàn)程池中的其他線(xiàn)程是空閑的。如果運(yùn)行中的線(xiàn)程數(shù)大于corePoolSize且小于maximumPoolSize時(shí),那么只有當(dāng)workQueue滿(mǎn)的時(shí)候才創(chuàng)建新的線(xiàn)程去處理任務(wù)。如果corePoolSize與maximumPoolSize是相同的,那么創(chuàng)建的線(xiàn)程池大小是固定的。這時(shí)有新任務(wù)提交,當(dāng)workQueue未滿(mǎn)時(shí),就把請(qǐng)求放入workQueue中。等待空線(xiàn)程從workQueue取出任務(wù)。如果workQueue此時(shí)也滿(mǎn)了,那么就使用另外的拒絕策略參數(shù)去執(zhí)行拒絕策略。
初始化方法:由七個(gè)參數(shù)組合成四個(gè)初始化方法

其他方法:
execute(); //提交任務(wù),交給線(xiàn)程池執(zhí)行 submit();//提交任務(wù),能夠返回執(zhí)行結(jié)果 execute+Future shutdown();//關(guān)閉線(xiàn)程池,等待任務(wù)都執(zhí)行完 shutdownNow();//關(guān)閉線(xiàn)程池,不等待任務(wù)執(zhí)行完 getTaskCount();//線(xiàn)程池已執(zhí)行和未執(zhí)行的任務(wù)總數(shù) getCompleteTaskCount();//已完成的任務(wù)數(shù)量 getPoolSize();//線(xiàn)程池當(dāng)前的線(xiàn)程數(shù)量 getActiveCount();//當(dāng)前線(xiàn)程池中正在執(zhí)行任務(wù)的線(xiàn)程數(shù)量
線(xiàn)程池生命周期:

- running:能接受新提交的任務(wù),也能處理阻塞隊(duì)列中的任務(wù)
- shutdown:不能處理新的任務(wù),但是能繼續(xù)處理阻塞隊(duì)列中任務(wù)
- stop:不能接收新的任務(wù),也不處理隊(duì)列中的任務(wù)
- tidying:如果所有的任務(wù)都已經(jīng)終止了,這時(shí)有效線(xiàn)程數(shù)為0
- terminated:最終狀態(tài)
使用Executors創(chuàng)建線(xiàn)程池
使用Executors可以創(chuàng)建四種線(xiàn)程池:分別對(duì)應(yīng)上邊提到的四種線(xiàn)程池初始化方法
Executors.newCachedThreadPool
newCachedThreadPool是一個(gè)根據(jù)需要?jiǎng)?chuàng)建新線(xiàn)程的線(xiàn)程池,當(dāng)一個(gè)任務(wù)提交時(shí),corePoolSize為0不創(chuàng)建核心線(xiàn)程,SynchronousQueue是一個(gè)不存儲(chǔ)元素的隊(duì)列,可以理解為隊(duì)里永遠(yuǎn)是滿(mǎn)的,因此最終會(huì)創(chuàng)建非核心線(xiàn)程來(lái)執(zhí)行任務(wù)。 對(duì)于非核心線(xiàn)程空閑60s時(shí)將被回收。因?yàn)镮nteger.MAX_VALUE非常大,可以認(rèn)為是可以無(wú)限創(chuàng)建線(xiàn)程的,在資源有限的情況下容易引起OOM異常。
//創(chuàng)建newCachedThreadPool線(xiàn)程池源碼
public static ExecutorService newCachedThreadPool() {
/**
*corePoolSize: 0,核心線(xiàn)程池的數(shù)量為0
*maximumPoolSize: Integer.MAX_VALUE,可以認(rèn)為最大線(xiàn)程數(shù)是無(wú)限的
*keepAliveTime: 60L
*unit: 秒
*workQueue: SynchronousQueue
**/
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
使用案例:
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
executor.execute(new Runnable() {
@Override
public void run() {
log.info("task:{}",index);
}
});
}
}值得注意的一點(diǎn)是,newCachedThreadPool的返回值是ExecutorService類(lèi)型,該類(lèi)型只包含基礎(chǔ)的線(xiàn)程池方法,但卻不包含線(xiàn)程監(jiān)控相關(guān)方法,因此在使用返回值為ExecutorService的線(xiàn)程池類(lèi)型創(chuàng)建新線(xiàn)程時(shí)要考慮到具體情況。

Executors.newSingleThreadExecutor
newSingleThreadExecutor是單線(xiàn)程線(xiàn)程池,只有一個(gè)核心線(xiàn)程,用唯一的一個(gè)共用線(xiàn)程執(zhí)行任務(wù),保證所有任務(wù)按指定順序執(zhí)行(FIFO、優(yōu)先級(jí)…)
//newSingleThreadExecutor創(chuàng)建線(xiàn)程池源碼
public static ExecutorService newSingleThreadExecutor() {
/**
* corePoolSize : 1,核心線(xiàn)程池的數(shù)量為1
* maximumPoolSize : 1,只可以創(chuàng)建一個(gè)非核心線(xiàn)程
* keepAliveTime : 0L
* unit => 秒
* workQueue => LinkedBlockingQueue
**/
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}當(dāng)一個(gè)任務(wù)提交時(shí),首先會(huì)創(chuàng)建一個(gè)核心線(xiàn)程來(lái)執(zhí)行任務(wù),如果超過(guò)核心線(xiàn)程的數(shù)量,將會(huì)放入隊(duì)列中,因?yàn)長(zhǎng)inkedBlockingQueue是長(zhǎng)度為Integer.MAX_VALUE的隊(duì)列,可以認(rèn)為是無(wú)界隊(duì)列,因此往隊(duì)列中可以插入無(wú)限多的任務(wù),在資源有限的時(shí)候容易引起OOM異常,同時(shí)因?yàn)闊o(wú)界隊(duì)列,maximumPoolSize和keepAliveTime參數(shù)將無(wú)效,壓根就不會(huì)創(chuàng)建非核心線(xiàn)程。
Executors.newFixedThreadPool
定長(zhǎng)線(xiàn)程池,核心線(xiàn)程數(shù)和最大線(xiàn)程數(shù)由用戶(hù)傳入,可以設(shè)置線(xiàn)程的最大并發(fā)數(shù),超出在隊(duì)列等待
//newFixedThreadPool創(chuàng)建線(xiàn)程池源碼
public static ExecutorService newFixedThreadPool(int nThreads) {
/**
* corePoolSize : 核心線(xiàn)程的數(shù)量為自定義輸入nThreads
* maximumPoolSize : 最大線(xiàn)程的數(shù)量為自定義輸入nThreads
* keepAliveTime : 0L
* unit : 秒
* workQueue : LinkedBlockingQueue
**/
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}newFixedThreadPool和SingleThreadExecutor類(lèi)似,唯一的區(qū)別就是核心線(xiàn)程數(shù)不同,并且由于使用的是LinkedBlockingQueue,在資源有限的時(shí)候容易引起OOM異常。
Executors.newScheduledThreadPool
定長(zhǎng)線(xiàn)程池,核心線(xiàn)程數(shù)由用戶(hù)傳入,支持定時(shí)和周期任務(wù)執(zhí)行
//newScheduledThreadPool創(chuàng)建線(xiàn)程池源碼
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
/**
* corePoolSize : 核心線(xiàn)程的數(shù)量為自定義輸入corePoolSize
* maximumPoolSize : 最大線(xiàn)程的數(shù)量為Integer.MAX_VALUE
* keepAliveTime : 0L
* unit : 納秒
* workQueue : DelayedWorkQueue
**/
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}當(dāng)一個(gè)任務(wù)提交時(shí),corePoolSize為自定義輸入,首先創(chuàng)建核心線(xiàn)程,核心線(xiàn)程滿(mǎn)了之后,因此最終會(huì)創(chuàng)建非核心線(xiàn)程來(lái)執(zhí)行任務(wù)。非核心線(xiàn)程使用后將被回收。因?yàn)镮nteger.MAX_VALUE非常大,可以認(rèn)為是可以無(wú)限創(chuàng)建線(xiàn)程的,在資源有限的情況下容易引起OOM異常。因?yàn)槭褂玫腄elayedWorkQueue可以實(shí)現(xiàn)定時(shí)和周期任務(wù)。 ScheduledExecutorService提供了三種方法可以使用:

schedule:延遲后執(zhí)行任務(wù) scheduleAtFixedRate:以指定的速率執(zhí)行任務(wù) scheduleWithFixedDelay:以指定的延遲執(zhí)行任務(wù) 使用案例:
public static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// executorService.schedule(new Runnable() {
// @Override
// public void run() {
// log.warn("schedule run");
// }
// //延遲3秒后執(zhí)行
// }, 3, TimeUnit.SECONDS);
// executorService.shutdown();
// executorService.scheduleWithFixedDelay(new Runnable() {
// @Override
// public void run() {
// log.warn("scheduleWithFixedDelay run");
// }
// //延遲一秒后每隔3秒執(zhí)行
// }, 1, 3, TimeUnit.SECONDS);
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
log.warn("schedule run");
}
//延遲一秒后每隔3秒執(zhí)行
}, 1, 3, TimeUnit.SECONDS);
/**
* 定時(shí)器調(diào)度,不推薦使用,推薦ScheduledExecutorService調(diào)度
*/
// Timer timer = new Timer();
// timer.schedule(new TimerTask() {
// @Override
// public void run() {
// log.warn("timer run");
// }
// //從當(dāng)前時(shí)間每隔5秒執(zhí)行
// }, new Date(), 5 * 1000);
}總結(jié)
- FixedThreadPool和SingleThreadExecutor 允許的請(qǐng)求隊(duì)列長(zhǎng)度為Integer.MAX_VALUE,可能會(huì)堆積大量的請(qǐng)求,從而引起OOM異常
- CachedThreadPool 和newScheduledThreadPool允許創(chuàng)建的線(xiàn)程數(shù)為Integer.MAX_VALUE,可能會(huì)創(chuàng)建大量的線(xiàn)程,從而引起OOM異常
這就是為什么禁止使用Executors去創(chuàng)建線(xiàn)程池,而是推薦自己去創(chuàng)建ThreadPoolExecutor的原因
如何定義線(xiàn)程池參數(shù)
CPU密集型 : 線(xiàn)程池的大小推薦為CPU數(shù)量 + 1,CPU數(shù)量可以根據(jù)Runtime.availableProcessors方法獲取 IO密集型 : CPU數(shù)量 * CPU利用率 * (1 + 線(xiàn)程等待時(shí)間/線(xiàn)程CPU時(shí)間) 混合型 : 將任務(wù)分為CPU密集型和IO密集型,然后分別使用不同的線(xiàn)程池去處理,從而使每個(gè)線(xiàn)程池可以根據(jù)各自的工作負(fù)載來(lái)調(diào)整 阻塞隊(duì)列 : 推薦使用有界隊(duì)列,有界隊(duì)列有助于避免資源耗盡的情況發(fā)生 拒絕策略 : 默認(rèn)采用的是AbortPolicy拒絕策略,直接在程序中拋出RejectedExecutionException異常【因?yàn)槭沁\(yùn)行時(shí)異常,不強(qiáng)制catch】,這種處理方式不夠優(yōu)雅。處理拒絕策略有以下幾種比較推薦:
- 在程序中捕獲RejectedExecutionException異常,在捕獲異常中對(duì)任務(wù)進(jìn)行處理。針對(duì)默認(rèn)拒絕策略
- 使用CallerRunsPolicy拒絕策略,該策略會(huì)將任務(wù)交給調(diào)用execute的線(xiàn)程執(zhí)行【一般為主線(xiàn)程】,此時(shí)主線(xiàn)程將在一段時(shí)間內(nèi)不能提交任何任務(wù),從而使工作線(xiàn)程處理正在執(zhí)行的任務(wù)。此時(shí)提交的線(xiàn)程將被保存在TCP隊(duì)列中,TCP隊(duì)列滿(mǎn)將會(huì)影響客戶(hù)端,這是一種平緩的性能降低
- 自定義拒絕策略,只需要實(shí)現(xiàn)RejectedExecutionHandler接口即可
- 如果任務(wù)不是特別重要,使用DiscardPolicy和DiscardOldestPolicy拒絕策略將任務(wù)丟棄也是可以的
如果使用Executors的靜態(tài)方法創(chuàng)建ThreadPoolExecutor對(duì)象,可以通過(guò)使用Semaphore對(duì)任務(wù)的執(zhí)行進(jìn)行限流也可以避免出現(xiàn)OOM異常
到此這篇關(guān)于Java線(xiàn)程池Executor用法詳解的文章就介紹到這了,更多相關(guān)Java線(xiàn)程池Executor內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
IDEA創(chuàng)建maven項(xiàng)目時(shí)在tomcat運(yùn)行瀏覽器404的問(wèn)題
這篇文章主要介紹了IDEA創(chuàng)建maven項(xiàng)目時(shí)在tomcat運(yùn)行瀏覽器404的問(wèn)題及解決方法,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-11-11
Java反轉(zhuǎn)字符串和相關(guān)字符編碼的問(wèn)題解決
反轉(zhuǎn)字符串一直被當(dāng)作是簡(jiǎn)單問(wèn)題,大家的思想主要就是利用遍歷,首尾交換字符實(shí)現(xiàn)字符串的反轉(zhuǎn)。例如下面的代碼,就可以簡(jiǎn)單實(shí)現(xiàn)反轉(zhuǎn)。2013-05-05
解決java啟動(dòng)時(shí)報(bào)線(xiàn)程占用報(bào)錯(cuò):Exception?in?thread?“Thread-14“?java.ne
這篇文章主要給大家介紹了關(guān)于解決java啟動(dòng)時(shí)報(bào)線(xiàn)程占用:Exception?in?thread?“Thread-14“?java.net.BindException:?Address?already?in?use:?bind的相關(guān)資料,文中將解決的辦法介紹的非常詳細(xì),需要的朋友可以參考下2023-04-04
idea中解決maven包沖突的問(wèn)題(maven helper)
這篇文章主要介紹了idea中解決maven包沖突的問(wèn)題(maven helper),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-12-12
Spring?Cloud?Alibaba?Nacos兩種檢查機(jī)制
這篇文章主要介紹了Spring?Cloud?Alibaba?Nacos兩種檢查機(jī)制,作為注冊(cè)中心不止提供了服務(wù)注冊(cè)和服務(wù)發(fā)現(xiàn)功能,它還提供了服務(wù)可用性監(jiān)測(cè)的機(jī)制,下面我們就一起進(jìn)入文章了解具體詳情吧2022-05-05

