Java中的CompletionService批量異步執(zhí)行詳解
前景引入
我們知道線程池可以執(zhí)行異步任務(wù),同時(shí)可以通過(guò)返回值Future獲取返回值,所以異步任務(wù)大多數(shù)采用ThreadPoolExecutor+Future,如果存在如下情況,需要從任務(wù)一二三中獲取返回值后,保存到數(shù)據(jù)庫(kù)中,用異步邏輯實(shí)現(xiàn)代碼應(yīng)該如下所示。
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
Future<Integer> f1 = executorService.submit(() -> {
System.out.println("執(zhí)行任務(wù)一");
return 1;
});
Future<Integer> f2 = executorService.submit(() -> {
System.out.println("執(zhí)行任務(wù)二");
return 2;
});
Future<Integer> f3 = executorService.submit(() -> {
System.out.println("執(zhí)行任務(wù)三");
return 3;
});
Integer r1 = f1.get();
executorService.execute(()->{
// 省略保存r1操作
System.out.println(r1);
});
Integer r2 = f2.get();
executorService.execute(()->{
// 省略保存r2操作
System.out.println(r2);
});
Integer r3 = f3.get();
executorService.execute(()->{
// 省略保存r3操作
System.out.println(r3);
});
executorService.shutdown();
}這樣寫的代碼一點(diǎn)毛病沒(méi)有,邏輯都是正常的,但如果存在任務(wù)一查詢了比較耗時(shí)的操作,由于f1.get是阻塞執(zhí)行,那么就算任務(wù)二和任務(wù)三已經(jīng)返回結(jié)果,任務(wù)二的返回值和任務(wù)三的返回值都是不能保存到數(shù)據(jù)庫(kù)的,因?yàn)閒1.get將主線程阻塞了。
批量異步實(shí)現(xiàn)
那可以如何處理呢?可以采用萬(wàn)能的阻塞隊(duì)列,任務(wù)先執(zhí)行完畢的先入隊(duì),這樣可以保證其它線程入庫(kù)的速度不受影響,提高效率。
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(3);
Future<Integer> f1 = executorService.submit(() -> {
System.out.println("執(zhí)行任務(wù)一");
Thread.sleep(5000);
return 1;
});
Future<Integer> f2 = executorService.submit(() -> {
System.out.println("執(zhí)行任務(wù)二");
return 2;
});
Future<Integer> f3 = executorService.submit(() -> {
System.out.println("執(zhí)行任務(wù)三");
Thread.sleep(3000);
return 3;
});
executorService.execute(()->{
try {
Integer r1 = f1.get();
// 阻塞隊(duì)列入隊(duì)操作
queue.put(r1);
System.out.println(r1);
} catch (Exception e) {
e.printStackTrace();
}
});
executorService.execute(()->{
try {
Integer r2 = f2.get();
queue.put(r2);
System.out.println(r2);
} catch (Exception e) {
e.printStackTrace();
}
});
executorService.execute(()->{
try {
Integer r3 = f3.get();
queue.put(r3);
System.out.println(r3);
} catch (Exception e) {
e.printStackTrace();
}
});
// 循環(huán)次數(shù)不要使用queue.size限制,因?yàn)椴煌瑫r(shí)刻queue.size值是有可能不同的
for (int i = 0; i <3; i++) {
Integer integer = queue.take();
// 省略保存integer操作
executorService.execute(()->{
System.out.println("保存入庫(kù)=="+integer);
});
}
executorService.shutdown();
}產(chǎn)生結(jié)果如下

同樣的在生產(chǎn)中不建議使用,因?yàn)镾DK為我們提供了工具類CompletionService,CompletionService內(nèi)部就維護(hù)了一個(gè)阻塞隊(duì)列,唯一與上述代碼實(shí)現(xiàn)有所區(qū)別的是,阻塞隊(duì)列入庫(kù)的是Future對(duì)象,其余原理類似。
CompletionService
如何創(chuàng)建CompletionService
CompletionService同樣是一個(gè)接口,其具體實(shí)現(xiàn)為ExecutorCompletionService,創(chuàng)建CompletionService對(duì)象有兩種方式
public ExecutorCompletionService(Executor executor); public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue)
CompletionService對(duì)象的創(chuàng)建都是需要指定線程池,如果在創(chuàng)建時(shí)沒(méi)有傳入阻塞對(duì)象,那么會(huì)采用默認(rèn)的LinkedBlockingQueue無(wú)界阻塞隊(duì)列,如果應(yīng)用到生產(chǎn)可能會(huì)產(chǎn)生OOM的情況,這是需要注意的。
CompletionService初體驗(yàn)
CompletionService如何做到批量執(zhí)行異步任務(wù)呢,將上述場(chǎng)景采用CompletionService實(shí)現(xiàn)下
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletionService completionService = new ExecutorCompletionService(executorService);
Future<Integer> f1 = completionService.submit(() -> {
System.out.println("執(zhí)行任務(wù)一");
Thread.sleep(5000);
return 1;
});
Future<Integer> f2 = completionService.submit(() -> {
System.out.println("執(zhí)行任務(wù)二");
return 2;
});
Future<Integer> f3 = completionService.submit(() -> {
System.out.println("執(zhí)行任務(wù)三");
Thread.sleep(3000);
return 3;
});
for (int i = 0; i <3 ; i++) {
Future take = completionService.take();
Integer integer = (Integer) take.get();
executorService.execute(()->{
System.out.println("執(zhí)行入庫(kù)=="+integer);
});
}
executorService.shutdown();
}CompletionService接口說(shuō)明
CompletionService的方法不多,使用起來(lái)比較簡(jiǎn)單,方法簽名如下
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletionService completionService = new ExecutorCompletionService(executorService);
Future<Integer> f1 = completionService.submit(() -> {
System.out.println("執(zhí)行任務(wù)一");
Thread.sleep(5000);
return 1;
});
Future<Integer> f2 = completionService.submit(() -> {
System.out.println("執(zhí)行任務(wù)二");
return 2;
});
Future<Integer> f3 = completionService.submit(() -> {
System.out.println("執(zhí)行任務(wù)三");
Thread.sleep(3000);
return 3;
});
for (int i = 0; i <3 ; i++) {
Future take = completionService.take();
Integer integer = (Integer) take.get();
executorService.execute(()->{
System.out.println("執(zhí)行入庫(kù)=="+integer);
});
}
executorService.shutdown();
}總結(jié)
CompletionService主要是去解決無(wú)效等待的問(wèn)題,如果一個(gè)耗時(shí)較長(zhǎng)的任務(wù)在執(zhí)行,那么可以采用這種方式避免無(wú)效的等待
CompletionService還能讓異步任務(wù)的執(zhí)行結(jié)果有序化,先執(zhí)行完就先進(jìn)入阻塞隊(duì)列。
到此這篇關(guān)于Java中的CompletionService批量異步執(zhí)行詳解的文章就介紹到這了,更多相關(guān)CompletionService批量異步執(zhí)行內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java if(boolean)和if(boolean=true)區(qū)別解析
這篇文章主要介紹了Java if(boolean)和if(boolean=true)區(qū)別解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-02-02
JavaWeb實(shí)現(xiàn)自動(dòng)登錄功能
這篇文章主要為大家詳細(xì)介紹了JavaWeb實(shí)現(xiàn)自動(dòng)登錄功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08
解決報(bào)錯(cuò):java:讀取jar包時(shí)出錯(cuò):error in opening zip 
文章總結(jié):解決Java讀取jar包時(shí)出錯(cuò)的問(wèn)題,通過(guò)下載源碼并刷新項(xiàng)目解決了問(wèn)題,希望對(duì)大家有所幫助2024-11-11
Java動(dòng)態(tài)批量生成logback日志文件的示例
本文主要介紹了Java動(dòng)態(tài)批量生成logback日志文件的示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2025-04-04
mybatis?查詢返回Map<String,Object>類型
本文主要介紹了mybatis?查詢返回Map<String,Object>類型,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03
Java實(shí)現(xiàn)后端跨域的常見(jiàn)解決方案
跨源資源共享(CORS——Cross-Origin Resource Sharing,跨源資源共享,或通俗地譯為跨域資源共享)是一種基于 HTTP 頭的機(jī)制,跨域的解決方案有很多種,前后端都有,本文給大家主要介紹Java實(shí)現(xiàn)后端跨域的常見(jiàn)解決方案,需要的朋友可以參考下2024-04-04
Mybatis-plus通用查詢方法封裝的實(shí)現(xiàn)
本文主要介紹了Mybatis-plus通用查詢方法封裝的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07

