Java線程池并發(fā)執(zhí)行多個(gè)任務(wù)方式
Java線程池并發(fā)執(zhí)行多個(gè)任務(wù)
Java在語言層面提供了多線程的支持,線程池能夠避免頻繁的線程創(chuàng)建和銷毀的開銷,因此很多時(shí)候在項(xiàng)目當(dāng)中我們是使用的線程池去完成多線程的任務(wù)。
Java提供了Executors 框架提供了一些基礎(chǔ)的組件能夠輕松的完成多線程異步的操作,Executors提供了一系列的靜態(tài)工廠方法能夠獲取不同的ExecutorService實(shí)現(xiàn),ExecutorService擴(kuò)展了Executors接口,Executors相當(dāng)簡單:
public interface Executor { ? ? void execute(Runnable command); }
把任務(wù)本身和任務(wù)的執(zhí)行解耦了,如果說Runnable是可異步執(zhí)行任務(wù)的抽象,那Executor就是如何執(zhí)行可異步執(zhí)行任務(wù)的抽象,說起來比較繞口。
本文不講解線程的一些基礎(chǔ)知識,因?yàn)榫W(wǎng)上的其他文章已經(jīng)寫的足夠詳細(xì)和泛濫。我寫寫多個(gè)異步任務(wù)的并發(fā)執(zhí)行與結(jié)果的獲取問題。
假設(shè)這樣一個(gè)場景:我們要組裝一個(gè)對象,這個(gè)對象由大量小的內(nèi)容組成,這些內(nèi)容是無關(guān)聯(lián)無依賴關(guān)系的,如果我們串行的去執(zhí)行,如果每個(gè)任務(wù)耗時(shí)10秒鐘,一共有10個(gè)任務(wù),那我們就需要100秒才能獲取到結(jié)果。顯然我們可以采用線程池,每個(gè)任務(wù)起一個(gè)線程,忽略線程啟動時(shí)間,我們只需要10秒鐘就能獲取到結(jié)果。這里還有兩種選擇,這10秒鐘我們可以去做其他事,也可以等待結(jié)果。
我們來完成這樣的操作:
// 這是任務(wù)的抽象 class GetContentTask implements Callable<String> { ?? ??? ? ?? ??? ?private String name; ?? ??? ? ?? ??? ?private Integer sleepTimes; ?? ??? ? ?? ??? ?public GetContentTask(String name, Integer sleepTimes) { ?? ??? ??? ?this.name = name; ?? ??? ??? ?this.sleepTimes = sleepTimes; ?? ??? ?} ?? ??? ?public String call() throws Exception { ?? ??? ??? ?// 假設(shè)這是一個(gè)比較耗時(shí)的操作 ?? ??? ??? ?Thread.sleep(sleepTimes * 1000); ?? ??? ??? ?return "this is content : hello " + this.name; ?? ??? ?} ?? ??? ? ?? ?}
采用completionService :
// 方法一 ?? ??? ?ExecutorService executorService = Executors.newCachedThreadPool(); ?? ??? ?CompletionService<String> completionService = new ExecutorCompletionService(executorService); ?? ??? ?ExecuteServiceDemo executeServiceDemo = new ExecuteServiceDemo(); ?? ??? ?// 十個(gè) ?? ??? ?long startTime = System.currentTimeMillis(); ?? ??? ?int count = 0; ?? ??? ?for (int i = 0;i < 10;i ++) { ?? ??? ??? ?count ++; ?? ??? ??? ?GetContentTask getContentTask = new ExecuteServiceDemo.GetContentTask("micro" + i, 10); ?? ??? ??? ?completionService.submit(getContentTask); ?? ??? ?} ?? ??? ?System.out.println("提交完任務(wù),主線程空閑了, 可以去做一些事情。"); ?? ??? ?// 假裝做了8秒種其他事情 ?? ??? ?try { ?? ??? ??? ?Thread.sleep(8000); ?? ??? ??? ?System.out.println("主線程做完了,等待結(jié)果"); ?? ??? ?} catch (InterruptedException e) { ?? ??? ??? ?e.printStackTrace(); ?? ??? ?} ?? ??? ?try { ?? ??? ??? ?// 做完事情要結(jié)果 ?? ??? ??? ?for (int i = 0;i < count;i ++) { ?? ??? ??? ??? ?Future<String> result = completionService.take(); ?? ??? ??? ??? ?System.out.println(result.get()); ?? ??? ??? ?} ?? ??? ??? ?long endTime = System.currentTimeMillis(); ?? ??? ??? ?System.out.println("耗時(shí) : " + (endTime - startTime) / 1000); ?? ??? ?} ?catch (Exception ex) { ?? ??? ??? ?System.out.println(ex.getMessage()); ?? ??? ?}
執(zhí)行結(jié)果為:
提交完任務(wù),主線程空閑了, 可以去做一些事情。
主線程做完了,等待結(jié)果
this is content : hello micro9
this is content : hello micro7
this is content : hello micro2
this is content : hello micro5
this is content : hello micro4
this is content : hello micro8
this is content : hello micro1
this is content : hello micro3
this is content : hello micro0
this is content : hello micro6
耗時(shí) : 10
如果多個(gè)不想一個(gè)一個(gè)提交,可以采用 invokeAll一并提交,但是會同步等待這些任務(wù)
// 方法二 ?? ??? ?ExecutorService executeService = Executors.newCachedThreadPool(); ?? ??? ?List<GetContentTask> taskList = new ArrayList<GetContentTask>(); ?? ??? ?long startTime = System.currentTimeMillis(); ?? ??? ?for (int i = 0;i < 10;i ++) { ?? ??? ??? ?taskList.add(new GetContentTask("micro" + i, 10)); ?? ??? ?} ?? ??? ?try { ?? ??? ??? ?System.out.println("主線程發(fā)起異步任務(wù)請求"); ?? ??? ??? ?List<Future<String>> resultList = executeService.invokeAll(taskList); ?? ??? ??? ?// 這里會阻塞等待resultList獲取到所有異步執(zhí)行的結(jié)果才會執(zhí)行? ?? ??? ??? ?for (Future<String> future : resultList) { ?? ??? ??? ??? ?System.out.println(future.get()); ?? ??? ??? ?} ?? ??? ??? ?// 主線程假裝很忙執(zhí)行8秒鐘 ?? ??? ??? ?Thread.sleep(8); ?? ??? ??? ?long endTime = System.currentTimeMillis(); ?? ??? ??? ?System.out.println("耗時(shí) : " + (endTime - startTime) / 1000); ?? ??? ?} catch (Exception e) { ?? ??? ??? ?e.printStackTrace(); ?? ??? ?}
主線程發(fā)起異步任務(wù)請求
this is content : hello micro0
this is content : hello micro1
this is content : hello micro2
this is content : hello micro3
this is content : hello micro4
this is content : hello micro5
this is content : hello micro6
this is content : hello micro7
this is content : hello micro8
this is content : hello micro9
耗時(shí) : 10
如果一系列請求,我們并不需要等待每個(gè)請求,我們可以invokeAny,只要某一個(gè)請求返回即可。
ExecutorService executorService = Executors.newCachedThreadPool(); ?? ??? ?ArrayList<GetContentTask> taskList = new ArrayList<GetContentTask>(); ?? ??? ?taskList.add(new GetContentTask("micro1",3)); ?? ??? ?taskList.add(new GetContentTask("micro2", 6)); ?? ??? ?try { ?? ??? ??? ?List<Future<String>> resultList = executorService.invokeAll(taskList);// 等待6秒? //?? ??? ??? ?String result2 = executorService.invokeAny(taskList); // 等待3秒 ?? ??? ??? ?// invokeAll 提交一堆任務(wù)并行處理并拿到結(jié)果 ?? ??? ??? ?// invokeAny就是提交一堆并行任務(wù)拿到一個(gè)結(jié)果即可 ?? ??? ??? ?for (Future<String> result : resultList) { ?? ??? ??? ??? ?System.out.println(result.get()); ?? ??? ??? ?} //?? ??? ??? ?System.out.println(result2); ?? ??? ?} catch (Exception e) { ?? ??? ??? ?e.printStackTrace(); ?? ??? ?} ?? ??? ?System.out.println("主線程等待");
如果我雖然發(fā)送了一堆異步的任務(wù),但是我只等待一定的時(shí)間,在規(guī)定的時(shí)間沒有返回我就不要了,例如很多時(shí)候的網(wǎng)絡(luò)請求其他服務(wù)器如果要數(shù)據(jù),由于網(wǎng)絡(luò)原因不能一直等待,在規(guī)定時(shí)間內(nèi)去拿,拿不到就我使用一個(gè)默認(rèn)值。
這樣的場景,我們可以使用下面的寫法:
try { ?? ??? ??? ?ExecutorService executorService = Executors.newCachedThreadPool(); ?? ??? ??? ?List<Callable<String>> taskList = new ArrayList<Callable<String>>(); ?? ??? ??? ?taskList.add(new GetContentTask("micro1", 4)); ?? ??? ??? ?taskList.add(new GetContentTask("micro2", 6)); ?? ??? ??? ?// 等待五秒 ?? ??? ??? ?List<Future<String>> resultList = executorService.invokeAll(taskList, 5, TimeUnit.SECONDS); ?? ??? ??? ?for (Future<String> future : resultList) { ?? ??? ??? ??? ?System.out.println(future.get()); ?? ??? ??? ?} ?? ??? ?} catch (Exception e) { ?? ??? ??? ?e.printStackTrace(); ?? ??? ?}?
this is content : hello micro1
java.util.concurrent.CancellationException
at java.util.concurrent.FutureTask.report(FutureTask.java:121)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.micro.demo.spring.ExecuteServiceDemo.main(ExecuteServiceDemo.java:105)
因?yàn)橹坏却?秒,6秒的那個(gè)任務(wù)自然獲取不到,拋出異常,如果將等待時(shí)間設(shè)置成8秒,就都能獲取到。
Java線程池的正確使用
線程可認(rèn)為是操作系統(tǒng)可調(diào)度的最小的程序執(zhí)行序列,一般作為進(jìn)程的組成部分,同一進(jìn)程中多個(gè)線程可共享該進(jìn)程的資源(如內(nèi)存等)。JVM線程跟內(nèi)核輕量級進(jìn)程有一對一的映射關(guān)系,所以JVM中的線程是很寶貴的。
一般在工程上多線程的實(shí)現(xiàn)是基于線程池的。因?yàn)橄啾茸约簞?chuàng)建線程,多線程具有以下優(yōu)點(diǎn):
- 線程是稀缺資源,使用線程池可以減少創(chuàng)建和銷毀線程的次數(shù),每個(gè)工作線程都可以重復(fù)使用。
- 可以根據(jù)系統(tǒng)的承受能力,調(diào)整線程池中工作線程的數(shù)量,防止因?yàn)橄倪^多內(nèi)存導(dǎo)致服務(wù)器崩潰。
1.Executors存在什么問題
看阿里巴巴開發(fā)手冊并發(fā)編程這塊有一條:線程池不允許使用Executors去創(chuàng)建,而是通過ThreadPoolExecutor的方式。
2.Executors為什么存在缺陷
2.1 線程池工作原理
當(dāng)一個(gè)任務(wù)通過execute(Runnable)方法欲添加到線程池時(shí):
- 如果此時(shí)線程池中的數(shù)量小于corePoolSize,即使線程池中的線程都處于空閑狀態(tài),也要創(chuàng)建新的線程來處理被添加的務(wù)。
- 如果此時(shí)線程池中的數(shù)量等于 corePoolSize,但是緩沖隊(duì)列 workQueue未滿,那么任務(wù)被放入緩沖隊(duì)列。
- 如果此時(shí)線程池中的數(shù)量大于corePoolSize,緩沖隊(duì)列workQueue滿,并且線程池中的數(shù)量小于maximumPoolSize,建新的線程來處理被添加的任務(wù)。
- 那么通過 handler所指定的策略來處理此任務(wù)。也就是:處理任務(wù)的優(yōu)先級為:核心線程corePoolSize、任務(wù)隊(duì)列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務(wù)。
- 當(dāng)線程池中的線程數(shù)量大于 corePoolSize時(shí),如果某線程空閑時(shí)間超過keepAliveTime,線程將被終止。這樣,線程池可以動態(tài)的調(diào)整池中的線程數(shù)。
2.2 newFixedThreadPool分析
Java中的BlockingQueue主要有兩種實(shí)現(xiàn),分別是ArrayBlockingQueue 和 LinkedBlockingQueue。
ArrayBlockingQueue是一個(gè)用數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列,必須設(shè)置容量。
LinkedBlockingQueue是一個(gè)用鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列,容量可以選擇進(jìn)行設(shè)置,不設(shè)置的話,將是一個(gè)無邊界的阻塞隊(duì)列,最大長度為Integer.MAX_VALUE。
這里的問題就出在:不設(shè)置的話,將是一個(gè)無邊界的阻塞隊(duì)列,最大長度為Integer.MAX_VALUE。也就是說,如果我們不設(shè)置LinkedBlockingQueue的容量的話,其默認(rèn)容量將會是Integer.MAX_VALUE。而newFixedThreadPool中創(chuàng)建LinkedBlockingQueue時(shí),并未指定容量。此時(shí),LinkedBlockingQueue就是一個(gè)無邊界隊(duì)列,對于一個(gè)無邊界隊(duì)列來說,是可以不斷的向隊(duì)列中加入任務(wù)的,這種情況下就有可能因?yàn)槿蝿?wù)過多而導(dǎo)致內(nèi)存溢出問題。
2.3 newCachedThreadPool分析
結(jié)合上述流程圖,核心線程數(shù)=0,最大線程無限大,由于SynchronousQueue是一個(gè)緩存值為1的阻塞隊(duì)列。當(dāng)有大量任務(wù)請求時(shí),線程池會創(chuàng)建大量線程,造成OOM。
3. 線程池參數(shù)詳解
3.1 構(gòu)造方法
3.2 線程池拒絕策略
RejectedExecutionHandler(飽和策略):當(dāng)隊(duì)列和線程池都滿了,說明線程池處于飽和狀態(tài),那么必須采取一種策略處理提交的新任務(wù)。這個(gè)策略默認(rèn)情況下是AbortPolicy,表示無法處理新任務(wù)時(shí)拋出異常。。以下是JDK1.5提供的四種策略。
- AbortPolicy:直接拋出異常
- CallerRunsPolicy:只用調(diào)用者所在線程來運(yùn)行任務(wù)。
- DiscardOldestPolicy:丟棄隊(duì)列里最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù)。DiscardPolicy:不處理,丟棄掉。
- 當(dāng)然也可以根據(jù)應(yīng)用場景需要來實(shí)現(xiàn)RejectedExecutionHandler接口自定義策略。如記錄日志或持久化不能處理的任務(wù)。
4. 線程池正確打開方式
4.1 創(chuàng)建線程池
避免使用Executors創(chuàng)建線程池,主要是避免使用其中的默認(rèn)實(shí)現(xiàn),那么我們可以自己直接調(diào)用ThreadPoolExecutor的構(gòu)造函數(shù)來自己創(chuàng)建線程池。在創(chuàng)建的同時(shí),給BlockQueue指定容量就可以了。
4.2 向線程池提交任務(wù)
我們可以使用execute提交的任務(wù),但是execute方法沒有返回值,所以無法判斷任務(wù)知否被線程池執(zhí)行成功。通過以下代碼可知execute方法輸入的任務(wù)是一個(gè)Runnable類的實(shí)例。
我們也可以使用submit 方法來提交任務(wù),它會返回一個(gè)future,那么我們可以通過這個(gè)future來判斷任務(wù)是否執(zhí)行成功,通過future的get方法來獲取返回值,get方法會阻塞住直到任務(wù)完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞一段時(shí)間后立即返回,這時(shí)有可能任務(wù)沒有執(zhí)行完。
4.3 關(guān)閉線程池
shutdown關(guān)閉線程池
方法定義:public void shutdown()
- 線程池的狀態(tài)變成SHUTDOWN狀態(tài),此時(shí)不能再往線程池中添加新的任務(wù),否則會拋出RejectedExecutionException異常。
- 線程池不會立刻退出,直到添加到線程池中的任務(wù)都已經(jīng)處理完成,才會退出。
注意:這個(gè)函數(shù)不會等待提交的任務(wù)執(zhí)行完成,要想等待全部任務(wù)完成,可以調(diào)用:
public boolean awaitTermination(longtimeout, TimeUnit unit)
shutdownNow關(guān)閉線程池并中斷任務(wù)
方法定義:public List shutdownNow()
- 線程池的狀態(tài)立刻變成STOP狀態(tài),此時(shí)不能再往線程池中添加新的任務(wù)。
- 終止等待執(zhí)行的線程,并返回它們的列表;
- 試圖停止所有正在執(zhí)行的線程,試圖終止的方法是調(diào)用Thread.interrupt(),但是大家知道,如果線程中沒有sleep 、wait、Condition、定時(shí)鎖等應(yīng)用, interrupt()方法是無法中斷當(dāng)前的線程的。所以,ShutdownNow()并不代表線程池就一定立即就能退出,它可能必須要等待所有正在執(zhí)行的任務(wù)都執(zhí)行完成了才能退出。
4.4 如何配置線程池大小
CPU密集型任務(wù)
- 該任務(wù)需要大量的運(yùn)算,并且沒有阻塞,CPU一直全速運(yùn)行,CPU密集任務(wù)只有在真正的多核CPU上才可能通過多線程加速 CPU密集型任務(wù)配置盡可能少的線程數(shù)量:
- CPU核數(shù)+1個(gè)線程的線程池。
- 例如: CPU 16核,內(nèi)存32G。線程數(shù)=16
IO密集型任務(wù)
- IO密集型任務(wù)線程并不是一直在執(zhí)行任務(wù),則應(yīng)配置盡可能多的線程,如:CPU核數(shù)*2
- 某大廠設(shè)置策略:IO密集型時(shí),大部分線程都阻塞,故需要多配置線程數(shù):
- CPU核數(shù)/(1-阻塞系數(shù))
- 例如: CPU 16核, 阻塞系數(shù) 0.9 ------------->16/(1-0.9) = 160 個(gè)線程數(shù)。
- 此時(shí)非阻塞線程=16
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
idea中@Autowired注解下變量報(bào)紅的解決
這篇文章主要介紹了idea中@Autowired注解下變量報(bào)紅的解決,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11java?fastjson傳輸long數(shù)據(jù)卻接收到了int的問題
這篇文章主要介紹了java?fastjson傳輸long數(shù)據(jù)卻接收到了int的問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01Java實(shí)現(xiàn)List集合手動分頁的方法
在工作中難免會遇到,將組裝的集合數(shù)據(jù)進(jìn)行分頁處理,本文主要介紹了Java實(shí)現(xiàn)List集合手動分頁的方法,具有一定的參考價(jià)值,感興趣的可以了解一下2024-03-03使用java批量寫入環(huán)境變量的實(shí)現(xiàn)
本文主要介紹了使用java批量寫入環(huán)境變量,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-03-03Springboot實(shí)現(xiàn)多線程及線程池監(jiān)控
線程池的監(jiān)控很重要,本文就來介紹一下Springboot實(shí)現(xiàn)多線程及線程池監(jiān)控,文中通過示例代碼介紹的非常詳細(xì),需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-01-01MyBatis實(shí)現(xiàn)注冊及獲取Mapper
本文主要介紹了MyBatis實(shí)現(xiàn)注冊及獲取Mapper,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-03-03一文帶你吃透JSP增刪改查實(shí)戰(zhàn)案例詳細(xì)解讀
這篇文章主要為大家詳細(xì)介紹了JSP中增刪改查實(shí)戰(zhàn)案例的相關(guān)知識,文中的示例代碼講解現(xiàn)象,具有一定的借鑒價(jià)值,感興趣的小伙伴可以了解一下2023-03-03jsp、servlet前后端交互對數(shù)據(jù)處理及展示的簡單實(shí)現(xiàn)
Servlet和JSP是Java Web開發(fā)中的兩個(gè)重要概念,在Servlet和JSP中前后端交互可以通過一些方式來實(shí)現(xiàn),這篇文章主要給大家介紹了關(guān)于jsp、servlet前后端交互對數(shù)據(jù)處理及展示的簡單實(shí)現(xiàn),需要的朋友可以參考下2023-12-12