Java使用多線程處理未知任務(wù)數(shù)的方案介紹
知道任務(wù)個數(shù),你可以定義好線程數(shù)規(guī)則,生成線程數(shù)去跑
代碼說明:
1.虛擬線程池:
使用 Executors.newVirtualThreadPerTaskExecutor() 創(chuàng)建虛擬線程池,每個任務(wù)將分配一個虛擬線程來執(zhí)行。
2.提交任務(wù)并返回結(jié)果:
- 每個任務(wù)通過 CompletableFuture.supplyAsync() 提交,任務(wù)會返回一個結(jié)果(這里是字符串,模擬了任務(wù)的處理結(jié)果)。
- 每個 CompletableFuture 都會保存任務(wù)的返回值。
3.等待所有任務(wù)完成:
使用 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) 等待所有的 CompletableFuture 完成。allOf.join() 會阻塞當(dāng)前線程,直到所有任務(wù)完成。
4.收集結(jié)果:
- 使用 Java 8 的 stream() 方法和 Collectors.toList() 來收集所有任務(wù)的結(jié)果,并將它們合并到一個 List 中。
- CompletableFuture::join 會獲取每個任務(wù)的結(jié)果,并且如果任務(wù)有異常,它會拋出 CompletionException,因此你可以根據(jù)需要進行異常處理。
5.關(guān)閉虛擬線程池:
最后,通過 executorService.shutdown() 關(guān)閉線程池,釋放資源。
public static void main(String[] args) throws InterruptedException {
// 創(chuàng)建虛擬線程的線程池
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
// 假設(shè)我們有10個任務(wù),每個任務(wù)返回一個字符串
int numTasks = 10;
List<CompletableFuture<String>> futures = new ArrayList<>(numTasks);
// 提交任務(wù)到虛擬線程池
for (int i = 0; i < numTasks; i++) {
int taskId = i;
// 將每個任務(wù)的結(jié)果放入 CompletableFuture 中
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
// 模擬工作
System.out.println("Task " + taskId + " started on " + Thread.currentThread());
Thread.sleep(1000); // 模擬延遲
String result = "Result of task " + taskId;
System.out.println("Task " + taskId + " completed on " + Thread.currentThread());
return result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "Task " + taskId + " was interrupted";
}
}, executorService);
futures.add(future); // 將每個 future 加入集合
}
// 等待所有任務(wù)完成并獲取結(jié)果
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allOf.join(); // 阻塞直到所有任務(wù)完成
// 合并所有任務(wù)的結(jié)果到一個集合中
List<String> results = futures.stream()
.map(CompletableFuture::join) // 獲取每個任務(wù)的結(jié)果
.collect(Collectors.toList()); // 合并到列表
// 打印結(jié)果
System.out.println("All results: " + results);
// 關(guān)閉虛擬線程池
executorService.shutdown();
}
Java 不確定線程數(shù),要異步多線程執(zhí)行,還要等待所有線程執(zhí)行結(jié)束,然后獲取結(jié)果合并
解釋:
任務(wù)列表 (tasks):我們創(chuàng)建了一個 List<Callable> 來保存所有要執(zhí)行的異步任務(wù),每個任務(wù)返回一個 Integer 結(jié)果。
創(chuàng)建線程池:使用 Executors.newFixedThreadPool(5) 創(chuàng)建了一個大小為 5 的線程池,可以并發(fā)執(zhí)行 5 個線程。線程池的大小可以根據(jù)實際需要動態(tài)調(diào)整。
提交任務(wù)并獲取 Future 列表:executorService.invokeAll(tasks) 方法會提交所有任務(wù),并返回一個 List<Future>。每個 Future 對象代表一個異步任務(wù)的結(jié)果。
等待任務(wù)完成并合并結(jié)果:通過 future.get() 方法阻塞當(dāng)前線程,直到任務(wù)完成并返回結(jié)果。我們在 sum 中累加所有任務(wù)的結(jié)果。
關(guān)閉線程池:最后,使用 executorService.shutdown() 關(guān)閉線程池,確保所有線程在任務(wù)完成后能夠被正確回收。
重要事項:
- invokeAll():會阻塞當(dāng)前線程,直到所有任務(wù)完成。如果任務(wù)執(zhí)行的時間不確定,使用 invokeAll() 是比較合適的,它會等待所有任務(wù)完成,并返回 Future 列表。
- Future.get():該方法會阻塞當(dāng)前線程,直到任務(wù)完成。如果任務(wù)執(zhí)行有異常,get() 會拋出異常。
- 線程池管理:使用 ExecutorService 方便管理線程池的大小,避免頻繁創(chuàng)建和銷毀線程帶來的性能損失。
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 假設(shè)我們有一些任務(wù)需要并發(fā)執(zhí)行
List<Callable<Integer>> tasks = new ArrayList<>();
// 創(chuàng)建一些任務(wù)
for (int i = 0; i < 10; i++) {
final int taskId = i;
tasks.add(() -> {
// 模擬任務(wù)執(zhí)行,返回一個結(jié)果
Thread.sleep(1000); // 模擬任務(wù)耗時
return taskId * 2; // 假設(shè)任務(wù)返回 taskId 的 2 倍
});
}
// 創(chuàng)建一個固定大小的線程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
try {
// 提交所有任務(wù)并返回一個 Future 列表
List<Future<Integer>> futures = executorService.invokeAll(tasks);
// 等待所有任務(wù)完成并合并結(jié)果
int sum = 0;
for (Future<Integer> future : futures) {
sum += future.get(); // 獲取任務(wù)結(jié)果并合并
}
// 輸出所有任務(wù)的合并結(jié)果
System.out.println("Total sum: " + sum);
} finally {
// 關(guān)閉線程池
executorService.shutdown();
}
}
實際案例 多線程調(diào)API然后合并API的結(jié)果返回給前端
1.聲明任務(wù)隊列集合
/*變量值對應(yīng)Map*/
List<VarResultDto> results = new ArrayList<>();
// 假設(shè)我們有一些任務(wù)需要并發(fā)執(zhí)行
List<Callable<Map<String, Object>>> tasks = new ArrayList<>();
2.將任務(wù)加入然后加入任務(wù)隊列
tasks.add(() -> {
Map<String, Object> respTask = new HashMap<>();
List<VarResultDto> listTaskResp = new ArrayList<>();
List<String> listTaskError = new ArrayList<>();
try {
log.info("執(zhí)行API請求{} apiId:[{}]", vo.getApiUrl(), vo.getId());
/*請求API獲取結(jié)果*/
R<Object> objectR = apiDataInfoService.executeApi(vo);
// 解析結(jié)果
JSONObject apiResp = JSONUtil.parseObj(objectR);
if (apiResp.getInt("code") == 200 || apiResp.getInt("code") == 0) {
apiResp = apiResp.getJSONObject("data");
}
// JavaScript數(shù)據(jù)處理
if (StringUtils.isNotBlank(apiVarInfoDto.getJs())) {
try {
String newJson = SpringUtils.execJavaScript(JSON.toJSONString(apiResp), apiVarInfoDto.getJs());
apiResp = JSONUtil.parseObj(newJson);
log.info("JavaScript數(shù)據(jù)處理完成");
} catch (Exception e) {
log.warn("JavaScript數(shù)據(jù)處理異常: {}", JSON.toJSONString(apiVarInfoDto));
}
}
final JSONObject tempData = apiResp;
relations.forEach(relation -> {
String value = JSONUtil.getByPath(tempData, relation.getResult(), "");
if (StringUtils.isNotBlank(value)) {
// *設(shè)置變量及實際值*
VarResultDto resultDto = new VarResultDto();
resultDto.setId(relation.getId());
resultDto.setName(relation.getName());
resultDto.setResult(value);
listTaskResp.add(resultDto);
} else {
String error = "API接口:[" + vo.getApiName() + "]無法取得變量:[" + relation.getName() + "]有效數(shù)據(jù),原因:[" + "API地址:" + apiDataInfo.getApiUrl() + "->返回錯誤:" + tempData.toString() + "]";
listTaskError.add(error);
}
});
respTask.put("results", listTaskResp);
respTask.put("errorLogs", listTaskError);
} catch (Exception e) {
log.error("請求API->{}失??!{}", vo.getApiUrl(), e.getMessage(), e);
boolean contains = e.getMessage().contains("TIMEOUT");
/*記錄錯誤結(jié)果*/
relations.forEach(relation -> {
String error = "API接口:[" + vo.getApiName() + "]無法取得變量:[" + relation.getName() + "]有效數(shù)據(jù),原因:[" + (contains ? "數(shù)據(jù)接口獲取超時" : e.getMessage()) + "]";
listTaskError.add(error);
});
respTask.put("errorLogs", listTaskError);
}
return respTask;
});
3.提交任務(wù)去執(zhí)行,獲取所有任務(wù)的結(jié)果,合并結(jié)果
String defaultThreadPoolSize = configService.getConfigValue("api_fork_join_size", "5");
// 創(chuàng)建一個固定大小的線程池
try (ExecutorService executorService = Executors.newFixedThreadPool(Integer.parseInt(defaultThreadPoolSize))) {
try {
// 提交所有任務(wù)并返回一個 Future 列表
List<Future<Map<String, Object>>> futures = executorService.invokeAll(tasks);
// 等待所有任務(wù)完成并合并結(jié)果
List<Map<String, Object>> sum = new ArrayList<>();
for (Future<Map<String, Object>> future : futures) {
// 獲取任務(wù)結(jié)果并合并
sum.add(future.get());
}
// 輸出所有任務(wù)的合并結(jié)果
for (Map<String, Object> stringObjectMap : sum) {
Object results1 = stringObjectMap.get("results");
if (results1 != null) {
results.addAll((List<VarResultDto>) results1);
}
Object errorLogs1 = stringObjectMap.get("errorLogs");
if (errorLogs1 != null) {
errorLogs.addAll((List<String>) errorLogs1);
}
}
} catch (Exception e) {
log.error("多線程---并行處理--出錯了{(lán)}", e.getMessage(), e);
} finally {
// 關(guān)閉線程池
executorService.shutdown();
}
}到此這篇關(guān)于Java使用多線程處理未知任務(wù)數(shù)的方案介紹的文章就介紹到這了,更多相關(guān)Java多線程處理未知任務(wù)數(shù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring MVC登錄注冊以及轉(zhuǎn)換json數(shù)據(jù)
本文主要介紹了Spring MVC登錄注冊以及轉(zhuǎn)換json數(shù)據(jù)的相關(guān)知識。具有很好的參考價值。下面跟著小編一起來看下吧2017-04-04
Maven依賴管理之parent與dependencyManagement深入分析
首先我們來說說parent標(biāo)簽,其實這個不難解釋,就是父的意思,pom也有繼承的。比方說我現(xiàn)在有A,B,C,A是B,C的父級?,F(xiàn)在就是有一個情況B,C其實有很多jar都是共同的,其實是可以放在父項目里面,這樣,讓B,C都繼承A就方便管理了2022-10-10
Java中實現(xiàn)多線程關(guān)鍵詞整理(總結(jié))
這篇文章主要介紹了Java中實現(xiàn)多線程關(guān)鍵詞整理,非常不錯,具有參考借鑒價值,需要的朋友可以參考下2017-05-05
Java?Stream如何將List分組成Map或LinkedHashMap
這篇文章主要給大家介紹了關(guān)于Java?Stream如何將List分組成Map或LinkedHashMap的相關(guān)資料,stream流是Java8的新特性,極大簡化了集合的處理操作,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-12-12

