Spring線程池ThreadPoolTaskExecutor的用法及說(shuō)明
1 線程池簡(jiǎn)介
1.1 為什么使用線程池
- 降低系統(tǒng)資源消耗,通過(guò)重用已存在的線程,降低線程創(chuàng)建和銷毀造成的消耗;
- 提高系統(tǒng)響應(yīng)速度,當(dāng)有任務(wù)到達(dá)時(shí),通過(guò)復(fù)用已存在的線程,無(wú)需等待新線程的創(chuàng)建便能立即執(zhí)行;
- 方便線程并發(fā)數(shù)的管控,因?yàn)榫€程若是無(wú)限制的創(chuàng)建,可能會(huì)導(dǎo)致內(nèi)存占用過(guò)多而產(chǎn)生OOM,并且會(huì)造成cpu過(guò)度切換(cpu切換線程是有時(shí)間成本的(需要保持當(dāng)前執(zhí)行線程的現(xiàn)場(chǎng),并恢復(fù)要執(zhí)行線程的現(xiàn)場(chǎng))
- 提供更強(qiáng)大的功能,延時(shí)定時(shí)線程池
1.2 線程池為什么需要使用隊(duì)列
- 因?yàn)榫€程若是無(wú)限制的創(chuàng)建,可能會(huì)導(dǎo)致內(nèi)存占用過(guò)多而產(chǎn)生OOM,并且會(huì)造成cpu過(guò)度切換。
- 創(chuàng)建線程池的消耗較高或者線程池創(chuàng)建線程需要獲取mainlock這個(gè)全局鎖,影響并發(fā)效率,阻塞隊(duì)列可以很好的緩沖
1.3 線程池為什么要使用阻塞隊(duì)列而不使用非阻塞隊(duì)列
- 阻塞隊(duì)列可以保證任務(wù)隊(duì)列中沒(méi)有任務(wù)時(shí)阻塞獲取任務(wù)的線程,使得線程進(jìn)入wait狀態(tài),釋放cpu資源,當(dāng)隊(duì)列中有任務(wù)時(shí)才喚醒對(duì)應(yīng)線程從隊(duì)列中取出消息進(jìn)行執(zhí)行。
- 使得在線程不至于一直占用cpu資源。(線程執(zhí)行完任務(wù)后通過(guò)循環(huán)再次從任務(wù)隊(duì)列中取出任務(wù)進(jìn)行執(zhí)行,代碼片段如:while (task != null || (task = getTask()) != null) {})。
- 不用阻塞隊(duì)列也是可以的,不過(guò)實(shí)現(xiàn)起來(lái)比較麻煩而已,有好用的為啥不用呢
1.4 如何配置線程池
CPU密集型任務(wù)
- 盡量使用較小的線程池,一般為CPU核心數(shù)+1。
- 因?yàn)镃PU密集型任務(wù)使得CPU使用率很高,若開(kāi)過(guò)多的線程數(shù),會(huì)造成CPU過(guò)度切換
IO密集型任務(wù)
- 可以使用稍大的線程池,一般為2*CPU核心數(shù)。
- IO密集型任務(wù)CPU使用率并不高,因此可以讓CPU在等待IO的時(shí)候有其他線程去處理別的任務(wù),充分利用CPU時(shí)間
混合型任務(wù)
- 可以將任務(wù)分成IO密集型和CPU密集型任務(wù),然后分別用不同的線程池去處理。 只要分完之后兩個(gè)任務(wù)的執(zhí)行時(shí)間相差不大,那么就會(huì)比串行執(zhí)行來(lái)的高效
- 因?yàn)槿绻麆澐种髢蓚€(gè)任務(wù)執(zhí)行時(shí)間有數(shù)據(jù)級(jí)的差距,那么拆分沒(méi)有意義。
- 因?yàn)橄葓?zhí)行完的任務(wù)就要等后執(zhí)行完的任務(wù),最終的時(shí)間仍然取決于后執(zhí)行完的任務(wù),而且還要加上任務(wù)拆分與合并的開(kāi)銷,得不償失
1.5 execute()和submit()方法
1.execute(),執(zhí)行一個(gè)任務(wù),沒(méi)有返回值
2.submit(),提交一個(gè)線程任務(wù),有返回值
submit(Callable<T> task)
能獲取到它的返回值,通過(guò)future.get()獲取(阻塞直到任務(wù)執(zhí)行完)。一般使用FutureTask+Callable配合使用submit(Runnable task, T result)
能通過(guò)傳入的載體result間接獲得線程的返回值。submit(Runnable task)
則是沒(méi)有返回值的,就算獲取它的返回值也是nullFuture.get()
方法會(huì)使取結(jié)果的線程進(jìn)入阻塞狀態(tài),直到線程執(zhí)行完成之后,喚醒取結(jié)果的線程,然后返回結(jié)果
1.6 Spring線程池
Spring 通過(guò)任務(wù)執(zhí)行器(TaskExecutor)來(lái)實(shí)現(xiàn)多線程和并發(fā)編程,使用ThreadPoolTaskExecutor實(shí)現(xiàn)一個(gè)基于線程池的TaskExecutor,
還得需要使用@EnableAsync開(kāi)啟異步,并通過(guò)在需要的異步方法那里使用注解@Async聲明是一個(gè)異步任務(wù)
Spring 已經(jīng)實(shí)現(xiàn)的異常線程池:
SimpleAsyncTaskExecutor
:不是真的線程池,這個(gè)類不重用線程,每次調(diào)用都會(huì)創(chuàng)建一個(gè)新的線程。SyncTaskExecutor
:這個(gè)類沒(méi)有實(shí)現(xiàn)異步調(diào)用,只是一個(gè)同步操作。只適用于不需要多線程的地方ConcurrentTaskExecutor
:Executor的適配類,不推薦使用。如果ThreadPoolTaskExecutor不滿足要求時(shí),才用考慮使用這個(gè)類SimpleThreadPoolTaskExecutor
:是Quartz的SimpleThreadPool的類。線程池同時(shí)被quartz和非quartz使用,才需要使用此類ThreadPoolTaskExecutor
:最常使用,推薦。 其實(shí)質(zhì)是對(duì)java.util.concurrent.ThreadPoolExecutor的包裝
1.7 @Async調(diào)用中的事務(wù)處理機(jī)制
點(diǎn)擊了解使用@Async使用的事務(wù)問(wèn)題
2 示例
2.1 線程池配置類
package cn.jzh.thread; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; @Configuration @ComponentScan("cn.jzh.thread") @EnableAsync ?//開(kāi)啟異步操作 public class TaskExecutorConfig implements AsyncConfigurer { ? ? /** ? ? ?* 通過(guò)getAsyncExecutor方法配置ThreadPoolTaskExecutor,獲得一個(gè)基于線程池TaskExecutor ? ? ?* ? ? ?* @return ? ? ?*/ ? ? @Override ? ? public Executor getAsyncExecutor() { ? ? ? ? ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor(); ? ? ? ? pool.setCorePoolSize(5);//核心線程數(shù) ? ? ? ? pool.setMaxPoolSize(10);//最大線程數(shù) ? ? ? ? pool.setQueueCapacity(25);//線程隊(duì)列 ? ? ? ? pool.initialize();//線程初始化 ? ? ? ? return pool; ? ? } ? ? @Override ? ? public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { ? ? ? ? return null; ? ? } }
配置類中方法說(shuō)明:
Spring 中的ThreadPoolExecutor是借助JDK并發(fā)包中的java.util.concurrent.ThreadPoolExecutor來(lái)實(shí)現(xiàn)的。
其中一些值的含義如下:
int corePoolSize
:線程池維護(hù)線程的最小數(shù)量int maximumPoolSize
:線程池維護(hù)線程的最大數(shù)量,線程池中允許的最大線程數(shù),線程池中的當(dāng)前線程數(shù)目不會(huì)超過(guò)該值。如果隊(duì)列中任務(wù)已滿,并且當(dāng)前線程個(gè)數(shù)小于maximumPoolSize,那么會(huì)創(chuàng)建新的線程來(lái)執(zhí)行任務(wù)。long keepAliveTime
:空閑線程的存活時(shí)間TimeUnitunit
:時(shí)間單位,現(xiàn)由納秒,微秒,毫秒,秒BlockingQueue workQueue
:持有等待執(zhí)行的任務(wù)隊(duì)列,一個(gè)阻塞隊(duì)列,用來(lái)存儲(chǔ)等待執(zhí)行的任務(wù),當(dāng)線程池中的線程數(shù)超過(guò)它的corePoolSize的時(shí)候,線程會(huì)進(jìn)入阻塞隊(duì)列進(jìn)行阻塞等待RejectedExecutionHandler handler
線程池的拒絕策略,是指當(dāng)任務(wù)添加到線程池中被拒絕,而采取的處理措施。
當(dāng)任務(wù)添加到線程池中之所以被拒絕,可能是由于:第一,線程池異常關(guān)閉。第二,任務(wù)數(shù)量超過(guò)線程池的最大限制。
Reject策略預(yù)定義有四種:
ThreadPoolExecutor.AbortPolicy
策略,是默認(rèn)的策略,處理程序遭到拒絕將拋出運(yùn)行時(shí) RejectedExecutionExceptionThreadPoolExecutor.CallerRunsPolicy
策略 ,調(diào)用者的線程會(huì)執(zhí)行該任務(wù),如果執(zhí)行器已關(guān)閉,則丟棄.ThreadPoolExecutor.DiscardPolicy
策略,不能執(zhí)行的任務(wù)將被丟棄.ThreadPoolExecutor.DiscardOldestPolicy
策略,如果執(zhí)行程序尚未關(guān)閉,則位于工作隊(duì)列頭部的任務(wù)將被刪除,然后重試執(zhí)行程序(如果再次失敗,則重復(fù)此過(guò)程)
自定義策略:當(dāng)然也可以根據(jù)應(yīng)用場(chǎng)景需要來(lái)實(shí)現(xiàn)RejectedExecutionHandler接口自定義策略。如記錄日志或持久化不能處理的任務(wù)
2.2 異步方法
@Async注解可以用在方法上,表示該方法是個(gè)異步方法,也可以用在類上,那么表示此類的所有方法都是異步方法
異步方法會(huì)自動(dòng)注入使用ThreadPoolTaskExecutor作為TaskExecutor
package cn.jzh.thread; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Service; import java.util.concurrent.Future; @Service public class AsyncTaskService { ? ? /** ? ? ?*? ? ? ?* @param i ? ? ?*/ ? ? @Async ? ? public void executeAsync(Integer i) throws Exception{ ? ? ? ? System.out.println("線程ID:" + Thread.currentThread().getId() + "線程名字:" +Thread.currentThread().getName()+"執(zhí)行異步任務(wù):" + i); ? ? } ? ? @Async ? ? public Future<String> executeAsyncPlus(Integer i) throws Exception { ? ? ? ? System.out.println("線程ID:" + Thread.currentThread().getId() +"線程名字:" +Thread.currentThread().getName()+ "執(zhí)行異步有返回的任務(wù):" + i); ? ? ? ? return new AsyncResult<>("success:"+i); ? ? } }
2.3 啟動(dòng)測(cè)試
package cn.jzh.thread; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import java.util.concurrent.Future; public class MainApp { ? ? public static void main(String[] args) throws Exception{ ? ? ? ? System.out.println("主線程id:" + Thread.currentThread().getId() + "開(kāi)始執(zhí)行調(diào)用任務(wù)..."); ? ? ? ? AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(TaskExecutorConfig.class); ? ? ? ? AsyncTaskService service = context.getBean(AsyncTaskService.class); ? ? ? ? for (int i = 0;i<10;i++){ ? ? ? ? ? ? service.executeAsync(i); ? ? ? ? ? ? Future<String> result = service.executeAsyncPlus(i); ? ? ? ? ? ? System.out.println("異步程序執(zhí)行結(jié)束,獲取子線程返回內(nèi)容(會(huì)阻塞當(dāng)前main線程)" + result.get()); ? ? ? ? } ? ? ? ? context.close(); ? ? ? ? System.out.println("主線程id:" + Thread.currentThread().getId() + "程序結(jié)束!!"); ? ? } }
注意:
1.是否影響主線程
如果main主線程不去獲取子線程的結(jié)果(Future.get()),那么主線程完全可以不阻塞。那么,此時(shí),主線程和子線程完全異步。此功能,可以做成類似MQ消息中間件之類的,消息異步進(jìn)行發(fā)送
2.判斷是否執(zhí)行完畢
當(dāng)返回的數(shù)據(jù)類型為Future類型,其為一個(gè)接口。具體的結(jié)果類型為AsyncResult,這個(gè)是需要注意的地方。
調(diào)用返回結(jié)果的異步方法,判斷是否執(zhí)行完畢時(shí)需要使用future.isDone()來(lái)判斷是否執(zhí)行完畢
public void testAsyncAnnotationForMethodsWithReturnType() ? ? ?throws InterruptedException, ExecutionException { ? ? ? System.out.println("Invoking an asynchronous method. " ? + Thread.currentThread().getName()); ? ? ? Future<String> future = asyncAnnotationExample.asyncMethodWithReturnType(); ? ? ? while (true) { ?///這里使用了循環(huán)判斷,等待獲取結(jié)果信息 ? ? ? ? ? if (future.isDone()) { ?//判斷是否執(zhí)行完畢 ? ? ? ? ? ? ? System.out.println("Result from asynchronous process - " + future.get()); ? ? ? ? ? ? ? break; ? ? ? ? ? } ? ? ? ? ? System.out.println("Continue doing something else. "); ? ? ? ? ? Thread.sleep(1000); ? ? ? } ? }
這些獲取異步方法的結(jié)果信息,是通過(guò)不停的檢查Future的狀態(tài)來(lái)獲取當(dāng)前的異步方法是否執(zhí)行完畢來(lái)實(shí)現(xiàn)的
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
java讀取配置文件自定義字段(yml、properties)
本文主要介紹了java讀取配置文件自定義字段(yml、properties),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07POI讀取excel簡(jiǎn)介_(kāi)動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要介紹了POI讀取excel簡(jiǎn)介,詳細(xì)的介紹了什么是Apache POI和組件,有興趣的可以了解了解一下2017-08-08Mybatisplus自動(dòng)填充實(shí)現(xiàn)方式及代碼示例
這篇文章主要介紹了Mybatisplus自動(dòng)填充實(shí)現(xiàn)方式及代碼示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-11-11Spring boot從安裝到交互功能實(shí)現(xiàn)零基礎(chǔ)全程詳解
這篇文章主要介紹了Spring boot從安裝到交互功能得實(shí)現(xiàn)全程詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07Java使用Semaphore對(duì)單接口進(jìn)行限流
本篇主要講如何使用Semaphore對(duì)單接口進(jìn)行限流,主要有三種方式,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-07-07在非spring環(huán)境中調(diào)用service中的方法
非Spring環(huán)境指的是不使用Spring框架來(lái)管理和配置應(yīng)用程序的運(yùn)行時(shí)環(huán)境,本文將給大家介紹如何在非spring環(huán)境中調(diào)用service中的方法,文中有詳細(xì)實(shí)現(xiàn)步驟,需要的朋友可以參考下2024-03-03spring中使用Mockito解決Bean依賴樹(shù)問(wèn)題方法
在本篇文章里小編給各位整理了關(guān)于spring中使用Mockito解決Bean依賴樹(shù)問(wèn)題方法,有需要的朋友們可以參考下。2020-01-01JAVA maven項(xiàng)目使用釘釘SDK獲取token、用戶
這篇文章主要介紹了JAVA maven項(xiàng)目使用釘釘SDK獲取token、用戶,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-06-06SpringBoot中實(shí)現(xiàn)@Scheduled動(dòng)態(tài)定時(shí)任務(wù)
SpringBoot中的@Scheduled注解為定時(shí)任務(wù)提供了一種很簡(jiǎn)單的實(shí)現(xiàn),本文主要介紹了SpringBoot中實(shí)現(xiàn)@Scheduled動(dòng)態(tài)定時(shí)任務(wù),具有一定的參考價(jià)值,感興趣的可以了解一下2024-01-01