Java?CompletableFuture實現(xiàn)多線程異步編排
一 :問題背景
問題:當(dāng)查詢接口較復(fù)雜時候,數(shù)據(jù)的獲取都需要[遠程調(diào)用],必然需要花費更多的時間。 假如查詢文章詳情頁面,需要如下標(biāo)注的時間才能完成,比如如下場景:
1. 查詢文章詳情 0.5s
2. 查詢文章博主個人信息 0.5s
3. 查詢文章評論 1s
4. 查詢博主相關(guān)文章分類 1s
5. 相關(guān)推薦文章 1s
上面的描述只是舉個例子不要在意這里的查詢描述,看實際情況使用,有些相關(guān)的查詢我們可以拆分接口實現(xiàn),上面的描述只是為了舉例子。
那么,用戶需要4s后才能統(tǒng)計的數(shù)據(jù)。很顯然是不能接受的。 如果有多個線程同時完成這4步操作,也許只需要1s左右即可完成響應(yīng)。
二 :CompletableFuture介紹
在Java 8中, 新增加了一個包含50個方法左右的類: CompletableFuture,提供了非常強大的Future的擴展功能,可以幫助我們簡化異步編程的復(fù)雜性,提供了函數(shù)式編程的能力,可以通過回調(diào)的方式處理計算結(jié)果,并且提供了轉(zhuǎn)換和組合CompletableFuture的方法。
CompletableFuture類實現(xiàn)了Future接口,所以你還是可以像以前一樣通過get方法阻塞或者輪詢的方式獲得結(jié)果,但是這種方式不推薦使用。 CompletableFuture和FutureTask同屬于Future接口的實現(xiàn)類,都可以獲取線程的執(zhí)行結(jié)果。
三 :具體場景
1.0 單個任務(wù)
1.0.1 runAsync:無返回值
/** * runAsync無返回值 */ CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> { System.out.println("當(dāng)前線程" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("運行結(jié)果:" + i); }, executor);
1.0.2 supplyAsync:有返回值
whenComplete:能感知異常,能感知結(jié)果,但沒辦法給返回值
exceptionally:能感知異常,不能感知結(jié)果,能給返回值。相當(dāng)于,如果出現(xiàn)異常就返回這個值
/** * supplyAsync有返回值 * whenComplete能感知異常,能感知結(jié)果,但沒辦法給返回值 * exceptionally能感知異常,不能感知結(jié)果,能給返回值。相當(dāng)于,如果出現(xiàn)異常就返回這個值 */ CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("當(dāng)前線程" + Thread.currentThread().getId()); int i = 10 / 0; System.out.println("運行結(jié)果:" + i); return i; }, executor).whenComplete((res,excption)->{ //whenComplete雖然能得到異常信息,但是沒辦法修改返回值 System.out.println("異步任務(wù)成功完成...結(jié)果是:"+res+";異常是:"+excption); }).exceptionally(throwable -> { //exceptionally能感知異常,而且能返回一個默認值,相當(dāng)于,如果出現(xiàn)異常就返回這個值 return 10; });
1.0.3 supplyAsync:有返回值
handle能拿到返回結(jié)果,也能得到異常信息,也能修改返回值
/** * supplyAsync有返回值 * handle能拿到返回結(jié)果,也能得到異常信息,也能修改返回值 */ CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("當(dāng)前線程" + Thread.currentThread().getId()); int i = 10 / 4; System.out.println("運行結(jié)果:" + i); return i; }, executor).handle((res,excption)->{ if(excption!=null){ return 0; }else { return res * 2; } });
2.0 兩個任務(wù)編排
兩任務(wù)組合(線程串行化)
可以是兩任務(wù)的串行化,就是一個任務(wù)執(zhí)行完了再執(zhí)行下一個
也可以是多個任務(wù)的串行化,就是按照順序一個個的執(zhí)行
2.0.1 thenRunAsync
不能接收上一次的執(zhí)行結(jié)果,也沒返回值
/** * thenRunXXX 不能接收上一次的執(zhí)行結(jié)果,也沒返回值 * .thenRunAsync(() -> { * System.out.println("任務(wù)2啟動了..."); * }, executor); */ CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("當(dāng)前線程" + Thread.currentThread().getId()); int i = 10 / 4; System.out.println("運行結(jié)果:" + i); return i; }, executor).thenRunAsync(() -> { System.out.println("任務(wù)2啟動了..."); }, executor);
2.0.2 thenAcceptAsync
能接收上一次的執(zhí)行結(jié)果,但沒返回值
/** * thenAcceptXXX 能接收上一次的執(zhí)行結(jié)果,但沒返回值 * .thenAcceptAsync(res->{ * System.out.println("任務(wù)2啟動了..."+res); * },executor); */ CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("當(dāng)前線程" + Thread.currentThread().getId()); int i = 10 / 4; System.out.println("運行結(jié)果:" + i); return i; }, executor).thenAcceptAsync(res -> { System.out.println("任務(wù)2啟動了..." + res); }, executor);
2.0.3 thenApplyAsync
能接收上一次的執(zhí)行結(jié)果,又可以有返回值
/** * thenApplyXXX 能接收上一次的執(zhí)行結(jié)果,又可以有返回值 * .thenApplyAsync(res -> { * System.out.println("任務(wù)2啟動了..." + res); * return "hello " + res; * }, executor); */ CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("當(dāng)前線程" + Thread.currentThread().getId()); int i = 10 / 4; System.out.println("運行結(jié)果:" + i); return i; }, executor).thenApplyAsync(res -> { System.out.println("任務(wù)2啟動了..." + res); return "hello " + res; }, executor);
3.0 三任務(wù)編排
先準備兩個任務(wù)
CompletableFuture<Object> future01 =CompletableFuture.supplyAsync(() -> { System.out.println("任務(wù)1線程" + Thread.currentThread().getId()); int i = 10 / 4; System.out.println("任務(wù)1結(jié)束:"); return i; }, executor); CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> { System.out.println("任務(wù)2線程" + Thread.currentThread().getId()); try { Thread.sleep(3000); System.out.println("任務(wù)2結(jié)束:"); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }, executor);
3.0.1 三任務(wù)組合
前兩個任務(wù)都完成,才執(zhí)行任務(wù)3
3.0.1-1、runAfterBothAsync:任務(wù)01 任務(wù)02都完成了,再開始執(zhí)行任務(wù)3,不感知任務(wù)1、2的結(jié)果的,也沒返回值
CompletableFuture<Void> future = future01.runAfterBothAsync(future02, () -> { System.out.println("任務(wù)3開始"); }, executor);
3.0.1-2、thenAcceptBothAsync:任務(wù)01 任務(wù)02都完成了,再開始執(zhí)行任務(wù)3,能感知到任務(wù)1、2的結(jié)果,但沒返回值
CompletableFuture<Void> future = future01.thenAcceptBothAsync(future02, (f1, f2) -> { System.out.println("任務(wù)3開始...得到之前的結(jié)果:f1:" + f1 + ", f2:" + f2); }, executor);
3.0.1-3、 thenCombineAsync:任務(wù)01 任務(wù)02都完成了,再開始執(zhí)行任務(wù)3,能感知到任務(wù)1、2的結(jié)果,而且自己可以帶返回值
CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> { return f1+":"+f2+":哈哈"; }, executor);
3.0.2 三任務(wù)組合二
前兩個任務(wù)只要有一個完成,就執(zhí)行任務(wù)3
3.0.2-1、runAfterEitherAsync:兩個任務(wù)只要有一個完成,就執(zhí)行任務(wù)3,不感知結(jié)果,自己沒返回值
CompletableFuture<Void> future = future01.runAfterEitherAsync(future02, () -> { System.out.println("任務(wù)3開始..."); }, executor);
3.0.2-2、 acceptEitherAsync:兩個任務(wù)只要有一個完成,就執(zhí)行任務(wù)3,感知結(jié)果,自己沒返回值
CompletableFuture<Void> future = future01.acceptEitherAsync(future02, (res) -> { System.out.println("任務(wù)3開始...之前的結(jié)果" + res); }, executor);
3.0.2-3、applyToEitherAsync:兩個任務(wù)只要有一個完成,就執(zhí)行任務(wù)3,感知結(jié)果,自己有返回值
CompletableFuture<String> future = future01.applyToEitherAsync(future02, (res) -> { System.out.println("任務(wù)3開始...之前的結(jié)果" + res); return "任務(wù)3的結(jié)果..."; }, executor);
4.0 多任務(wù)的編排
/** * 多任務(wù)組合 */ CompletableFuture<String> futureImg = CompletableFuture.supplyAsync(() -> { System.out.println("查詢商品圖片信息"); return "hello.jpg"; },executor); CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> { System.out.println("查詢商品屬性信息"); return "黑色+256G"; },executor); CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); System.out.println("查詢商品介紹信息"); } catch (InterruptedException e) { e.printStackTrace(); } return "華為..."; },executor);
4.0.1、allOf:所有任務(wù)都執(zhí)行完
/** * allOf 所有任務(wù)都執(zhí)行完 */ CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImg, futureAttr, futureDesc); allOf.get();//等待所有結(jié)果完成
4.0.2、anyOf:其中有一個任務(wù)執(zhí)行完就可以
/** * anyOf 其中有一個任務(wù)執(zhí)行完就可以 */ CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImg, futureAttr, futureDesc); anyOf.get();
四: 一個實際的例子
public SkuItemVo item(Long skuId) { SkuItemVo skuItemVo = new SkuItemVo(); //1、sku詳細信息 sku_info SkuInfoEntity skuInfo = getById(skuId); skuItemVo.setInfo(skuInfo); //2、sku 圖片信息 sku_img List<SkuImagesEntity> images = skuImagesService.getImagesBySkuId(skuId); skuItemVo.setImages(images); //3、spu 銷售屬性組合 List<SkuItemSaleAttrVo> saleAttr = skuSaleAttrValueService.getSaleAttrBySpuId(skuInfo.getSpuId()); skuItemVo.setSaleAttr(saleAttr); //4、spu 的介紹 SpuInfoDescEntity spuInfoDesc = spuInfoDescService.getById(skuInfo.getSpuId()); skuItemVo.setDesc(spuInfoDesc); //5、spu 規(guī)格參數(shù)信息 List<SpuItemAttrGroupVo> groupAttrs = attrGroupService.getAttrGroupWithAttrsBySpuId(skuInfo.getSpuId(),skuInfo.getCatalogId()); skuItemVo.setGroupAttrs(groupAttrs); return skuItemVo; }
使用CompletableFuture異步編排后
private SkuItemVo item(Long skuId) { SkuItemVo skuItemVo = new SkuItemVo(); /** * 3、4、5需要依賴1的運行結(jié)果,需要返回skuInfo后從中獲取spuId和catalogId * 而2不需要依賴1的運行結(jié)果 */ //1、sku詳細信息 sku_info CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> { SkuInfoEntity skuInfo = getById(skuId); skuItemVo.setInfo(skuInfo); return skuInfo; }, executor); //2、sku 圖片信息 sku_img 2不需要等待上邊1的執(zhí)行結(jié)果 CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> { List<SkuImagesEntity> images = skuImagesService.getImagesBySkuId(skuId); skuItemVo.setImages(images); }, executor); //下邊的3、4、5都需要上邊1的執(zhí)行結(jié)果 //所以下邊的3、4、5都是基于上邊1的執(zhí)行結(jié)果 infoFuture 開始的 //都是以infoFuture.thenAcceptAsync(skuInfo -> {})開始的 CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync(skuInfo -> { //3、spu 銷售屬性組合 3 List<SkuItemSaleAttrVo> saleAttr = skuSaleAttrValueService.getSaleAttrBySpuId(skuInfo.getSpuId()); skuItemVo.setSaleAttr(saleAttr); System.out.println(saleAttr); }, executor); CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync(skuInfo -> { //4、spu 的介紹 SpuInfoDescEntity spuInfoDesc = spuInfoDescService.getById(skuInfo.getSpuId()); skuItemVo.setDesc(spuInfoDesc); }, executor); CompletableFuture<Void> attrGroupFuture = infoFuture.thenAcceptAsync(skuInfo -> { //5、spu 規(guī)格參數(shù)信息 List<SpuItemAttrGroupVo> groupAttrs = attrGroupService.getAttrGroupWithAttrsBySpuId(skuInfo.getSpuId(),skuInfo.getCatalogId()); System.out.println(groupAttrs); skuItemVo.setGroupAttrs(groupAttrs); }, executor); //等待所有任務(wù)完成 try { CompletableFuture.allOf(saleAttrFuture,descFuture,attrGroupFuture,imageFuture).get() ; } catch (InterruptedException e) { log.error("查詢商品詳情異步編排錯誤: "); log.error(e.getMessage() ); } catch (ExecutionException e) { log.error(e.getMessage() ); } return skuItemVo; }
以上就是Java CompletableFuture實現(xiàn)多線程異步編排的詳細內(nèi)容,更多關(guān)于Java CompletableFuture的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
一篇文章總結(jié)Java虛擬機內(nèi)存區(qū)域模型
這篇文章主要介紹了一篇文章總結(jié)Java虛擬機內(nèi)存區(qū)域模型,本篇文章主要來總結(jié)一下Java虛擬機內(nèi)存的各個區(qū)域,以及這些區(qū)域的作用、服務(wù)對象以及其中可能產(chǎn)生的問題,作為大家的面試寶典。,需要的朋友可以參考下2019-06-06Spring?boot?easyexcel?實現(xiàn)復(fù)合數(shù)據(jù)導(dǎo)出、按模塊導(dǎo)出功能
這篇文章主要介紹了Spring?boot?easyexcel?實現(xiàn)復(fù)合數(shù)據(jù)導(dǎo)出、按模塊導(dǎo)出,實現(xiàn)思路流程是準備一個導(dǎo)出基礎(chǔ)填充模板,默認填充key,本文給大家介紹的非常詳細,需要的朋友可以參考下2023-09-09