基于ThreadPoolTaskExecutor的使用說明
ThreadPoolTaskExecutor的使用
當我們需要實現(xiàn)并發(fā)、異步等操作時,通常都會使用到ThreadPoolTaskExecutor,現(xiàn)對其使用稍作總結(jié)。
springboot 配置
提交任務(wù)
- 無返回值的任務(wù)使用execute(Runnable)
- 有返回值的任務(wù)使用submit(Runnable)
處理流程
當一個任務(wù)被提交到線程池時,首先查看線程池的核心線程是否都在執(zhí)行任務(wù),否就選擇一條線程執(zhí)行任務(wù),是就執(zhí)行第二步。
查看核心線程池是否已滿,不滿就創(chuàng)建一條線程執(zhí)行任務(wù),否則執(zhí)行第三步。
查看任務(wù)隊列是否已滿,不滿就將任務(wù)存儲在任務(wù)隊列中,否則執(zhí)行第四步。
查看線程池是否已滿,不滿就創(chuàng)建一條線程執(zhí)行任務(wù),否則就按照策略處理無法執(zhí)行的任務(wù)。
在ThreadPoolExecutor中表現(xiàn)為:
如果當前運行的線程數(shù)小于corePoolSize,那么就創(chuàng)建線程來執(zhí)行任務(wù)(執(zhí)行時需要獲取全局鎖)。
如果運行的線程大于或等于corePoolSize,那么就把task加入BlockQueue。
如果創(chuàng)建的線程數(shù)量大于BlockQueue的最大容量,那么創(chuàng)建新線程來執(zhí)行該任務(wù)。
如果創(chuàng)建線程導(dǎo)致當前運行的線程數(shù)超過maximumPoolSize,就根據(jù)飽和策略來拒絕該任務(wù)。
關(guān)閉線程池
調(diào)用shutdown或者shutdownNow,兩者都不會接受新的任務(wù),而且通過調(diào)用要停止線程的interrupt方法來中斷線程,有可能線程永遠不會被中斷,不同之處在于shutdownNow會首先將線程池的狀態(tài)設(shè)置為STOP,然后嘗試停止所有線程(有可能導(dǎo)致部分任務(wù)沒有執(zhí)行完)然后返回未執(zhí)行任務(wù)的列表。而shutdown則只是將線程池的狀態(tài)設(shè)置為shutdown,然后中斷所有沒有執(zhí)行任務(wù)的線程,并將剩余的任務(wù)執(zhí)行完。
配置線程個數(shù)
如果是CPU密集型任務(wù),那么線程池的線程個數(shù)應(yīng)該盡量少一些,一般為CPU的個數(shù)+1條線程。
如果是IO密集型任務(wù),那么線程池的線程可以放的很大,如2*CPU的個數(shù)。
對于混合型任務(wù),如果可以拆分的話,通過拆分成CPU密集型和IO密集型兩種來提高執(zhí)行效率;如果不能拆分的的話就可以根據(jù)實際情況來調(diào)整線程池中線程的個數(shù)。
監(jiān)控線程池狀態(tài)
常用狀態(tài)
taskCount
:線程需要執(zhí)行的任務(wù)個數(shù)。
completedTaskCount
:線程池在運行過程中已完成的任務(wù)數(shù)。
largestPoolSize
:線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量。
getPoolSize
:獲取當前線程池的線程數(shù)量。
getActiveCount
:獲取活動的線程的數(shù)量
通過繼承線程池,重寫beforeExecute,afterExecute和terminated方法來在線程執(zhí)行任務(wù)前,線程執(zhí)行任務(wù)結(jié)束,和線程終結(jié)前獲取線程的運行情況,根據(jù)具體情況調(diào)整線程池的線程數(shù)量。
ThreadPoolTaskExecutor配置問題
最近線上出現(xiàn)一個奇葩問題,使用的是ThreadPoolTaskExecutor來處理后續(xù)服務(wù)調(diào)用,剛開始運行ThreadPoolTaskExecutor處理后續(xù)服務(wù)調(diào)用是沒有問題的,但是一段時間之后,發(fā)現(xiàn)后續(xù)服務(wù)一直沒有被調(diào)用,導(dǎo)致了極其嚴重的后果
有關(guān)spring中ThreadPoolTaskExecutor具體如下
<bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!-- 核心線程數(shù),默認為1 --> <property name="corePoolSize" value="5" /> <!-- 最大線程數(shù),默認為Integer.MAX_VALUE --> <property name="maxPoolSize" value="16" /> <!-- 隊列最大長度,一般需要設(shè)置值>=notifyScheduledMainExecutor.maxNum;默認為Integer.MAX_VALUE --> <!--<property name="queueCapacity" value="10" />--> <!-- 線程池維護線程所允許的空閑時間,默認為60s --> <property name="keepAliveSeconds" value="300" /> <!-- 線程池對拒絕任務(wù)(無線程可用)的處理策略, 目前只支持AbortPolicy、CallerRunsPolicy;默認為后者 --> <property name="rejectedExecutionHandler"> <!-- AbortPolicy:直接拋出java.util.concurrent.RejectedExecutionException異常 --> <!-- CallerRunsPolicy: 主線程直接執(zhí)行該任務(wù),執(zhí)行完之后嘗試添加下一個任務(wù)到線程池中, --> <!-- DiscardOldestPolicy: 拋棄舊的任務(wù)、暫不支持;會導(dǎo)致被丟棄的任務(wù)無法再次被執(zhí)行 --> <!-- DiscardPolicy: 拋棄當前任務(wù)、暫不支持;會導(dǎo)致被丟棄的任務(wù)無法再次被執(zhí)行 --> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /> </property> </bean>
那就不得不了解一下java.util.concurrent包下Executor構(gòu)架了
回憶一下線程池工作原理
如果當前運行的線程少于corePoolSize,則創(chuàng)建新線程來執(zhí)行任務(wù)(需要獲得全局鎖)
如果運行的線程等于或多于corePoolSize ,則將任務(wù)加入BlockingQueue
如果無法將任務(wù)加入BlockingQueue(隊列已滿),則創(chuàng)建新的線程來處理任務(wù)(需要獲得全局鎖)
如果創(chuàng)建新線程將使當前運行的線程超出maxiumPoolSize,任務(wù)將被拒絕,并調(diào)用
RejectedExecutionHandler.rejectedExecution()方法
測試場景1
首先,注釋queueCapacity的一行
任務(wù):
public class CustomRunnable implements Runnable { private int id; public CustomRunnable(int id) { this.id = id; } @Override public void run() { try { System.out.println("begin execute "+ Thread.currentThread().getName() + "-- task id: "+ id); String rs = ClientUtil.get("http://www.****.com"); System.out.println("end execute task: "+ id); } catch (Exception e) { e.printStackTrace(); } } }
測試案例:
@Test public void threadTest() throws InterruptedException { for (int i=0; i< 35; i++){ Thread t= new Thread(new CustomRunnable(i)); executor.execute(t); } Thread.sleep(1800000); }
測試結(jié)果:
七月 09, 2018 5:46:47 下午 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor initialize
信息: Initializing ExecutorService 'threadPoolTaskExecutor'
begin execute threadPoolTaskExecutor-1-- task id: 0
begin execute threadPoolTaskExecutor-2-- task id: 1
begin execute threadPoolTaskExecutor-3-- task id: 2
begin execute threadPoolTaskExecutor-4-- task id: 3
begin execute threadPoolTaskExecutor-5-- task id: 4
end execute task: 4
begin execute threadPoolTaskExecutor-5-- task id: 5
end execute task: 1
begin execute threadPoolTaskExecutor-2-- task id: 6
end execute task: 0
begin execute threadPoolTaskExecutor-1-- task id: 7
end execute task: 2
begin execute threadPoolTaskExecutor-3-- task id: 8
end execute task: 3
begin execute threadPoolTaskExecutor-4-- task id: 9
...
可以發(fā)現(xiàn),一開始線程池就創(chuàng)建了corePoolSize大小的線程,對于之后的新加進的任務(wù),就放到BlockingQueue中,默認是使用LinkedBlockingQueue,大小是Integer.MAX_VALUE,因為隊列大小太大,所以就不會創(chuàng)建maxPoolSize大小的線程數(shù)量,因此,只有線程處理完當前任務(wù),才會去處理下一個任務(wù),所以,剛加進去的任務(wù)得不到立即處理
測試場景2
只需要打開queueCapacity的一行,其他不變
測試結(jié)果:
七月 09, 2018 6:07:13 下午 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor initialize
信息: Initializing ExecutorService 'threadPoolTaskExecutor'
begin execute threadPoolTaskExecutor-1-- task id: 0
begin execute threadPoolTaskExecutor-2-- task id: 1
begin execute threadPoolTaskExecutor-3-- task id: 2
begin execute threadPoolTaskExecutor-4-- task id: 3
begin execute threadPoolTaskExecutor-5-- task id: 4
begin execute threadPoolTaskExecutor-6-- task id: 15
begin execute threadPoolTaskExecutor-7-- task id: 16
begin execute threadPoolTaskExecutor-8-- task id: 17
begin execute threadPoolTaskExecutor-9-- task id: 18
begin execute threadPoolTaskExecutor-10-- task id: 19
begin execute threadPoolTaskExecutor-11-- task id: 20
begin execute threadPoolTaskExecutor-12-- task id: 21
begin execute threadPoolTaskExecutor-14-- task id: 23
begin execute threadPoolTaskExecutor-15-- task id: 24
begin execute main-- task id: 26
begin execute threadPoolTaskExecutor-13-- task id: 22
begin execute threadPoolTaskExecutor-16-- task id: 25
begin execute threadPoolTaskExecutor-11-- task id: 5
end execute task: 15
begin execute threadPoolTaskExecutor-6-- task id: 6
end execute task: 23
begin execute threadPoolTaskExecutor-14-- task id: 7
end execute task: 4
begin execute threadPoolTaskExecutor-5-- task id: 8
end execute task: 17
begin execute threadPoolTaskExecutor-8-- task id: 9
....
可以發(fā)現(xiàn),因為初始任務(wù)數(shù)量大于corePoolSize大小,所以線程池初始化就創(chuàng)建了maxPoolSize大小數(shù)量的純種,對于后續(xù)新加進的任務(wù)會入到BlockingQueue隊列中去,之后等待線程處理完一個任務(wù)之后再處理隊列中的任務(wù)
猜想
線上出現(xiàn)這種原因可能就是因為queueCapacity被設(shè)置成了默認(Integer.MAX_VALUE),而且初始化純種的corePoolSize數(shù)量過少,并且線程處理速度較慢(業(yè)務(wù)邏輯,網(wǎng)絡(luò)請求等等原因),導(dǎo)致后續(xù)任務(wù)會一直填加到隊列中去,遲遲得不到立即處理。
解決方案
手動設(shè)置queueCapacity大小,網(wǎng)絡(luò)請求原因的話,可以設(shè)置超時時間;業(yè)務(wù)邏輯的話,另辟蹊徑。。。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
java實現(xiàn)用戶簽到BitMap功能實現(xiàn)demo
這篇文章主要為大家介紹了java實現(xiàn)用戶簽到BitMap功能實現(xiàn)demo,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-11-11Java形參和實參的實例之Integer類型與Int類型用法說明
這篇文章主要介紹了Java形參和實參的實例之Integer類型與Int類型用法說明,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-10-10