Java多線程工具CompletableFuture詳解
簡(jiǎn)介
CompletableFuture 是 java 1.8 追加的新特性,通俗的話來說,是一個(gè)函數(shù)式的,用于控制多任務(wù)同步、異步組合操作的工具,實(shí)現(xiàn)諸如:
- 控制若干個(gè)線程任務(wù)間是同步還是異步
- 控制若干個(gè)線程間的先后執(zhí)行順序、依賴關(guān)系
- 若干個(gè)線程任務(wù),任意其中一個(gè)完成就執(zhí)行某種邏輯
- ……
將變得十分簡(jiǎn)單。如果你對(duì)前端有一定了解,你會(huì)發(fā)現(xiàn)它和 Javascript 中的 Promise 是十分類似的。
使用方法
CompletableFuture 需要依仗線程池實(shí)現(xiàn)自身功能,這個(gè)線程池是個(gè)非必填值,如果未特殊指明,將會(huì)使用 ForkJoinPool 的實(shí)例,構(gòu)造方法為 ForkJoinPool.makeCommonPool(),該線程池大小為 Runtime.getRuntime().availableProcessors() - 1 即 當(dāng)前電腦 cpu 可用核心數(shù) -1。
常見 API
方法名稱 | 備注 |
complete | 標(biāo)識(shí)自身已完成任務(wù),并傳入一個(gè)參數(shù)作為 CompletableFuture.get() 將獲取的值;標(biāo)識(shí)結(jié)束是 CAS 方式設(shè)置,只有 未結(jié)束→結(jié)束 的變化才能成功,complete 操作返回 true 時(shí)才真正影響 CompletableFuture.get() 將獲取的值 |
completedFuture | 這是個(gè)靜態(tài)方法,構(gòu)造一個(gè)已完成的 CompletableFuture 對(duì)象,并以傳入的參數(shù)作為 CompletableFuture.get() 將獲取的值 |
get | 阻塞直至任務(wù)完成,并獲取該任務(wù)的返回值 |
join | 阻塞直至任務(wù)完成,并獲取該任務(wù)的返回值(幾乎與 get 等同,但 join 不會(huì)拋出檢查型異常,不強(qiáng)制要求你必須處理) |
cancel | 標(biāo)識(shí)自身已完成,無法阻斷自身任務(wù),但會(huì)構(gòu)造 CancellationException傳給關(guān)聯(lián)在該對(duì)象后續(xù)的 CompletableFuture,后續(xù)的 CompletableFuture 會(huì)因捕獲到異常而終止任務(wù)。另:該函數(shù)入?yún)魅氲?boolean 不會(huì)產(chǎn)生任何作用( javadoc 里這么描述也是絕了) |
completeExceptionally | 可以理解為 cancel 的可自定義異常版本,其入?yún)⒕褪莻鬟f給后續(xù) CompletableFuture 對(duì)象的異常 |
exceptionally | CompletableFuture 鏈路上發(fā)生異常時(shí)會(huì)觸發(fā)該方法,給鏈路的最后一個(gè) CompletableFuture 對(duì)象配置,即可對(duì)全鏈路進(jìn)行異常捕獲,其入?yún)楫惓L幚頃r(shí)需要執(zhí)行的 Function |
isDone | CompletableFuture 任務(wù)是否已完成 |
剩下的大多 API 是 run、accept、apply、then、either、both、async …… 的組合,本質(zhì)上都是語(yǔ)法糖,用了原生的 @FunctionalInterface,決定傳入的函數(shù)有無返回值、有無入?yún)?、在?dāng)前任務(wù)結(jié)束后開始執(zhí)行、是否任意一個(gè)完成就結(jié)束、是否全部完成才結(jié)束、是否另起線程執(zhí)行任務(wù) ……
// 你的下一步業(yè)務(wù)邏輯 Runnable next = new MyRunnable(); // 因?yàn)槭?run 所以無返回值, 因?yàn)槭?then 所以在 completableFuture1 對(duì)應(yīng)的任務(wù)結(jié)束后,執(zhí)行一段任務(wù) completableFuture1.thenRun(next); // 因?yàn)槭?run 所以無返回值, 因?yàn)槭?then 所以在 completableFuture1 對(duì)應(yīng)的任務(wù)結(jié)束后,因?yàn)?async ,所以要另起一個(gè)線程,執(zhí)行一段任務(wù) completableFuture1.thenRunAsync(next);
在看得懂函數(shù)式編程的情況下,其他你可通過源碼函數(shù)定義以此類推
如果想使用自定義的線程池執(zhí)行任務(wù),那么使用帶 Executor executor 的重載函數(shù)即可,后不再重復(fù)說明,例如:
// 自定義線程池 ThreadPoolExecutor executorService = new ThreadPoolExecutor(10, 20, 2L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(10), // 你可以根據(jù)你的需要自己實(shí)現(xiàn) Handler,此處為簡(jiǎn)寫使用現(xiàn)成的 Handler,此處捕獲的是流量過載異常 new ThreadPoolExecutor.AbortPolicy()); // 通過自定義線程池使用 CompletableFuture CompletableFuture.runAsync(() -> { System.out.println("我的業(yè)務(wù)代碼"); }, executorService);
使用示例
為簡(jiǎn)潔代碼,睡眠模擬任務(wù)運(yùn)行耗時(shí)均使用下列函數(shù),后續(xù)不再重復(fù)說明
public static void sleep(Long sleepTime) { try { Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } }
請(qǐng)時(shí)刻注意要進(jìn)行異常處理,意味著你的 completableFuture 鏈路得保證調(diào)用過下列代碼,進(jìn)行異常處理(鏈路最后一個(gè)使用,則全鏈路可捕獲異常),后續(xù)為簡(jiǎn)潔代碼已省略
completableFuture.exceptionally((Throwable ex) -> { ex.printStackTrace(); // 取決于 completableFuture 有無返回值,類型是啥。此處實(shí)例是 completableFuture 為 CompletableFuture<Long> return -1L; });
將常規(guī)線程任務(wù)轉(zhuǎn)化為 CompletableFuture 對(duì)象
CompletableFuture completableFuture = new CompletableFuture(); Long startTs = System.currentTimeMillis(); new Thread(() -> { sleep(200L); completableFuture.complete("完成");// 設(shè)置 completableFuture 結(jié)果并將狀態(tài)設(shè)置為已完成 }).start(); while (!completableFuture.isDone()) { // 非阻塞式獲取結(jié)果,如果當(dāng)前未執(zhí)行完成則返回入?yún)⒆址?我還沒完成" // 執(zhí)行到此處恰好任務(wù)完成,然后在執(zhí)行 while 循環(huán)判斷跳出,你在循環(huán)內(nèi)最多輸出一次 “完成” System.out.println(completableFuture.getNow("未完")); } System.out.println("最終結(jié)果:" + completableFuture.getNow("未完") + " " + (System.currentTimeMillis() - startTs) + " ms");
阻塞到任意一個(gè)任務(wù)完成
public static void 阻塞到任意一個(gè)完成() throws IOException { // 模擬一個(gè)耗時(shí) 20 L 的任務(wù) CompletableFuture<Long> completableFuture1 = CompletableFuture.supplyAsync(() -> { sleep(20L); System.out.println("completableFuture1 完成" + " --" + (System.currentTimeMillis() - startTs)); return 20L; }); // 模擬一個(gè)耗時(shí) 10 L 的任務(wù) CompletableFuture<Long> completableFuture2 = CompletableFuture.supplyAsync(() -> { sleep(10L); System.out.println("completableFuture2 完成" + " --" + (System.currentTimeMillis() - startTs)); return 10L; }); // applyToEitherAsync 代表另起一個(gè)線程去執(zhí)行第二個(gè)入?yún)⒌拇a塊,這里其實(shí)沒啥影響,我就不加 Async 了 CompletableFuture result = completableFuture1.applyToEither(completableFuture2, fasterOne -> { System.out.println(fasterOne); return fasterOne; }); // 除了寫回調(diào)函數(shù)方法外的另一種獲取最快值的方法 System.out.println("最快的為:" + result.join()); // 調(diào)用讀取行阻塞住,防止異步任務(wù)還未完成就退出了 System.in.read(); }
遇到特別多任務(wù)的情況下,你可以嘗試數(shù)組
CompletableFuture[] array = new CompletableFuture[2]; array[0] = completableFuture1; array[1] = completableFuture2; CompletableFuture fasterOne = CompletableFuture.anyOf(array); System.out.println("最快的為:" + fasterOne.join());
阻塞到全部任務(wù)完成
public static void 阻塞到全部完成() { Long startTs = System.currentTimeMillis(); // 模擬一個(gè)耗時(shí) 20 L 的任務(wù) CompletableFuture<Long> completableFuture1 = CompletableFuture.supplyAsync(() -> { sleep(20L); System.out.println("completableFuture1 完成" + " --" + (System.currentTimeMillis() - startTs)); return 20L; }); // 模擬一個(gè)耗時(shí) 10 L 的任務(wù) CompletableFuture<Long> completableFuture2 = CompletableFuture.supplyAsync(() -> { sleep(10L); System.out.println("completableFuture2 完成" + " --" + (System.currentTimeMillis() - startTs)); return 10L; }); // thenAcceptBothAsync 代表另起一個(gè)線程去執(zhí)行第二個(gè)入?yún)⒌拇a塊,這里其實(shí)沒啥影響,我就不加 Async 了 CompletableFuture result = completableFuture1.thenAcceptBoth(completableFuture2, (r1, r2) -> { System.out.println("completableFuture1 :" + r1); System.out.println("completableFuture2 :" + r2); }); // thenAcceptBoth 是沒有返回值的,所以這里是 null ,但這句代碼還是有作用的,相當(dāng)于阻塞到全部任務(wù)都完成 System.out.println("返回這個(gè) null 之后意味著全部任務(wù)已完成:" + result.join()); }
遇到特別多任務(wù)的情況下,你可以嘗試數(shù)組
CompletableFuture[] array = new CompletableFuture[2]; array[0] = completableFuture1; array[1] = completableFuture2; CompletableFuture<Void> all = CompletableFuture.allOf(array); System.out.println("返回這個(gè) null 之后意味著全部任務(wù)已完成:" + all.join());
合并任務(wù)
合并任務(wù)會(huì)涉及到 Compose、Combine,他們區(qū)別在于合并的邏輯不同:
- Compose: 合并的兩個(gè)任務(wù)間是同步阻塞執(zhí)行的,后一個(gè)任務(wù)需要阻塞等待第一個(gè)任務(wù)執(zhí)行完成。你需要傳入一個(gè)函數(shù) —— 已知第一個(gè)任務(wù)的返回值,返回合并之后的 CompletableFuture 對(duì)象
- Combine: 合并的兩個(gè)任務(wù)間是異步執(zhí)行的。你需要傳入另一個(gè)任務(wù)、一個(gè)函數(shù) —— 已知兩個(gè)任務(wù)的返回值,合并成最終返回值
public static void 合并() { Long startTs = System.currentTimeMillis(); // 模擬一個(gè)耗時(shí) 100 L 的任務(wù) CompletableFuture<Long> completableFuture1 = CompletableFuture.supplyAsync(() -> { sleep(100L); System.out.println("completableFuture1 完成" + " --" + (System.currentTimeMillis() - startTs)); return 100L; }); // 模擬一個(gè)耗時(shí) 100 L 的任務(wù) CompletableFuture<Long> completableFuture2 = CompletableFuture.supplyAsync(() -> { sleep(100L); System.out.println("completableFuture2 完成" + " --" + (System.currentTimeMillis() - startTs)); return 100L; }); // 模擬 completableFuture1 合并一個(gè)耗時(shí) 120 L 的任務(wù),返回值為兩個(gè)任務(wù)總工時(shí) // thenCombineAsync 代表另起一個(gè)線程去執(zhí)行第二個(gè)入?yún)⒌拇a塊,這里其實(shí)沒啥影響,我就不加 Async 了 CompletableFuture<Long> completableFuture3 = completableFuture1.thenCombine(CompletableFuture.supplyAsync(() -> { sleep(120L); return 120L; }), (x, y) -> x + y); // 模擬 completableFuture2 合并一個(gè)耗時(shí) 50 L 的任務(wù),返回值為兩個(gè)任務(wù)總工時(shí) // thenComposeAsync 代表另起一個(gè)線程去執(zhí)行第一個(gè)入?yún)⒌拇a塊,這里其實(shí)沒啥影響,我就不加 Async 了 CompletableFuture<Long> completableFuture4 = completableFuture2.thenCompose(r2 -> { CompletableFuture<Long> temp = CompletableFuture.completedFuture(r2); return temp.thenApply(rTemp -> { sleep(50L); return temp.join() + 50L; }); }); boolean printFlag3 = true; boolean printFlag4 = true; String completableFuture3Info = null; String completableFuture4Info = null; while (!completableFuture3.isDone() || !completableFuture4.isDone()) { if (completableFuture3.isDone()) { if (printFlag3) { printFlag3 = false; completableFuture3Info = "completableFuture3 完成:" + completableFuture3.join() + " --" + (System.currentTimeMillis() - startTs); } } if (completableFuture4.isDone()) { if (printFlag4) { printFlag4 = false; completableFuture4Info = "completableFuture4 完成:" + completableFuture4.join() + " --" + (System.currentTimeMillis() - startTs); } } } System.out.println(completableFuture3Info != null ? completableFuture3Info : "completableFuture3 actual:" + completableFuture3.getNow(-100L) + " --" + (System.currentTimeMillis() - startTs)); System.out.println(completableFuture4Info != null ? completableFuture4Info : "completableFuture4 actual:" + completableFuture4.getNow(-100L) + " --" + (System.currentTimeMillis() - startTs)); }
你會(huì)觀測(cè)到總工時(shí)更長(zhǎng)的反而實(shí)際結(jié)束時(shí)間點(diǎn)更早,completableFuture3 早于 completableFuture4
到此這篇關(guān)于Java多線程工具CompletableFuture詳解的文章就介紹到這了,更多相關(guān)多線程工具CompletableFuture內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Feign如何解決服務(wù)之間調(diào)用傳遞token
這篇文章主要介紹了Feign如何解決服務(wù)之間調(diào)用傳遞token,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03JPA @GeneratedValue 四種標(biāo)準(zhǔn)用法TABLE,SEQUENCE,IDENTITY,
這篇文章主要介紹了@GeneratedValue 四種標(biāo)準(zhǔn)用法TABLE,SEQUENCE,IDENTITY,AUTO詳解,需要的朋友可以參考下2024-03-03關(guān)于Java異常處理的幾條建議_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
Java提供了拋出異常、捕捉異常和finally語(yǔ)句的使用來處理程序異常,下面就來具體看一下關(guān)于Java異常處理的幾條建議2017-06-06基于Java在netty中實(shí)現(xiàn)線程和CPU綁定
這篇文章主要介紹了基于Java在netty中實(shí)現(xiàn)線程和CPU綁定,文章圍繞主題的相關(guān)內(nèi)容展開詳細(xì)介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-05-05利用Java的Struts框架實(shí)現(xiàn)電子郵件發(fā)送功能
這篇文章主要介紹了利用Java的Struts框架實(shí)現(xiàn)電子郵件發(fā)送功能,Struts框架是Java的SSH三大web開發(fā)框架之一,需要的朋友可以參考下2015-12-12Java設(shè)計(jì)模式之模板方法模式Template Method Pattern詳解
在我們實(shí)際開發(fā)中,如果一個(gè)方法極其復(fù)雜時(shí),如果我們將所有的邏輯寫在一個(gè)方法中,那維護(hù)起來就很困難,要替換某些步驟時(shí)都要重新寫,這樣代碼的擴(kuò)展性就很差,當(dāng)遇到這種情況就要考慮今天的主角——模板方法模式2022-11-11Spring Boot實(shí)現(xiàn)郵件發(fā)送必會(huì)的5種姿勢(shì)
這篇文章主要給大家介紹了關(guān)于Spring Boot實(shí)現(xiàn)郵件發(fā)送必會(huì)的5種姿勢(shì),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Spring Boot具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-07-07MyBatis實(shí)現(xiàn)兩種查詢樹形數(shù)據(jù)的方法詳解(嵌套結(jié)果集和遞歸查詢)
樹形結(jié)構(gòu)數(shù)據(jù)在開發(fā)中十分常見,比如:菜單數(shù)、組織樹, 利用 MyBatis 提供嵌套查詢功能可以很方便地實(shí)現(xiàn)這個(gè)功能需求。本文主要介紹了兩種方法,感興趣的可以了解一下2021-09-09