Java線程池Executor用法詳解
線程池類圖
我們最常使用的Executors實(shí)現(xiàn)創(chuàng)建線程池使用線程主要是用上述類圖中提供的類。在上邊的類圖中,包含了一個(gè)Executor框架,它是一個(gè)根據(jù)一組執(zhí)行策略的調(diào)用調(diào)度執(zhí)行和控制異步任務(wù)的框架,目的是提供一種將任務(wù)提交與任務(wù)如何運(yùn)行分離開的機(jī)制。它包含了三個(gè)executor接口:
- Executor:運(yùn)行新任務(wù)的簡單接口
- ExecutorService:擴(kuò)展了Executor,添加了用來管理執(zhí)行器生命周期和任務(wù)生命周期的方法
- ScheduleExcutorService:擴(kuò)展了ExecutorService,支持Future和定期執(zhí)行任務(wù)
線程池的好處
- 降低資源消耗-重用存在的線程,減少對象創(chuàng)建、消亡的開銷,性能好
- 提高響應(yīng)速度 -可有效控制最大并發(fā)線程數(shù),提高系統(tǒng)資源利用率,同時(shí)可以避免過多資源競爭,避免阻塞。當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可不用等待線程創(chuàng)建就能立即執(zhí)行
- 提高線程的可管理性-提供定時(shí)執(zhí)行、定期執(zhí)行、單線程、并發(fā)數(shù)控制等功能。
new Thread的弊端
- 每次new Thread 新建對象,性能差
- 線程缺乏統(tǒng)一管理,可能無限制的新建線程,相互競爭,可能占用過多的系統(tǒng)資源導(dǎo)致死機(jī)或者OOM(out of memory 內(nèi)存溢出),這種問題的原因不是因?yàn)閱渭兊膎ew一個(gè)Thread,而是可能因?yàn)槌绦虻腷ug或者設(shè)計(jì)上的缺陷導(dǎo)致不斷new Thread造成的。
- 缺少更多功能,如更多執(zhí)行、定期執(zhí)行、線程中斷。
線程池核心類-ThreadPoolExecutor
參數(shù)說明:ThreadPoolExecutor一共有七個(gè)參數(shù),這七個(gè)參數(shù)配合起來,構(gòu)成了線程池強(qiáng)大的功能。
corePoolSize:核心線程數(shù)量
maximumPoolSize:線程最大線程數(shù)
workQueue:阻塞隊(duì)列,存儲(chǔ)等待執(zhí)行的任務(wù),很重要,會(huì)對線程池運(yùn)行過程產(chǎn)生重大影響
當(dāng)我們提交一個(gè)新的任務(wù)到線程池,線程池會(huì)根據(jù)當(dāng)前池中正在運(yùn)行的線程數(shù)量來決定該任務(wù)的處理方式。處理方式有三種:
1、直接切換(SynchronusQueue)
2、無界隊(duì)列(LinkedBlockingQueue)能夠創(chuàng)建的最大線程數(shù)為corePoolSize,這時(shí)maximumPoolSize就不會(huì)起作用了。當(dāng)線程池中所有的核心線程都是運(yùn)行狀態(tài)的時(shí)候,新的任務(wù)提交就會(huì)放入等待隊(duì)列中。
3、有界隊(duì)列(ArrayBlockingQueue)最大maximumPoolSize,能夠降低資源消耗,但是這種方式使得線程池對線程調(diào)度變的更困難。因?yàn)榫€程池與隊(duì)列容量都是有限的。所以想讓線程池的吞吐率和處理任務(wù)達(dá)到一個(gè)合理的范圍,又想使我們的線程調(diào)度相對簡單,并且還盡可能降低資源的消耗,我們就需要合理的限制這兩個(gè)數(shù)量 分配技巧: [如果想降低資源的消耗包括降低cpu使用率、操作系統(tǒng)資源的消耗、上下文切換的開銷等等,可以設(shè)置一個(gè)較大的隊(duì)列容量和較小的線程池容量,這樣會(huì)降低線程池的吞吐量。如果我們提交的任務(wù)經(jīng)常發(fā)生阻塞,我們可以調(diào)整maximumPoolSize。如果我們的隊(duì)列容量較小,我們需要把線程池大小設(shè)置的大一些,這樣cpu的使用率相對來說會(huì)高一些。但是如果線程池的容量設(shè)置的過大,提高任務(wù)的數(shù)量過多的時(shí)候,并發(fā)量會(huì)增加,那么線程之間的調(diào)度就是一個(gè)需要考慮的問題。這樣反而可能會(huì)降低處理任務(wù)的吞吐量。]
keepAliveTime:線程沒有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間終止(當(dāng)線程中的線程數(shù)量大于corePoolSize的時(shí)候,如果這時(shí)沒有新的任務(wù)提交核心線程外的線程不會(huì)立即銷毀,而是等待,直到超過keepAliveTime)
unit:keepAliveTime的時(shí)間單位
threadFactory:線程工廠,用來創(chuàng)建線程,有一個(gè)默認(rèn)的工場來創(chuàng)建線程,這樣新創(chuàng)建出來的線程有相同的優(yōu)先級,是非守護(hù)線程、設(shè)置好了名稱)
rejectHandler:當(dāng)拒絕處理任務(wù)時(shí)(阻塞隊(duì)列滿)的策略(AbortPolicy默認(rèn)策略直接拋出異常、CallerRunsPolicy用調(diào)用者所在的線程執(zhí)行任務(wù)、DiscardOldestPolicy丟棄隊(duì)列中最靠前的任務(wù)并執(zhí)行當(dāng)前任務(wù)、DiscardPolicy直接丟棄當(dāng)前任務(wù))
corePoolSize、maximumPoolSize、workQueue 三者關(guān)系:如果運(yùn)行的線程數(shù)小于corePoolSize的時(shí)候,直接創(chuàng)建新線程來處理任務(wù)。即使線程池中的其他線程是空閑的。如果運(yùn)行中的線程數(shù)大于corePoolSize且小于maximumPoolSize時(shí),那么只有當(dāng)workQueue滿的時(shí)候才創(chuàng)建新的線程去處理任務(wù)。如果corePoolSize與maximumPoolSize是相同的,那么創(chuàng)建的線程池大小是固定的。這時(shí)有新任務(wù)提交,當(dāng)workQueue未滿時(shí),就把請求放入workQueue中。等待空線程從workQueue取出任務(wù)。如果workQueue此時(shí)也滿了,那么就使用另外的拒絕策略參數(shù)去執(zhí)行拒絕策略。
初始化方法:由七個(gè)參數(shù)組合成四個(gè)初始化方法
其他方法:
execute(); //提交任務(wù),交給線程池執(zhí)行 submit();//提交任務(wù),能夠返回執(zhí)行結(jié)果 execute+Future shutdown();//關(guān)閉線程池,等待任務(wù)都執(zhí)行完 shutdownNow();//關(guān)閉線程池,不等待任務(wù)執(zhí)行完 getTaskCount();//線程池已執(zhí)行和未執(zhí)行的任務(wù)總數(shù) getCompleteTaskCount();//已完成的任務(wù)數(shù)量 getPoolSize();//線程池當(dāng)前的線程數(shù)量 getActiveCount();//當(dāng)前線程池中正在執(zhí)行任務(wù)的線程數(shù)量
線程池生命周期:
- running:能接受新提交的任務(wù),也能處理阻塞隊(duì)列中的任務(wù)
- shutdown:不能處理新的任務(wù),但是能繼續(xù)處理阻塞隊(duì)列中任務(wù)
- stop:不能接收新的任務(wù),也不處理隊(duì)列中的任務(wù)
- tidying:如果所有的任務(wù)都已經(jīng)終止了,這時(shí)有效線程數(shù)為0
- terminated:最終狀態(tài)
使用Executors創(chuàng)建線程池
使用Executors可以創(chuàng)建四種線程池:分別對應(yīng)上邊提到的四種線程池初始化方法
Executors.newCachedThreadPool
newCachedThreadPool是一個(gè)根據(jù)需要?jiǎng)?chuàng)建新線程的線程池,當(dāng)一個(gè)任務(wù)提交時(shí),corePoolSize為0不創(chuàng)建核心線程,SynchronousQueue是一個(gè)不存儲(chǔ)元素的隊(duì)列,可以理解為隊(duì)里永遠(yuǎn)是滿的,因此最終會(huì)創(chuàng)建非核心線程來執(zhí)行任務(wù)。 對于非核心線程空閑60s時(shí)將被回收。因?yàn)镮nteger.MAX_VALUE非常大,可以認(rèn)為是可以無限創(chuàng)建線程的,在資源有限的情況下容易引起OOM異常。
//創(chuàng)建newCachedThreadPool線程池源碼 public static ExecutorService newCachedThreadPool() { /** *corePoolSize: 0,核心線程池的數(shù)量為0 *maximumPoolSize: Integer.MAX_VALUE,可以認(rèn)為最大線程數(shù)是無限的 *keepAliveTime: 60L *unit: 秒 *workQueue: SynchronousQueue **/ return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
使用案例:
public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int index = i; executor.execute(new Runnable() { @Override public void run() { log.info("task:{}",index); } }); } }
值得注意的一點(diǎn)是,newCachedThreadPool的返回值是ExecutorService類型,該類型只包含基礎(chǔ)的線程池方法,但卻不包含線程監(jiān)控相關(guān)方法,因此在使用返回值為ExecutorService的線程池類型創(chuàng)建新線程時(shí)要考慮到具體情況。
Executors.newSingleThreadExecutor
newSingleThreadExecutor是單線程線程池,只有一個(gè)核心線程,用唯一的一個(gè)共用線程執(zhí)行任務(wù),保證所有任務(wù)按指定順序執(zhí)行(FIFO、優(yōu)先級…)
//newSingleThreadExecutor創(chuàng)建線程池源碼 public static ExecutorService newSingleThreadExecutor() { /** * corePoolSize : 1,核心線程池的數(shù)量為1 * maximumPoolSize : 1,只可以創(chuàng)建一個(gè)非核心線程 * keepAliveTime : 0L * unit => 秒 * workQueue => LinkedBlockingQueue **/ return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
當(dāng)一個(gè)任務(wù)提交時(shí),首先會(huì)創(chuàng)建一個(gè)核心線程來執(zhí)行任務(wù),如果超過核心線程的數(shù)量,將會(huì)放入隊(duì)列中,因?yàn)長inkedBlockingQueue是長度為Integer.MAX_VALUE的隊(duì)列,可以認(rèn)為是無界隊(duì)列,因此往隊(duì)列中可以插入無限多的任務(wù),在資源有限的時(shí)候容易引起OOM異常,同時(shí)因?yàn)闊o界隊(duì)列,maximumPoolSize和keepAliveTime參數(shù)將無效,壓根就不會(huì)創(chuàng)建非核心線程。
Executors.newFixedThreadPool
定長線程池,核心線程數(shù)和最大線程數(shù)由用戶傳入,可以設(shè)置線程的最大并發(fā)數(shù),超出在隊(duì)列等待
//newFixedThreadPool創(chuàng)建線程池源碼 public static ExecutorService newFixedThreadPool(int nThreads) { /** * corePoolSize : 核心線程的數(shù)量為自定義輸入nThreads * maximumPoolSize : 最大線程的數(shù)量為自定義輸入nThreads * keepAliveTime : 0L * unit : 秒 * workQueue : LinkedBlockingQueue **/ return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
newFixedThreadPool和SingleThreadExecutor類似,唯一的區(qū)別就是核心線程數(shù)不同,并且由于使用的是LinkedBlockingQueue,在資源有限的時(shí)候容易引起OOM異常。
Executors.newScheduledThreadPool
定長線程池,核心線程數(shù)由用戶傳入,支持定時(shí)和周期任務(wù)執(zhí)行
//newScheduledThreadPool創(chuàng)建線程池源碼 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { /** * corePoolSize : 核心線程的數(shù)量為自定義輸入corePoolSize * maximumPoolSize : 最大線程的數(shù)量為Integer.MAX_VALUE * keepAliveTime : 0L * unit : 納秒 * workQueue : DelayedWorkQueue **/ super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
當(dāng)一個(gè)任務(wù)提交時(shí),corePoolSize為自定義輸入,首先創(chuàng)建核心線程,核心線程滿了之后,因此最終會(huì)創(chuàng)建非核心線程來執(zhí)行任務(wù)。非核心線程使用后將被回收。因?yàn)镮nteger.MAX_VALUE非常大,可以認(rèn)為是可以無限創(chuàng)建線程的,在資源有限的情況下容易引起OOM異常。因?yàn)槭褂玫腄elayedWorkQueue可以實(shí)現(xiàn)定時(shí)和周期任務(wù)。 ScheduledExecutorService提供了三種方法可以使用:
schedule:延遲后執(zhí)行任務(wù) scheduleAtFixedRate:以指定的速率執(zhí)行任務(wù) scheduleWithFixedDelay:以指定的延遲執(zhí)行任務(wù) 使用案例:
public static void main(String[] args) { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); // executorService.schedule(new Runnable() { // @Override // public void run() { // log.warn("schedule run"); // } // //延遲3秒后執(zhí)行 // }, 3, TimeUnit.SECONDS); // executorService.shutdown(); // executorService.scheduleWithFixedDelay(new Runnable() { // @Override // public void run() { // log.warn("scheduleWithFixedDelay run"); // } // //延遲一秒后每隔3秒執(zhí)行 // }, 1, 3, TimeUnit.SECONDS); executorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { log.warn("schedule run"); } //延遲一秒后每隔3秒執(zhí)行 }, 1, 3, TimeUnit.SECONDS); /** * 定時(shí)器調(diào)度,不推薦使用,推薦ScheduledExecutorService調(diào)度 */ // Timer timer = new Timer(); // timer.schedule(new TimerTask() { // @Override // public void run() { // log.warn("timer run"); // } // //從當(dāng)前時(shí)間每隔5秒執(zhí)行 // }, new Date(), 5 * 1000); }
總結(jié)
- FixedThreadPool和SingleThreadExecutor 允許的請求隊(duì)列長度為Integer.MAX_VALUE,可能會(huì)堆積大量的請求,從而引起OOM異常
- CachedThreadPool 和newScheduledThreadPool允許創(chuàng)建的線程數(shù)為Integer.MAX_VALUE,可能會(huì)創(chuàng)建大量的線程,從而引起OOM異常
這就是為什么禁止使用Executors去創(chuàng)建線程池,而是推薦自己去創(chuàng)建ThreadPoolExecutor的原因
如何定義線程池參數(shù)
CPU密集型 : 線程池的大小推薦為CPU數(shù)量 + 1,CPU數(shù)量可以根據(jù)Runtime.availableProcessors方法獲取 IO密集型 : CPU數(shù)量 * CPU利用率 * (1 + 線程等待時(shí)間/線程CPU時(shí)間) 混合型 : 將任務(wù)分為CPU密集型和IO密集型,然后分別使用不同的線程池去處理,從而使每個(gè)線程池可以根據(jù)各自的工作負(fù)載來調(diào)整 阻塞隊(duì)列 : 推薦使用有界隊(duì)列,有界隊(duì)列有助于避免資源耗盡的情況發(fā)生 拒絕策略 : 默認(rèn)采用的是AbortPolicy拒絕策略,直接在程序中拋出RejectedExecutionException異?!疽?yàn)槭沁\(yùn)行時(shí)異常,不強(qiáng)制catch】,這種處理方式不夠優(yōu)雅。處理拒絕策略有以下幾種比較推薦:
- 在程序中捕獲RejectedExecutionException異常,在捕獲異常中對任務(wù)進(jìn)行處理。針對默認(rèn)拒絕策略
- 使用CallerRunsPolicy拒絕策略,該策略會(huì)將任務(wù)交給調(diào)用execute的線程執(zhí)行【一般為主線程】,此時(shí)主線程將在一段時(shí)間內(nèi)不能提交任何任務(wù),從而使工作線程處理正在執(zhí)行的任務(wù)。此時(shí)提交的線程將被保存在TCP隊(duì)列中,TCP隊(duì)列滿將會(huì)影響客戶端,這是一種平緩的性能降低
- 自定義拒絕策略,只需要實(shí)現(xiàn)RejectedExecutionHandler接口即可
- 如果任務(wù)不是特別重要,使用DiscardPolicy和DiscardOldestPolicy拒絕策略將任務(wù)丟棄也是可以的
如果使用Executors的靜態(tài)方法創(chuàng)建ThreadPoolExecutor對象,可以通過使用Semaphore對任務(wù)的執(zhí)行進(jìn)行限流也可以避免出現(xiàn)OOM異常
到此這篇關(guān)于Java線程池Executor用法詳解的文章就介紹到這了,更多相關(guān)Java線程池Executor內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
IDEA創(chuàng)建maven項(xiàng)目時(shí)在tomcat運(yùn)行瀏覽器404的問題
這篇文章主要介紹了IDEA創(chuàng)建maven項(xiàng)目時(shí)在tomcat運(yùn)行瀏覽器404的問題及解決方法,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-11-11Java反轉(zhuǎn)字符串和相關(guān)字符編碼的問題解決
反轉(zhuǎn)字符串一直被當(dāng)作是簡單問題,大家的思想主要就是利用遍歷,首尾交換字符實(shí)現(xiàn)字符串的反轉(zhuǎn)。例如下面的代碼,就可以簡單實(shí)現(xiàn)反轉(zhuǎn)。2013-05-05解決java啟動(dòng)時(shí)報(bào)線程占用報(bào)錯(cuò):Exception?in?thread?“Thread-14“?java.ne
這篇文章主要給大家介紹了關(guān)于解決java啟動(dòng)時(shí)報(bào)線程占用:Exception?in?thread?“Thread-14“?java.net.BindException:?Address?already?in?use:?bind的相關(guān)資料,文中將解決的辦法介紹的非常詳細(xì),需要的朋友可以參考下2023-04-04idea中解決maven包沖突的問題(maven helper)
這篇文章主要介紹了idea中解決maven包沖突的問題(maven helper),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-12-12Spring?Cloud?Alibaba?Nacos兩種檢查機(jī)制
這篇文章主要介紹了Spring?Cloud?Alibaba?Nacos兩種檢查機(jī)制,作為注冊中心不止提供了服務(wù)注冊和服務(wù)發(fā)現(xiàn)功能,它還提供了服務(wù)可用性監(jiān)測的機(jī)制,下面我們就一起進(jìn)入文章了解具體詳情吧2022-05-05