Java線程池并發(fā)執(zhí)行多個任務(wù)方式
Java線程池并發(fā)執(zhí)行多個任務(wù)
Java在語言層面提供了多線程的支持,線程池能夠避免頻繁的線程創(chuàng)建和銷毀的開銷,因此很多時候在項目當(dāng)中我們是使用的線程池去完成多線程的任務(wù)。
Java提供了Executors 框架提供了一些基礎(chǔ)的組件能夠輕松的完成多線程異步的操作,Executors提供了一系列的靜態(tài)工廠方法能夠獲取不同的ExecutorService實現(xiàn),ExecutorService擴展了Executors接口,Executors相當(dāng)簡單:
public interface Executor { ? ? void execute(Runnable command); }
把任務(wù)本身和任務(wù)的執(zhí)行解耦了,如果說Runnable是可異步執(zhí)行任務(wù)的抽象,那Executor就是如何執(zhí)行可異步執(zhí)行任務(wù)的抽象,說起來比較繞口。
本文不講解線程的一些基礎(chǔ)知識,因為網(wǎng)上的其他文章已經(jīng)寫的足夠詳細和泛濫。我寫寫多個異步任務(wù)的并發(fā)執(zhí)行與結(jié)果的獲取問題。
假設(shè)這樣一個場景:我們要組裝一個對象,這個對象由大量小的內(nèi)容組成,這些內(nèi)容是無關(guān)聯(lián)無依賴關(guān)系的,如果我們串行的去執(zhí)行,如果每個任務(wù)耗時10秒鐘,一共有10個任務(wù),那我們就需要100秒才能獲取到結(jié)果。顯然我們可以采用線程池,每個任務(wù)起一個線程,忽略線程啟動時間,我們只需要10秒鐘就能獲取到結(jié)果。這里還有兩種選擇,這10秒鐘我們可以去做其他事,也可以等待結(jié)果。
我們來完成這樣的操作:
// 這是任務(wù)的抽象 class GetContentTask implements Callable<String> { ?? ??? ? ?? ??? ?private String name; ?? ??? ? ?? ??? ?private Integer sleepTimes; ?? ??? ? ?? ??? ?public GetContentTask(String name, Integer sleepTimes) { ?? ??? ??? ?this.name = name; ?? ??? ??? ?this.sleepTimes = sleepTimes; ?? ??? ?} ?? ??? ?public String call() throws Exception { ?? ??? ??? ?// 假設(shè)這是一個比較耗時的操作 ?? ??? ??? ?Thread.sleep(sleepTimes * 1000); ?? ??? ??? ?return "this is content : hello " + this.name; ?? ??? ?} ?? ??? ? ?? ?}
采用completionService :
// 方法一 ?? ??? ?ExecutorService executorService = Executors.newCachedThreadPool(); ?? ??? ?CompletionService<String> completionService = new ExecutorCompletionService(executorService); ?? ??? ?ExecuteServiceDemo executeServiceDemo = new ExecuteServiceDemo(); ?? ??? ?// 十個 ?? ??? ?long startTime = System.currentTimeMillis(); ?? ??? ?int count = 0; ?? ??? ?for (int i = 0;i < 10;i ++) { ?? ??? ??? ?count ++; ?? ??? ??? ?GetContentTask getContentTask = new ExecuteServiceDemo.GetContentTask("micro" + i, 10); ?? ??? ??? ?completionService.submit(getContentTask); ?? ??? ?} ?? ??? ?System.out.println("提交完任務(wù),主線程空閑了, 可以去做一些事情。"); ?? ??? ?// 假裝做了8秒種其他事情 ?? ??? ?try { ?? ??? ??? ?Thread.sleep(8000); ?? ??? ??? ?System.out.println("主線程做完了,等待結(jié)果"); ?? ??? ?} catch (InterruptedException e) { ?? ??? ??? ?e.printStackTrace(); ?? ??? ?} ?? ??? ?try { ?? ??? ??? ?// 做完事情要結(jié)果 ?? ??? ??? ?for (int i = 0;i < count;i ++) { ?? ??? ??? ??? ?Future<String> result = completionService.take(); ?? ??? ??? ??? ?System.out.println(result.get()); ?? ??? ??? ?} ?? ??? ??? ?long endTime = System.currentTimeMillis(); ?? ??? ??? ?System.out.println("耗時 : " + (endTime - startTime) / 1000); ?? ??? ?} ?catch (Exception ex) { ?? ??? ??? ?System.out.println(ex.getMessage()); ?? ??? ?}
執(zhí)行結(jié)果為:
提交完任務(wù),主線程空閑了, 可以去做一些事情。
主線程做完了,等待結(jié)果
this is content : hello micro9
this is content : hello micro7
this is content : hello micro2
this is content : hello micro5
this is content : hello micro4
this is content : hello micro8
this is content : hello micro1
this is content : hello micro3
this is content : hello micro0
this is content : hello micro6
耗時 : 10
如果多個不想一個一個提交,可以采用 invokeAll一并提交,但是會同步等待這些任務(wù)
// 方法二 ?? ??? ?ExecutorService executeService = Executors.newCachedThreadPool(); ?? ??? ?List<GetContentTask> taskList = new ArrayList<GetContentTask>(); ?? ??? ?long startTime = System.currentTimeMillis(); ?? ??? ?for (int i = 0;i < 10;i ++) { ?? ??? ??? ?taskList.add(new GetContentTask("micro" + i, 10)); ?? ??? ?} ?? ??? ?try { ?? ??? ??? ?System.out.println("主線程發(fā)起異步任務(wù)請求"); ?? ??? ??? ?List<Future<String>> resultList = executeService.invokeAll(taskList); ?? ??? ??? ?// 這里會阻塞等待resultList獲取到所有異步執(zhí)行的結(jié)果才會執(zhí)行? ?? ??? ??? ?for (Future<String> future : resultList) { ?? ??? ??? ??? ?System.out.println(future.get()); ?? ??? ??? ?} ?? ??? ??? ?// 主線程假裝很忙執(zhí)行8秒鐘 ?? ??? ??? ?Thread.sleep(8); ?? ??? ??? ?long endTime = System.currentTimeMillis(); ?? ??? ??? ?System.out.println("耗時 : " + (endTime - startTime) / 1000); ?? ??? ?} catch (Exception e) { ?? ??? ??? ?e.printStackTrace(); ?? ??? ?}
主線程發(fā)起異步任務(wù)請求
this is content : hello micro0
this is content : hello micro1
this is content : hello micro2
this is content : hello micro3
this is content : hello micro4
this is content : hello micro5
this is content : hello micro6
this is content : hello micro7
this is content : hello micro8
this is content : hello micro9
耗時 : 10
如果一系列請求,我們并不需要等待每個請求,我們可以invokeAny,只要某一個請求返回即可。
ExecutorService executorService = Executors.newCachedThreadPool(); ?? ??? ?ArrayList<GetContentTask> taskList = new ArrayList<GetContentTask>(); ?? ??? ?taskList.add(new GetContentTask("micro1",3)); ?? ??? ?taskList.add(new GetContentTask("micro2", 6)); ?? ??? ?try { ?? ??? ??? ?List<Future<String>> resultList = executorService.invokeAll(taskList);// 等待6秒? //?? ??? ??? ?String result2 = executorService.invokeAny(taskList); // 等待3秒 ?? ??? ??? ?// invokeAll 提交一堆任務(wù)并行處理并拿到結(jié)果 ?? ??? ??? ?// invokeAny就是提交一堆并行任務(wù)拿到一個結(jié)果即可 ?? ??? ??? ?for (Future<String> result : resultList) { ?? ??? ??? ??? ?System.out.println(result.get()); ?? ??? ??? ?} //?? ??? ??? ?System.out.println(result2); ?? ??? ?} catch (Exception e) { ?? ??? ??? ?e.printStackTrace(); ?? ??? ?} ?? ??? ?System.out.println("主線程等待");
如果我雖然發(fā)送了一堆異步的任務(wù),但是我只等待一定的時間,在規(guī)定的時間沒有返回我就不要了,例如很多時候的網(wǎng)絡(luò)請求其他服務(wù)器如果要數(shù)據(jù),由于網(wǎng)絡(luò)原因不能一直等待,在規(guī)定時間內(nèi)去拿,拿不到就我使用一個默認值。
這樣的場景,我們可以使用下面的寫法:
try { ?? ??? ??? ?ExecutorService executorService = Executors.newCachedThreadPool(); ?? ??? ??? ?List<Callable<String>> taskList = new ArrayList<Callable<String>>(); ?? ??? ??? ?taskList.add(new GetContentTask("micro1", 4)); ?? ??? ??? ?taskList.add(new GetContentTask("micro2", 6)); ?? ??? ??? ?// 等待五秒 ?? ??? ??? ?List<Future<String>> resultList = executorService.invokeAll(taskList, 5, TimeUnit.SECONDS); ?? ??? ??? ?for (Future<String> future : resultList) { ?? ??? ??? ??? ?System.out.println(future.get()); ?? ??? ??? ?} ?? ??? ?} catch (Exception e) { ?? ??? ??? ?e.printStackTrace(); ?? ??? ?}?
this is content : hello micro1
java.util.concurrent.CancellationException
at java.util.concurrent.FutureTask.report(FutureTask.java:121)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.micro.demo.spring.ExecuteServiceDemo.main(ExecuteServiceDemo.java:105)
因為只等待5秒,6秒的那個任務(wù)自然獲取不到,拋出異常,如果將等待時間設(shè)置成8秒,就都能獲取到。
Java線程池的正確使用
線程可認為是操作系統(tǒng)可調(diào)度的最小的程序執(zhí)行序列,一般作為進程的組成部分,同一進程中多個線程可共享該進程的資源(如內(nèi)存等)。JVM線程跟內(nèi)核輕量級進程有一對一的映射關(guān)系,所以JVM中的線程是很寶貴的。
一般在工程上多線程的實現(xiàn)是基于線程池的。因為相比自己創(chuàng)建線程,多線程具有以下優(yōu)點:
- 線程是稀缺資源,使用線程池可以減少創(chuàng)建和銷毀線程的次數(shù),每個工作線程都可以重復(fù)使用。
- 可以根據(jù)系統(tǒng)的承受能力,調(diào)整線程池中工作線程的數(shù)量,防止因為消耗過多內(nèi)存導(dǎo)致服務(wù)器崩潰。
1.Executors存在什么問題
看阿里巴巴開發(fā)手冊并發(fā)編程這塊有一條:線程池不允許使用Executors去創(chuàng)建,而是通過ThreadPoolExecutor的方式。
2.Executors為什么存在缺陷
2.1 線程池工作原理
當(dāng)一個任務(wù)通過execute(Runnable)方法欲添加到線程池時:
- 如果此時線程池中的數(shù)量小于corePoolSize,即使線程池中的線程都處于空閑狀態(tài),也要創(chuàng)建新的線程來處理被添加的務(wù)。
- 如果此時線程池中的數(shù)量等于 corePoolSize,但是緩沖隊列 workQueue未滿,那么任務(wù)被放入緩沖隊列。
- 如果此時線程池中的數(shù)量大于corePoolSize,緩沖隊列workQueue滿,并且線程池中的數(shù)量小于maximumPoolSize,建新的線程來處理被添加的任務(wù)。
- 那么通過 handler所指定的策略來處理此任務(wù)。也就是:處理任務(wù)的優(yōu)先級為:核心線程corePoolSize、任務(wù)隊列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務(wù)。
- 當(dāng)線程池中的線程數(shù)量大于 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態(tài)的調(diào)整池中的線程數(shù)。
2.2 newFixedThreadPool分析
Java中的BlockingQueue主要有兩種實現(xiàn),分別是ArrayBlockingQueue 和 LinkedBlockingQueue。
ArrayBlockingQueue是一個用數(shù)組實現(xiàn)的有界阻塞隊列,必須設(shè)置容量。
LinkedBlockingQueue是一個用鏈表實現(xiàn)的有界阻塞隊列,容量可以選擇進行設(shè)置,不設(shè)置的話,將是一個無邊界的阻塞隊列,最大長度為Integer.MAX_VALUE。
這里的問題就出在:不設(shè)置的話,將是一個無邊界的阻塞隊列,最大長度為Integer.MAX_VALUE。也就是說,如果我們不設(shè)置LinkedBlockingQueue的容量的話,其默認容量將會是Integer.MAX_VALUE。而newFixedThreadPool中創(chuàng)建LinkedBlockingQueue時,并未指定容量。此時,LinkedBlockingQueue就是一個無邊界隊列,對于一個無邊界隊列來說,是可以不斷的向隊列中加入任務(wù)的,這種情況下就有可能因為任務(wù)過多而導(dǎo)致內(nèi)存溢出問題。
2.3 newCachedThreadPool分析
結(jié)合上述流程圖,核心線程數(shù)=0,最大線程無限大,由于SynchronousQueue是一個緩存值為1的阻塞隊列。當(dāng)有大量任務(wù)請求時,線程池會創(chuàng)建大量線程,造成OOM。
3. 線程池參數(shù)詳解
3.1 構(gòu)造方法
3.2 線程池拒絕策略
RejectedExecutionHandler(飽和策略):當(dāng)隊列和線程池都滿了,說明線程池處于飽和狀態(tài),那么必須采取一種策略處理提交的新任務(wù)。這個策略默認情況下是AbortPolicy,表示無法處理新任務(wù)時拋出異常。。以下是JDK1.5提供的四種策略。
- AbortPolicy:直接拋出異常
- CallerRunsPolicy:只用調(diào)用者所在線程來運行任務(wù)。
- DiscardOldestPolicy:丟棄隊列里最近的一個任務(wù),并執(zhí)行當(dāng)前任務(wù)。DiscardPolicy:不處理,丟棄掉。
- 當(dāng)然也可以根據(jù)應(yīng)用場景需要來實現(xiàn)RejectedExecutionHandler接口自定義策略。如記錄日志或持久化不能處理的任務(wù)。
4. 線程池正確打開方式
4.1 創(chuàng)建線程池
避免使用Executors創(chuàng)建線程池,主要是避免使用其中的默認實現(xiàn),那么我們可以自己直接調(diào)用ThreadPoolExecutor的構(gòu)造函數(shù)來自己創(chuàng)建線程池。在創(chuàng)建的同時,給BlockQueue指定容量就可以了。
4.2 向線程池提交任務(wù)
我們可以使用execute提交的任務(wù),但是execute方法沒有返回值,所以無法判斷任務(wù)知否被線程池執(zhí)行成功。通過以下代碼可知execute方法輸入的任務(wù)是一個Runnable類的實例。
我們也可以使用submit 方法來提交任務(wù),它會返回一個future,那么我們可以通過這個future來判斷任務(wù)是否執(zhí)行成功,通過future的get方法來獲取返回值,get方法會阻塞住直到任務(wù)完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞一段時間后立即返回,這時有可能任務(wù)沒有執(zhí)行完。
4.3 關(guān)閉線程池
shutdown關(guān)閉線程池
方法定義:public void shutdown()
- 線程池的狀態(tài)變成SHUTDOWN狀態(tài),此時不能再往線程池中添加新的任務(wù),否則會拋出RejectedExecutionException異常。
- 線程池不會立刻退出,直到添加到線程池中的任務(wù)都已經(jīng)處理完成,才會退出。
注意:這個函數(shù)不會等待提交的任務(wù)執(zhí)行完成,要想等待全部任務(wù)完成,可以調(diào)用:
public boolean awaitTermination(longtimeout, TimeUnit unit)
shutdownNow關(guān)閉線程池并中斷任務(wù)
方法定義:public List shutdownNow()
- 線程池的狀態(tài)立刻變成STOP狀態(tài),此時不能再往線程池中添加新的任務(wù)。
- 終止等待執(zhí)行的線程,并返回它們的列表;
- 試圖停止所有正在執(zhí)行的線程,試圖終止的方法是調(diào)用Thread.interrupt(),但是大家知道,如果線程中沒有sleep 、wait、Condition、定時鎖等應(yīng)用, interrupt()方法是無法中斷當(dāng)前的線程的。所以,ShutdownNow()并不代表線程池就一定立即就能退出,它可能必須要等待所有正在執(zhí)行的任務(wù)都執(zhí)行完成了才能退出。
4.4 如何配置線程池大小
CPU密集型任務(wù)
- 該任務(wù)需要大量的運算,并且沒有阻塞,CPU一直全速運行,CPU密集任務(wù)只有在真正的多核CPU上才可能通過多線程加速 CPU密集型任務(wù)配置盡可能少的線程數(shù)量:
- CPU核數(shù)+1個線程的線程池。
- 例如: CPU 16核,內(nèi)存32G。線程數(shù)=16
IO密集型任務(wù)
- IO密集型任務(wù)線程并不是一直在執(zhí)行任務(wù),則應(yīng)配置盡可能多的線程,如:CPU核數(shù)*2
- 某大廠設(shè)置策略:IO密集型時,大部分線程都阻塞,故需要多配置線程數(shù):
- CPU核數(shù)/(1-阻塞系數(shù))
- 例如: CPU 16核, 阻塞系數(shù) 0.9 ------------->16/(1-0.9) = 160 個線程數(shù)。
- 此時非阻塞線程=16
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
java?fastjson傳輸long數(shù)據(jù)卻接收到了int的問題
這篇文章主要介紹了java?fastjson傳輸long數(shù)據(jù)卻接收到了int的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01Springboot實現(xiàn)多線程及線程池監(jiān)控
線程池的監(jiān)控很重要,本文就來介紹一下Springboot實現(xiàn)多線程及線程池監(jiān)控,文中通過示例代碼介紹的非常詳細,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-01-01jsp、servlet前后端交互對數(shù)據(jù)處理及展示的簡單實現(xiàn)
Servlet和JSP是Java Web開發(fā)中的兩個重要概念,在Servlet和JSP中前后端交互可以通過一些方式來實現(xiàn),這篇文章主要給大家介紹了關(guān)于jsp、servlet前后端交互對數(shù)據(jù)處理及展示的簡單實現(xiàn),需要的朋友可以參考下2023-12-12