學會CompletableFuture輕松駕馭異步編程
前言
本文隸屬于我歸納整理的Android知識體系的第四部分,屬于 異步 部分的多線程內(nèi)容
您可以通過訪問 總綱 閱讀系列內(nèi)的其他文章。

作者按:草稿進行了幾次大改,移除了Demo部分、源碼解析部分、設(shè)計原理部分。結(jié)合實際工作經(jīng)驗,"掌握API能熟練使用、能無障礙閱讀相關(guān)框架源碼" 已基本夠用。
讀者可結(jié)合下面的導(dǎo)圖進行快速的知識自查

一個美好的期望
通常情況下,我們希望代碼的執(zhí)行順序和代碼的組織順序一致,即代碼表述了同步執(zhí)行的程序,這樣可以減少很多思考。
而 閱讀異步的程序代碼,需要在腦海中建立事件流,當程序業(yè)務(wù)復(fù)雜時,將挑戰(zhàn)人的記憶力和空間想象力,并非所有人都擅長在腦海中構(gòu)建并分析異步事件流模型。
所以,我們期望擁有一個非常友好的框架,能夠讓我們方便地進行異步編程,并且在框架內(nèi)部設(shè)計有線程同步、異常處理機制。
并且,基于該框架編寫的代碼具有很高的可讀、可理解性。
而Future基本無法滿足這一期望。
Future的不足與CompletableFuture的來源
Future的不足
在先前的系列文章中,我們已經(jīng)回顧了Future類的設(shè)計,在絕大多數(shù)場景下,我們選擇使用多線程,是為了 充分利用機器性能 以及 避免用戶交互線程出現(xiàn)長時間阻塞 以致影響體驗。
所以我們將耗時的、會引起長時間阻塞的任務(wù)分離到其他線程執(zhí)行,并在 合適時機 進行線程同步,于主線程(一般負責用戶交互處理、界面渲染)中處理結(jié)果。
詳見拙作 掌握Future,輕松獲取異步任務(wù)結(jié)果
Future 于 Java 1.5版本引入,它類似于 異步處理的結(jié)果占位符 , 提供了兩個方法獲取結(jié)果:
get(), 調(diào)用線程進入阻塞直至得到結(jié)果或者異常。get(long timeout, TimeUnit unit), 調(diào)用線程將僅在指定時間 timeout 內(nèi)等待結(jié)果或者異常,如果超時未獲得結(jié)果就會拋出 TimeoutException 異常。
Future 可以實現(xiàn) Runnable 或 Callable 接口來定義任務(wù),一定程度上滿足 使用框架進行異步編程 的期望,但通過整體源碼可知它存在如下 3個問題 :
- 調(diào)用
get()方法會一直阻塞直到獲取結(jié)果、異常,無法在任務(wù)完成時獲得 "通知" ,無法附加回調(diào)函數(shù) - 不具備鏈式調(diào)用和結(jié)果聚合處理能力,當我們想鏈接多個
Future共同完成一件任務(wù)時,沒有框架級的處理,只能編寫業(yè)務(wù)級邏輯,合并結(jié)果,并小心的處理同步 - 需要單獨編寫異常處理代碼
使用 get(long timeout, TimeUnit unit) 和 isDone() 判斷,確實可以緩解問題1,但這需要結(jié)合業(yè)務(wù)單獨設(shè)計(調(diào)優(yōu)),存在大量的不確定性。不再展開
Java 8中引入 CompletableFuture 來解決 Future 的不足。
CompletableFuture來源
CompletableFuture 的設(shè)計靈感來自于 Google Guava 庫的 ListenableFuture 類,它實現(xiàn)了 Future接口 和 CompletionStage接口 , 并且新增一系列API,支持Java 8的 lambda特性,通過回調(diào)利用非阻塞方法,提升了異步編程模型。

它解決了Future的不足,允許我們在非主線程中運行任務(wù),并向啟動線程 (一般是主線程) 通知 任務(wù)完成 或 任務(wù)失敗,編寫異步的、非阻塞的程序。
使用CompletableFuture
最簡方式獲取實例
使用 CompletableFuture.completedFuture(U value) 可以獲取一個 執(zhí)行狀態(tài)已經(jīng)完成 的 CompletableFuture 對象。
這可以用于快速改造舊程序,并進行逐步過渡
class Demo {
@Test
public void testSimpleCompletableFuture() {
CompletableFuture<String> completableFuture =
CompletableFuture.completedFuture("testSimpleCompletableFuture");
assertTrue(completableFuture.isDone());
try {
assertEquals("testSimpleCompletableFuture", completableFuture.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
改造線程同步部分
部分老舊程序已經(jīng)建立了多線程業(yè)務(wù)模型,我們可以使用 CompletableFuture 改造其中的線程同步部分,但暫不改造數(shù)據(jù)傳遞。
使用 runAsync() 方法,該方法接收一個 Runnable 類型的參數(shù)返回 CompletableFuture<Void>:
//并不改變原項目中數(shù)據(jù)傳遞的部分、或者不關(guān)心結(jié)果數(shù)據(jù),僅進行同步
class Demo {
@Test
public void testCompletableFutureRunAsync() {
AtomicInteger variable = new AtomicInteger(0);
CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> process(variable));
runAsync.join();
assertEquals(1, variable.get());
}
public void process(AtomicInteger variable) {
System.out.println(Thread.currentThread() + " Process...");
variable.set(1);
}
}
進一步改造結(jié)果數(shù)據(jù)傳遞
當我們關(guān)心異步任務(wù)的結(jié)果數(shù)據(jù)、或者改造原 多線程業(yè)務(wù)模型 的 數(shù)據(jù)傳遞方式 時,可以使用 supplyAsync() 方法,該方法接收一個 Supplier<T> 接口類型的參數(shù),它實現(xiàn)了任務(wù)的邏輯,方法返回 CompletableFuture<T> 實例。
class Demo {
@Test
public void testCompletableFutureSupplyAsync() {
CompletableFuture<String> supplyAsync =
CompletableFuture.supplyAsync(this::process);
try {
// Blocking
assertEquals("testCompletableFutureSupplyAsync", supplyAsync.get());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
public String process() {
return "testCompletableFutureSupplyAsync";
}
}
指定執(zhí)行線程池
"獲取用于執(zhí)行任務(wù)的線程" 類似 Java 8 中的 parallelStream, CompletableFuture 默認從全局 ForkJoinPool.commonPool() 獲取線程,用于執(zhí)行任務(wù)。同時也提供了指定線程池的方式用于獲取線程執(zhí)行任務(wù),您可以使用API中具有 Executor 參數(shù)的重載方法。
class Demo {
@Test
public void testCompletableFutureSupplyAsyncWithExecutor() {
ExecutorService newFixedThreadPool =
Executors.newFixedThreadPool(2);
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::process,
newFixedThreadPool);
try {
// Blocking
assertEquals("testCompletableFutureSupplyAsyncWithExecutor", supplyAsync.get());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
public String process() {
return "testCompletableFutureSupplyAsyncWithExecutor";
}
}
CompletableFuture 中有眾多API,方法命名中含有 Async 的API可使用線程池。
截至此處,以上使用方式均與 Future 類似,接下來演示 CompletableFuture 的不同
回調(diào)&鏈式調(diào)用
CompletableFuture 的 get()API是阻塞式獲取結(jié)果,CompletableFuture 提供了
thenApplythenAcceptthenRun
等API來避免阻塞式獲取,并且可添加 任務(wù)完成 后的回調(diào)。這幾個方法的使用場景如下:
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)收到結(jié)果后,可以進行轉(zhuǎn)化CompletableFuture<Void> thenAccept(Consumer<? super T> action)收到結(jié)果后,對其進行消費CompletableFuture<Void> thenRun(Runnable action)收到結(jié)果后,執(zhí)行回調(diào),無法消費結(jié)果只能消費 這一事件
API較為簡單,不再代碼演示
顯然,通過鏈式調(diào)用可以組裝多個執(zhí)行過程。
有讀者可能會疑惑:Function 和 Consumer 也可以進行鏈式組裝,是否存在冗余呢?
兩種的鏈式調(diào)用特性確實存在重疊,您可以自行選擇用法,但 thenRun 只能采用 CompletableFuture的鏈式調(diào)用。
另外,前面提到,我們可以指定線程池執(zhí)行任務(wù),對于這三組API,同樣有相同的特性,通過 thenXXXXAsync 指定線程池,這是 Function 和 Consumer 的鏈式組裝所無法完成的。
class Demo {
@Test
public void testCompletableFutureApplyAsync() {
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
// 從線程池 newFixedThreadPool 獲取線程執(zhí)行任務(wù)
CompletableFuture<Double> completableFuture =
CompletableFuture.supplyAsync(() -> 1D, newFixedThreadPool)
.thenApplyAsync(d -> d + 1D, newSingleThreadScheduledExecutor)
.thenApplyAsync(d -> d + 2D);
Double result = completableFuture.join();
assertEquals(4D, result);
}
}
聚合多個CompletableFuture
通過 聚合 多個 CompletableFuture,可以組成更 復(fù)雜 的業(yè)務(wù)流,可以達到精細地控制粒度、聚焦單個節(jié)點的業(yè)務(wù)。
注意:操作符并不能完全的控制 CompletableFuture 任務(wù)執(zhí)行的時機,您需要謹慎的選擇 CompletableFuture 的創(chuàng)建時機
thenCompose、thenComposeAsync
compose 原意為 組成, 通過多個 CompletableFuture 構(gòu)建異步流。
在操作的 CompletableFuture 獲得結(jié)果時,將另一個 CompletableFuture compose 到異步流中,compose的過程中,可以根據(jù)操作的 CompletableFuture 的結(jié)果編寫邏輯。
與 thenApply 相比,thenCompose 返回邏輯中提供的 CompletableFuture 而 thenApply 返回框架內(nèi)處理的新實例。
注意,這一特性在使用 FP編程范式進行編碼時,會顯得非常靈活,一定程度上提升了函數(shù)的復(fù)用性
API含義直觀,不再進行代碼演示
thenCombine、thenCombineAsync
thenCombine 可以用于合并多個 獨立任務(wù) 的處理結(jié)果。
注意: thenCompose 進行聚合時,下游可以使用上游的結(jié)果,在業(yè)務(wù)需求上一般表現(xiàn)為依賴上一步結(jié)果,而非兩者相互獨立。
例如,產(chǎn)品希望在博客詳情頁同時展示 "博客的詳情" 和 "作者主要信息" ,以避免內(nèi)容區(qū)抖動或割裂的骨架占位。這兩者 可以獨立獲取時 ,則可以使用 thenCombine 系列API,分別獲取,并合并結(jié)果。
combine 的特點是 被合并的兩個 CompletableFuture 可以并發(fā),等兩者都獲得結(jié)果后進行合并。
但它依舊存在使用上的不便捷,合并超過2個 CompletableFuture 時,顯得不夠靈活??梢允褂?static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) API。
allOf 創(chuàng)建了 CompletableFuture<Void>,并不會幫助我們合并結(jié)果,所以需要自行編寫業(yè)務(wù)代碼合并,故存在 Side Effects。
runAfterBoth、runAfterBothAsync;runAfterEither、runAfterEitherAsync
runAfterBoth系列API在兩個CompletableFuture都獲得結(jié)果后執(zhí)行回調(diào)runAfterEither系列API在兩個CompletableFuture任意一個獲得結(jié)果后執(zhí)行回調(diào)
通過API,不難理解它們需要使用者自行處理結(jié)果
CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action);CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
同樣可以增加編碼靈活性,不再贅述。
applyToEither、applyToEitherAsync;
acceptEither、acceptEitherAsync;thenAcceptBoth、thenAcceptBothAsync
applyToEither系列API表現(xiàn)如thenApply和Either的組合,兩個同類型的CompletableFuture任意一個獲得結(jié)果后,可消費該結(jié)果并進行改變,類似 thenApplyacceptEither系列API表現(xiàn)如thenAccept和Either的組合,兩個同類型的CompletableFuture任意一個獲得結(jié)果后,可消費該結(jié)果,類似 thenAcceptthenAcceptBoth系列API表現(xiàn)如thenCombine,但返回CompletableFuture<Void>
同樣可以增加編碼靈活性,不再贅述
結(jié)果處理
使用回調(diào)處理結(jié)果有兩種API,注意,除了正常獲得結(jié)果外還可能獲得異常,而這兩組API簇差異體現(xiàn)在對 異常 的處理中。
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
handle 使用 BiFunction,無論是正常結(jié)果還是異常情況,均視作可被邏輯接受,消費后轉(zhuǎn)化
而 whenComplete 使用 BiConsumer,僅可消費但不能轉(zhuǎn)化,異常情況被視作不可被邏輯接受,仍會拋出。
舉個例子,進行網(wǎng)絡(luò)編程時會遇到 Exception, 如果業(yè)務(wù)設(shè)計中使用的模型實體包含了 正常結(jié)果、異常 兩種情況:
open class Result<T>(val t: T?) {
open val isThr: Boolean = false
}
class FailResult<T>(val tr: Throwable) : Result<T>(null) {
override val isThr: Boolean = true
}
則適合使用 handle API在底層處理。否則需要額外的異常處理,可依據(jù)項目的設(shè)計選擇處理方式,一般在依據(jù)FP范式設(shè)計的程序中,傾向于使用handle,避免增加side effect。
異常處理
在多線程背景下,異常處理并不容易。它不僅僅是使用 try-catch 捕獲異常,還包含程序異步流中,節(jié)點出現(xiàn)異常時流的業(yè)務(wù)走向。
在 CompletableFuture 中,節(jié)點出現(xiàn)異常將跳過后續(xù)節(jié)點,進入異常處理。
_如果您不希望某個節(jié)點拋出異常導(dǎo)致后續(xù)流程中斷,則可在節(jié)點的處理中捕獲并包裝為結(jié)果、或者對子 CompletableFuture 節(jié)點采用 handle、exceptionally API轉(zhuǎn)換異常 _
除前文提到的 handle whenComplete,CompletableFuture 中還提供了 exceptionally API用于處理異常
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
從表現(xiàn)結(jié)果看,它類似于 handle API中對異常的處理,將異常轉(zhuǎn)換為目標結(jié)果的一種特定情形。
以上就是學會CompletableFuture輕松駕馭異步編程的詳細內(nèi)容,更多關(guān)于CompletableFuture異步編程的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring?Cloud?Gateway遠程命令執(zhí)行漏洞分析(CVE-2022-22947)
使用Spring Cloud Gateway的應(yīng)用程序在Actuator端點啟用、公開和不安全的情況下容易受到代碼注入的攻擊,攻擊者可以惡意創(chuàng)建允許在遠程主機上執(zhí)行任意遠程執(zhí)行的請求,這篇文章主要介紹了Spring?Cloud?Gateway遠程命令執(zhí)行漏洞(CVE-2022-22947),需要的朋友可以參考下2023-03-03
解決FontConfiguration.getVersion報空指針異常的問題
這篇文章主要介紹了解決FontConfiguration.getVersion報空指針異常的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-06-06
基于springboot和redis實現(xiàn)單點登錄
這篇文章主要為大家詳細介紹了基于springboot和redis實現(xiàn)單點登錄,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-06-06
SpringBoot集成Swagger2生成接口文檔的方法示例
我們提供Restful接口的時候,API文檔是尤為的重要,它承載著對接口的定義,描述等,本文主要介紹了SpringBoot集成Swagger2生成接口文檔的方法示例,需要的朋友們下面隨著小編來一起學習學習吧2018-12-12
spring profile 多環(huán)境配置管理詳解
這篇文章主要介紹了 spring profile 多環(huán)境配置管理詳解的相關(guān)資料,需要的朋友可以參考下2017-01-01

