Java中的CompletableFuture異步編程詳解
場景引入
只要提到多線程來優(yōu)化性能,那么必定離不開異步化,異步化的出現(xiàn)才是多線程優(yōu)化性能這個(gè)核心方案的基礎(chǔ)。 異步化其實(shí)我們早已接觸,如下Thread類,主線程不需要等待線程T1,T2的執(zhí)行結(jié)果,就能實(shí)現(xiàn)異步邏輯。
public static void main(String[] args) { Thread T1 = new Thread(()->{ // 執(zhí)行方法A邏輯 }); Thread T2 = new Thread(()->{ // 執(zhí)行方法B邏輯 }); // 省略其它邏輯 }
這種方案雖然可行,但是在開發(fā)中明顯不是最優(yōu),現(xiàn)實(shí)生產(chǎn)業(yè)務(wù)對應(yīng)各種各樣的需求,簡單的Thread已經(jīng)不滿足需求,所以Java在1.8版本提出CompletableFuture工具類來解決生產(chǎn)中遇到的異步化問題。
CompletableFuture初體驗(yàn)
先從之前提到的華羅庚提出的最優(yōu)泡茶問題入手,簡易體驗(yàn)下CompletableFuture工具類的優(yōu)勢。 最優(yōu)泡茶問題可以分為如下步驟
在用FutureTask實(shí)現(xiàn)時(shí)是將上述步驟拆分為兩個(gè)線程執(zhí)行,如下所示
通過代碼實(shí)現(xiàn)明顯能感覺到,線程需要手動(dòng)維護(hù),代碼邏輯復(fù)雜,不能專注于業(yè)務(wù)代碼。
為了方便采用CompletableFuture實(shí)現(xiàn),將最優(yōu)泡茶方案進(jìn)一步細(xì)分,如下所示,將流程拆分三個(gè)線程執(zhí)行,線程F3需要等待線程F1,F2都返回后才執(zhí)行。
代碼實(shí)現(xiàn)如下
public static void main(String[] args) throws Exception { // 無返回值的異步調(diào)用 CompletableFuture f1 = CompletableFuture.runAsync(()->{ System.out.println("F1 洗水壺"); sleep(1); System.out.println("F1 燒水"); sleep(15); }); // 有返回值的實(shí)例化 CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { System.out.println("F2 洗茶壺"); sleep(1); System.out.println("F2 洗茶杯"); sleep(2); System.out.println("F2 拿茶葉"); sleep(1); return "龍井"; }); // f3等待f1和f2到達(dá)后才能執(zhí)行 // param1是f1的返回值,這里沒有就是空 param2是f2的返回值 CompletableFuture f3 = f1.thenCombine(f2, (param1, param2) -> { System.out.println("F3 拿到茶葉:" + param2); System.out.println("F3 泡茶"); return param2; }); // 阻塞等待 f3.get(); } public static void sleep(int t){ try { TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); } }
返回結(jié)果
從實(shí)現(xiàn)代碼來看CompletableFuture有如下優(yōu)勢
- 不需要手動(dòng)維護(hù)線程,不需要手動(dòng)給任務(wù)分配工作線程。
- 代碼簡練可以專注業(yè)務(wù)邏輯。
創(chuàng)建CompletableFuture對象
創(chuàng)建CompletableFuture除了初體驗(yàn)CompletableFuture代碼中的兩種還有兩種,總共四種方法簽名如下
// 有返回值 supplier供給型接口(不進(jìn)有出) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier); // 有返回值,任務(wù)使用自定義的線程池 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor); // 無返回值 Runnable接口執(zhí)行run方法無返回值 public static CompletableFuture<Void> runAsync(Runnable runnable); // 無返回值,任務(wù)使用自定義的線程池 public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
注意:在創(chuàng)建CompletableFuture對象時(shí)盡量使用自定義的線程池,因?yàn)槟J(rèn)是采用公共的ForkJoinPool線程池,如果某個(gè)CompletableFuture中有I/O操作非常耗時(shí)的就會(huì)阻塞該線程池中所有的線程,導(dǎo)致線程饑餓的風(fēng)險(xiǎn),進(jìn)而影響整個(gè)系統(tǒng)的性能,生產(chǎn)中最好按照業(yè)務(wù)創(chuàng)建不同類型的線程池,互不干擾。
CompletableFuture類定義
工具類CompletableFuture類定義如下
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
由于CompletableFuture類實(shí)現(xiàn)了Future接口,所以異步線程的兩大問題,線程什么時(shí)候執(zhí)行完畢?線程的返回值是什么?都可以利用Future接口的特性解決,
另外CompletableFuture也實(shí)現(xiàn)了CompletionStage接口,那CompletionStage又是什么呢?
CompletionStage接口
CompletionStage接口是去描述任務(wù)之間的時(shí)序關(guān)系,包括前面提到的f1.thenCombine(f2, (param1, param2) -> {})就是一種典型的AND聚合關(guān)系,還能描述OR聚合關(guān)系,串行關(guān)系,以及異步編程中的異常處理關(guān)系。
AND匯聚關(guān)系
AND匯聚關(guān)系即表示依賴任務(wù)全部執(zhí)行完畢才能執(zhí)行當(dāng)前任務(wù),在CompletableFuture初體驗(yàn)中有使用,可以參考。
CompletionStage接口描述匯聚關(guān)系的方法簽名如下:
public CompletionStage<V> thenCombine(CompletionStage other,BiFunction fn); public CompletionStage<V> thenCombineAsync(CompletionStage other,BiFunction fn); public CompletionStage<V> thenCombineAsync(CompletionStage other,BiFunction fn,Executor executor); public CompletionStage<Void> thenAcceptBoth(CompletionStage other,BiConsumer consumer); public CompletionStage<Void> thenAcceptBothAsync(CompletionStage other,BiConsumer consumer); public CompletionStage<Void> thenAcceptBothAsync(CompletionStage other,BiConsumer consumer,Executor executor); public CompletionStage<Void> runAfterBoth(CompletionStage other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage other,Runnable action,Executor executor);
thenCombine、thenAcceptBoth、runAfterBoth這三個(gè)系列方法的區(qū)別源自核心參數(shù)區(qū)別
- BiFunction fn參數(shù)是函數(shù)式接口,這個(gè)參數(shù)既能支持入?yún)⒁仓С址祷刂怠?/li>
- BiConsumer consumer參數(shù)是消費(fèi)型接口,只有入?yún)]有返回值。
- Runnable action 參數(shù)不支持入?yún)⒁膊恢С殖鰠ⅰ?/li>
- Executor executor 表示指定線程池,不使用公共的ForkJoin線程池。
- 其中Async表示異步執(zhí)行fn、consumer、action邏輯。
OR聚合關(guān)系
OR聚合關(guān)系表示當(dāng)其中一個(gè)依賴任務(wù)執(zhí)行完畢就可以執(zhí)行當(dāng)前任務(wù)。
CompletionStage接口描述聚合關(guān)系的方法簽名如下:
public CompletionStage<U> applyToEither(CompletionStage other,Function fn); public CompletionStage<U> applyToEitherAsync(CompletionStage other,Function fn); public CompletionStage<U> applyToEitherAsync(CompletionStage other,Function fn,Executor executor); public CompletionStage<Void> acceptEither(CompletionStage other,Consumer action); public CompletionStage<Void> acceptEitherAsync(CompletionStage other,Consumer action); public CompletionStage<Void> acceptEitherAsync(CompletionStage other,Consumer action,Executor executor); public CompletionStage<Void> runAfterEither(CompletionStage other,Runnable action); public CompletionStage<Void> runAfterEitherAsync(CompletionStage other,Runnable action); public CompletionStage<Void> runAfterEitherAsync(CompletionStage other,Runnable action,Executor executor)
這三個(gè)系列方法的區(qū)別也是源自核心參數(shù)區(qū)別。
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Double> f1 = CompletableFuture.supplyAsync(()->{ double num = Math.random(); System.out.println("f1 返回值:"+num); // TimeUnit.SECONDS.sleep(1); return num; }); CompletableFuture<Double> f2 = CompletableFuture.supplyAsync(()->{ double num = Math.random(); System.out.println("f2 返回值:"+num); return num; }); CompletableFuture<Void> f3 = f1.acceptEither(f2, (num) -> { System.out.println("最后返回值:" + num); }); // 不是需要返回值,而是阻塞等待f3執(zhí)行結(jié)束 // 返回結(jié)果就是任務(wù)f1,f2中的任意一個(gè)返回值,需要保證f1,f2的返回值類型相同 f3.get(); }
串行關(guān)系
串行關(guān)系表示依賴任務(wù)按照編寫順序先后執(zhí)行。
CompletionStage接口描述串行關(guān)系的方法簽名如下:
public <U> CompletionStage<U> thenApply(Function fn); public <U> CompletionStage<U> thenApplyAsync(Function fn); public <U> CompletionStage<U> thenApplyAsync(Function fn,Executor executor); public CompletionStage<Void> thenAccept(Consume action); public CompletionStage<Void> thenAcceptAsync(Consumer action); public CompletionStage<Void> thenAcceptAsync(Consumer action,Executor executor); public CompletionStage<Void> thenRun(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor); public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor);
thenApply、thenAccept、thenRun這三個(gè)系列方法的區(qū)別也是源自核心參數(shù)區(qū)別。
需要注意的是thenCompose系列方法,這個(gè)方法會(huì)新創(chuàng)建出一個(gè)子流程,最終結(jié)果和thenApply系列方法相同。
如下所示,從上往下依次執(zhí)行。
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture completableFuture = CompletableFuture.supplyAsync(()->{ return "hello world"; }).thenApply((param1)->{ return param1 + " CompletableFuture"; }).thenAccept((param2)->{ param2 = param2.toUpperCase(); System.out.println(param2); }); completableFuture.get(); }
異常關(guān)系
在串行關(guān)系、OR聚合關(guān)系、AND匯聚關(guān)系中,由于其核心方法參數(shù)fn、consumer、action都不允許拋出異常,但是都無法限制它們拋出異常,如下所示
public static void main(String[] args) throws Exception { CompletableFuture<Integer> f0 = CompletableFuture .supplyAsync(()->(7/0)) .thenApply(r->r*10); System.out.println(f0.get()); }
非異步編程可以采用try{}catch{}捕獲那么異步編程如何處理呢?
CompletionStage接口支持異常處理方法簽名如下
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn); public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action); public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor); public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn); public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn); public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
- exceptionally使用類似try{}catch{}中的 catch{}。
- whenComplete系列使用類似try{}finally{}中的 finally{},無論是否發(fā)生異常都會(huì)執(zhí)行whenComplete中的邏輯代碼,但是無返回結(jié)果。
- handle系列使用和whenComplete一樣,不過handle支持返回結(jié)果
測試代碼如下
public static void main(String[] args) throws Exception { CompletableFuture<Integer> f0 = CompletableFuture .supplyAsync(()->(7/0)) .thenApply(r->r*10) .exceptionally((throwable)->{ // 相當(dāng)于catch完后,不拋出異常 System.out.println("異常了:"+throwable); return 1; }).whenComplete((integer,throwable)->{ // integer 返回值 throwable返回異常(如果exceptionally捕獲返回null) System.out.println("返回值:"+integer); System.out.println("異常:"+throwable); }).handle((integer,throwable)->{ // integer 返回值 throwable返回異常(如果exceptionally捕獲返回null) System.out.println("返回值:"+integer); System.out.println("異常:"+throwable); return 2; }); System.out.println(f0.get()); }
到此這篇關(guān)于Java中的CompletableFuture異步編程詳解的文章就介紹到這了,更多相關(guān)CompletableFuture異步編程內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
聊聊@Autowired注解注入,寫接口名字還是實(shí)現(xiàn)類的名字
這篇文章主要介紹了聊聊@Autowired注解注入,寫接口名字還是實(shí)現(xiàn)類的名字,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11Java中字符串String的+和+=及循環(huán)操作String原理詳解
Java編譯器在編譯時(shí)對String的+和+=操作會(huì)創(chuàng)建StringBuilder對象來進(jìn)行字符串的拼接,下面這篇文章主要給大家介紹了關(guān)于Java中字符串String的+和+=及循環(huán)操作String原理的相關(guān)資料,需要的朋友可以參考下2023-01-01基于java Springboot實(shí)現(xiàn)教務(wù)管理系統(tǒng)詳解
這篇文章主要介紹了Java 實(shí)現(xiàn)簡易教務(wù)管理系統(tǒng)的代碼,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-08-08spring接口通過配置支持返回多種格式(xml,json,html,excel)
這篇文章主要給大家介紹了關(guān)于spring接口如何通過配置支持返回多種格式(xml,json,html,excel)的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2017-12-12java9版本特性資源自動(dòng)關(guān)閉的語法增強(qiáng)
這篇文章主要為大家介紹了java9版本特性資源自動(dòng)關(guān)閉的語法增強(qiáng)的詳細(xì)使用說明,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-03-03JavaWeb之Ajax的基本使用與實(shí)戰(zhàn)案例
ajax技術(shù)是使頁面能局部刷新的一種技術(shù),下面這篇文章主要給大家介紹了關(guān)于JavaWeb之Ajax的基本使用與實(shí)戰(zhàn)案例的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-08-08Springboot繼承Keycloak實(shí)現(xiàn)單點(diǎn)登錄與退出功能
這篇文章主要介紹了Springboot繼承Keycloak實(shí)現(xiàn)單點(diǎn)登陸與退出,本文通過示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-08-08java類訪問權(quán)限與成員訪問權(quán)限解析
這篇文章主要針對java類訪問權(quán)限與成員訪問權(quán)限進(jìn)行解析,對類與成員訪問權(quán)限進(jìn)行驗(yàn)證,感興趣的小伙伴們可以參考一下2016-02-02