Java使用多線程處理未知任務(wù)數(shù)的方案介紹
知道任務(wù)個(gè)數(shù),你可以定義好線程數(shù)規(guī)則,生成線程數(shù)去跑
代碼說(shuō)明:
1.虛擬線程池:
使用 Executors.newVirtualThreadPerTaskExecutor() 創(chuàng)建虛擬線程池,每個(gè)任務(wù)將分配一個(gè)虛擬線程來(lái)執(zhí)行。
2.提交任務(wù)并返回結(jié)果:
- 每個(gè)任務(wù)通過(guò) CompletableFuture.supplyAsync() 提交,任務(wù)會(huì)返回一個(gè)結(jié)果(這里是字符串,模擬了任務(wù)的處理結(jié)果)。
- 每個(gè) CompletableFuture 都會(huì)保存任務(wù)的返回值。
3.等待所有任務(wù)完成:
使用 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) 等待所有的 CompletableFuture 完成。allOf.join() 會(huì)阻塞當(dāng)前線程,直到所有任務(wù)完成。
4.收集結(jié)果:
- 使用 Java 8 的 stream() 方法和 Collectors.toList() 來(lái)收集所有任務(wù)的結(jié)果,并將它們合并到一個(gè) List 中。
- CompletableFuture::join 會(huì)獲取每個(gè)任務(wù)的結(jié)果,并且如果任務(wù)有異常,它會(huì)拋出 CompletionException,因此你可以根據(jù)需要進(jìn)行異常處理。
5.關(guān)閉虛擬線程池:
最后,通過(guò) executorService.shutdown() 關(guān)閉線程池,釋放資源。
public static void main(String[] args) throws InterruptedException { // 創(chuàng)建虛擬線程的線程池 ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor(); // 假設(shè)我們有10個(gè)任務(wù),每個(gè)任務(wù)返回一個(gè)字符串 int numTasks = 10; List<CompletableFuture<String>> futures = new ArrayList<>(numTasks); // 提交任務(wù)到虛擬線程池 for (int i = 0; i < numTasks; i++) { int taskId = i; // 將每個(gè)任務(wù)的結(jié)果放入 CompletableFuture 中 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { // 模擬工作 System.out.println("Task " + taskId + " started on " + Thread.currentThread()); Thread.sleep(1000); // 模擬延遲 String result = "Result of task " + taskId; System.out.println("Task " + taskId + " completed on " + Thread.currentThread()); return result; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return "Task " + taskId + " was interrupted"; } }, executorService); futures.add(future); // 將每個(gè) future 加入集合 } // 等待所有任務(wù)完成并獲取結(jié)果 CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); allOf.join(); // 阻塞直到所有任務(wù)完成 // 合并所有任務(wù)的結(jié)果到一個(gè)集合中 List<String> results = futures.stream() .map(CompletableFuture::join) // 獲取每個(gè)任務(wù)的結(jié)果 .collect(Collectors.toList()); // 合并到列表 // 打印結(jié)果 System.out.println("All results: " + results); // 關(guān)閉虛擬線程池 executorService.shutdown(); }
Java 不確定線程數(shù),要異步多線程執(zhí)行,還要等待所有線程執(zhí)行結(jié)束,然后獲取結(jié)果合并
解釋?zhuān)?/strong>
任務(wù)列表 (tasks):我們創(chuàng)建了一個(gè) List<Callable> 來(lái)保存所有要執(zhí)行的異步任務(wù),每個(gè)任務(wù)返回一個(gè) Integer 結(jié)果。
創(chuàng)建線程池:使用 Executors.newFixedThreadPool(5) 創(chuàng)建了一個(gè)大小為 5 的線程池,可以并發(fā)執(zhí)行 5 個(gè)線程。線程池的大小可以根據(jù)實(shí)際需要?jiǎng)討B(tài)調(diào)整。
提交任務(wù)并獲取 Future 列表:executorService.invokeAll(tasks) 方法會(huì)提交所有任務(wù),并返回一個(gè) List<Future>。每個(gè) Future 對(duì)象代表一個(gè)異步任務(wù)的結(jié)果。
等待任務(wù)完成并合并結(jié)果:通過(guò) future.get() 方法阻塞當(dāng)前線程,直到任務(wù)完成并返回結(jié)果。我們?cè)?sum 中累加所有任務(wù)的結(jié)果。
關(guān)閉線程池:最后,使用 executorService.shutdown() 關(guān)閉線程池,確保所有線程在任務(wù)完成后能夠被正確回收。
重要事項(xiàng):
- invokeAll():會(huì)阻塞當(dāng)前線程,直到所有任務(wù)完成。如果任務(wù)執(zhí)行的時(shí)間不確定,使用 invokeAll() 是比較合適的,它會(huì)等待所有任務(wù)完成,并返回 Future 列表。
- Future.get():該方法會(huì)阻塞當(dāng)前線程,直到任務(wù)完成。如果任務(wù)執(zhí)行有異常,get() 會(huì)拋出異常。
- 線程池管理:使用 ExecutorService 方便管理線程池的大小,避免頻繁創(chuàng)建和銷(xiāo)毀線程帶來(lái)的性能損失。
public static void main(String[] args) throws InterruptedException, ExecutionException { // 假設(shè)我們有一些任務(wù)需要并發(fā)執(zhí)行 List<Callable<Integer>> tasks = new ArrayList<>(); // 創(chuàng)建一些任務(wù) for (int i = 0; i < 10; i++) { final int taskId = i; tasks.add(() -> { // 模擬任務(wù)執(zhí)行,返回一個(gè)結(jié)果 Thread.sleep(1000); // 模擬任務(wù)耗時(shí) return taskId * 2; // 假設(shè)任務(wù)返回 taskId 的 2 倍 }); } // 創(chuàng)建一個(gè)固定大小的線程池 ExecutorService executorService = Executors.newFixedThreadPool(5); try { // 提交所有任務(wù)并返回一個(gè) Future 列表 List<Future<Integer>> futures = executorService.invokeAll(tasks); // 等待所有任務(wù)完成并合并結(jié)果 int sum = 0; for (Future<Integer> future : futures) { sum += future.get(); // 獲取任務(wù)結(jié)果并合并 } // 輸出所有任務(wù)的合并結(jié)果 System.out.println("Total sum: " + sum); } finally { // 關(guān)閉線程池 executorService.shutdown(); } }
實(shí)際案例 多線程調(diào)API然后合并API的結(jié)果返回給前端
1.聲明任務(wù)隊(duì)列集合
/*變量值對(duì)應(yīng)Map*/ List<VarResultDto> results = new ArrayList<>(); // 假設(shè)我們有一些任務(wù)需要并發(fā)執(zhí)行 List<Callable<Map<String, Object>>> tasks = new ArrayList<>();
2.將任務(wù)加入然后加入任務(wù)隊(duì)列
tasks.add(() -> { Map<String, Object> respTask = new HashMap<>(); List<VarResultDto> listTaskResp = new ArrayList<>(); List<String> listTaskError = new ArrayList<>(); try { log.info("執(zhí)行API請(qǐng)求{} apiId:[{}]", vo.getApiUrl(), vo.getId()); /*請(qǐng)求API獲取結(jié)果*/ R<Object> objectR = apiDataInfoService.executeApi(vo); // 解析結(jié)果 JSONObject apiResp = JSONUtil.parseObj(objectR); if (apiResp.getInt("code") == 200 || apiResp.getInt("code") == 0) { apiResp = apiResp.getJSONObject("data"); } // JavaScript數(shù)據(jù)處理 if (StringUtils.isNotBlank(apiVarInfoDto.getJs())) { try { String newJson = SpringUtils.execJavaScript(JSON.toJSONString(apiResp), apiVarInfoDto.getJs()); apiResp = JSONUtil.parseObj(newJson); log.info("JavaScript數(shù)據(jù)處理完成"); } catch (Exception e) { log.warn("JavaScript數(shù)據(jù)處理異常: {}", JSON.toJSONString(apiVarInfoDto)); } } final JSONObject tempData = apiResp; relations.forEach(relation -> { String value = JSONUtil.getByPath(tempData, relation.getResult(), ""); if (StringUtils.isNotBlank(value)) { // *設(shè)置變量及實(shí)際值* VarResultDto resultDto = new VarResultDto(); resultDto.setId(relation.getId()); resultDto.setName(relation.getName()); resultDto.setResult(value); listTaskResp.add(resultDto); } else { String error = "API接口:[" + vo.getApiName() + "]無(wú)法取得變量:[" + relation.getName() + "]有效數(shù)據(jù),原因:[" + "API地址:" + apiDataInfo.getApiUrl() + "->返回錯(cuò)誤:" + tempData.toString() + "]"; listTaskError.add(error); } }); respTask.put("results", listTaskResp); respTask.put("errorLogs", listTaskError); } catch (Exception e) { log.error("請(qǐng)求API->{}失敗!{}", vo.getApiUrl(), e.getMessage(), e); boolean contains = e.getMessage().contains("TIMEOUT"); /*記錄錯(cuò)誤結(jié)果*/ relations.forEach(relation -> { String error = "API接口:[" + vo.getApiName() + "]無(wú)法取得變量:[" + relation.getName() + "]有效數(shù)據(jù),原因:[" + (contains ? "數(shù)據(jù)接口獲取超時(shí)" : e.getMessage()) + "]"; listTaskError.add(error); }); respTask.put("errorLogs", listTaskError); } return respTask; });
3.提交任務(wù)去執(zhí)行,獲取所有任務(wù)的結(jié)果,合并結(jié)果
String defaultThreadPoolSize = configService.getConfigValue("api_fork_join_size", "5"); // 創(chuàng)建一個(gè)固定大小的線程池 try (ExecutorService executorService = Executors.newFixedThreadPool(Integer.parseInt(defaultThreadPoolSize))) { try { // 提交所有任務(wù)并返回一個(gè) Future 列表 List<Future<Map<String, Object>>> futures = executorService.invokeAll(tasks); // 等待所有任務(wù)完成并合并結(jié)果 List<Map<String, Object>> sum = new ArrayList<>(); for (Future<Map<String, Object>> future : futures) { // 獲取任務(wù)結(jié)果并合并 sum.add(future.get()); } // 輸出所有任務(wù)的合并結(jié)果 for (Map<String, Object> stringObjectMap : sum) { Object results1 = stringObjectMap.get("results"); if (results1 != null) { results.addAll((List<VarResultDto>) results1); } Object errorLogs1 = stringObjectMap.get("errorLogs"); if (errorLogs1 != null) { errorLogs.addAll((List<String>) errorLogs1); } } } catch (Exception e) { log.error("多線程---并行處理--出錯(cuò)了{(lán)}", e.getMessage(), e); } finally { // 關(guān)閉線程池 executorService.shutdown(); } }
到此這篇關(guān)于Java使用多線程處理未知任務(wù)數(shù)的方案介紹的文章就介紹到這了,更多相關(guān)Java多線程處理未知任務(wù)數(shù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java如何實(shí)現(xiàn)海量數(shù)據(jù)判重
在海量數(shù)據(jù)如何確定一個(gè)值是否存在?這是一道非常經(jīng)典的面試場(chǎng)景題,那怎么回答這個(gè)問(wèn)題呢?下面小編就來(lái)和大家詳細(xì)的聊一聊,感興趣的可以一起學(xué)習(xí)一下2023-09-09Spring MVC登錄注冊(cè)以及轉(zhuǎn)換json數(shù)據(jù)
本文主要介紹了Spring MVC登錄注冊(cè)以及轉(zhuǎn)換json數(shù)據(jù)的相關(guān)知識(shí)。具有很好的參考價(jià)值。下面跟著小編一起來(lái)看下吧2017-04-04App登陸java后臺(tái)處理和用戶(hù)權(quán)限驗(yàn)證
這篇文章主要為大家詳細(xì)介紹了App登陸java后臺(tái)處理和用戶(hù)權(quán)限驗(yàn)證,感興趣的朋友可以參考一下2016-06-06Java實(shí)現(xiàn)購(gòu)物管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)購(gòu)物管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-01-01如何將Java與C#時(shí)間進(jìn)行互相轉(zhuǎn)換
這篇文章主要介紹了Java與C#時(shí)間互轉(zhuǎn)的方法以及JAVA日期、C#日期計(jì)算說(shuō)明,需要的朋友可以參考下2022-11-11Maven依賴(lài)管理之parent與dependencyManagement深入分析
首先我們來(lái)說(shuō)說(shuō)parent標(biāo)簽,其實(shí)這個(gè)不難解釋?zhuān)褪歉傅囊馑?,pom也有繼承的。比方說(shuō)我現(xiàn)在有A,B,C,A是B,C的父級(jí)?,F(xiàn)在就是有一個(gè)情況B,C其實(shí)有很多jar都是共同的,其實(shí)是可以放在父項(xiàng)目里面,這樣,讓B,C都繼承A就方便管理了2022-10-10Java中實(shí)現(xiàn)多線程關(guān)鍵詞整理(總結(jié))
這篇文章主要介紹了Java中實(shí)現(xiàn)多線程關(guān)鍵詞整理,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下2017-05-05Java優(yōu)秀類(lèi)庫(kù)Hutool使用示例
Hutool是一個(gè)小而全的Java工具類(lèi)庫(kù),通過(guò)靜態(tài)方法封裝,降低相關(guān)API的學(xué)習(xí)成本,提高工作效率,涵蓋了Java開(kāi)發(fā)開(kāi)發(fā)中的方方面面,使用Hutool可節(jié)省開(kāi)發(fā)人員對(duì)項(xiàng)目中公用類(lèi)和公用工具方法的封裝時(shí)間,使開(kāi)發(fā)專(zhuān)注于業(yè)務(wù),同時(shí)可以最大限度的避免封裝不完善帶來(lái)的bug2023-02-02Java?Stream如何將List分組成Map或LinkedHashMap
這篇文章主要給大家介紹了關(guān)于Java?Stream如何將List分組成Map或LinkedHashMap的相關(guān)資料,stream流是Java8的新特性,極大簡(jiǎn)化了集合的處理操作,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-12-12