學(xué)會CompletableFuture輕松駕馭異步編程
前言
本文隸屬于我歸納整理的Android知識體系的第四部分,屬于 異步
部分的多線程內(nèi)容
您可以通過訪問 總綱 閱讀系列內(nèi)的其他文章。
作者按:草稿進(jìn)行了幾次大改,移除了Demo部分、源碼解析部分、設(shè)計原理部分。結(jié)合實際工作經(jīng)驗,"掌握API能熟練使用、能無障礙閱讀相關(guān)框架源碼" 已基本夠用。
讀者可結(jié)合下面的導(dǎo)圖進(jìn)行快速的知識自查
一個美好的期望
通常情況下,我們希望代碼的執(zhí)行順序和代碼的組織順序一致,即代碼表述了同步執(zhí)行的程序,這樣可以減少很多思考。
而 閱讀異步的程序代碼,需要在腦海中建立事件流,當(dāng)程序業(yè)務(wù)復(fù)雜時,將挑戰(zhàn)人的記憶力和空間想象力,并非所有人都擅長在腦海中構(gòu)建并分析異步事件流模型。
所以,我們期望擁有一個非常友好的框架,能夠讓我們方便地進(jìn)行異步編程,并且在框架內(nèi)部設(shè)計有線程同步、異常處理機(jī)制。
并且,基于該框架編寫的代碼具有很高的可讀、可理解性。
而Future基本無法滿足這一期望。
Future的不足與CompletableFuture的來源
Future的不足
在先前的系列文章中,我們已經(jīng)回顧了Future類的設(shè)計,在絕大多數(shù)場景下,我們選擇使用多線程,是為了 充分利用機(jī)器性能 以及 避免用戶交互線程出現(xiàn)長時間阻塞 以致影響體驗。
所以我們將耗時的、會引起長時間阻塞的任務(wù)分離到其他線程執(zhí)行,并在 合適時機(jī) 進(jìn)行線程同步,于主線程(一般負(fù)責(zé)用戶交互處理、界面渲染)中處理結(jié)果。
詳見拙作 掌握Future,輕松獲取異步任務(wù)結(jié)果
Future
于 Java 1.5版本引入,它類似于 異步處理的結(jié)果占位符 , 提供了兩個方法獲取結(jié)果:
get()
, 調(diào)用線程進(jìn)入阻塞直至得到結(jié)果或者異常。get(long timeout, TimeUnit unit)
, 調(diào)用線程將僅在指定時間 timeout 內(nèi)等待結(jié)果或者異常,如果超時未獲得結(jié)果就會拋出 TimeoutException 異常。
Future
可以實現(xiàn) Runnable
或 Callable
接口來定義任務(wù),一定程度上滿足 使用框架進(jìn)行異步編程 的期望,但通過整體源碼可知它存在如下 3個問題 :
- 調(diào)用
get()
方法會一直阻塞直到獲取結(jié)果、異常,無法在任務(wù)完成時獲得 "通知" ,無法附加回調(diào)函數(shù) - 不具備鏈?zhǔn)秸{(diào)用和結(jié)果聚合處理能力,當(dāng)我們想鏈接多個
Future
共同完成一件任務(wù)時,沒有框架級的處理,只能編寫業(yè)務(wù)級邏輯,合并結(jié)果,并小心的處理同步 - 需要單獨編寫異常處理代碼
使用 get(long timeout, TimeUnit unit)
和 isDone()
判斷,確實可以緩解問題1,但這需要結(jié)合業(yè)務(wù)單獨設(shè)計(調(diào)優(yōu)),存在大量的不確定性。不再展開
Java 8中引入 CompletableFuture
來解決 Future
的不足。
CompletableFuture來源
CompletableFuture
的設(shè)計靈感來自于 Google Guava
庫的 ListenableFuture
類,它實現(xiàn)了 Future接口
和 CompletionStage接口
, 并且新增一系列API,支持Java 8的 lambda特性
,通過回調(diào)利用非阻塞方法,提升了異步編程模型。
它解決了Future的不足,允許我們在非主線程中運行任務(wù),并向啟動線程 (一般是主線程) 通知 任務(wù)完成
或 任務(wù)失敗
,編寫異步的、非阻塞的程序。
使用CompletableFuture
最簡方式獲取實例
使用 CompletableFuture.completedFuture(U value)
可以獲取一個 執(zhí)行狀態(tài)已經(jīng)完成
的 CompletableFuture
對象。
這可以用于快速改造舊程序,并進(jìn)行逐步過渡
class Demo { @Test public void testSimpleCompletableFuture() { CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("testSimpleCompletableFuture"); assertTrue(completableFuture.isDone()); try { assertEquals("testSimpleCompletableFuture", completableFuture.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
改造線程同步部分
部分老舊程序已經(jīng)建立了多線程業(yè)務(wù)模型,我們可以使用 CompletableFuture
改造其中的線程同步部分,但暫不改造數(shù)據(jù)傳遞。
使用 runAsync()
方法,該方法接收一個 Runnable
類型的參數(shù)返回 CompletableFuture<Void>
:
//并不改變原項目中數(shù)據(jù)傳遞的部分、或者不關(guān)心結(jié)果數(shù)據(jù),僅進(jìn)行同步 class Demo { @Test public void testCompletableFutureRunAsync() { AtomicInteger variable = new AtomicInteger(0); CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> process(variable)); runAsync.join(); assertEquals(1, variable.get()); } public void process(AtomicInteger variable) { System.out.println(Thread.currentThread() + " Process..."); variable.set(1); } }
進(jìn)一步改造結(jié)果數(shù)據(jù)傳遞
當(dāng)我們關(guān)心異步任務(wù)的結(jié)果數(shù)據(jù)、或者改造原 多線程業(yè)務(wù)模型 的 數(shù)據(jù)傳遞方式 時,可以使用 supplyAsync()
方法,該方法接收一個 Supplier<T>
接口類型的參數(shù),它實現(xiàn)了任務(wù)的邏輯,方法返回 CompletableFuture<T>
實例。
class Demo { @Test public void testCompletableFutureSupplyAsync() { CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::process); try { // Blocking assertEquals("testCompletableFutureSupplyAsync", supplyAsync.get()); } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } } public String process() { return "testCompletableFutureSupplyAsync"; } }
指定執(zhí)行線程池
"獲取用于執(zhí)行任務(wù)的線程" 類似 Java 8 中的 parallelStream
, CompletableFuture
默認(rèn)從全局 ForkJoinPool.commonPool()
獲取線程,用于執(zhí)行任務(wù)。同時也提供了指定線程池的方式用于獲取線程執(zhí)行任務(wù),您可以使用API中具有 Executor
參數(shù)的重載方法。
class Demo { @Test public void testCompletableFutureSupplyAsyncWithExecutor() { ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2); CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::process, newFixedThreadPool); try { // Blocking assertEquals("testCompletableFutureSupplyAsyncWithExecutor", supplyAsync.get()); } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } } public String process() { return "testCompletableFutureSupplyAsyncWithExecutor"; } }
CompletableFuture
中有眾多API,方法命名中含有 Async
的API可使用線程池。
截至此處,以上使用方式均與 Future
類似,接下來演示 CompletableFuture
的不同
回調(diào)&鏈?zhǔn)秸{(diào)用
CompletableFuture
的 get()
API是阻塞式獲取結(jié)果,CompletableFuture
提供了
thenApply
thenAccept
thenRun
等API來避免阻塞式獲取,并且可添加 任務(wù)完成
后的回調(diào)。這幾個方法的使用場景如下:
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
收到結(jié)果后,可以進(jìn)行轉(zhuǎn)化CompletableFuture<Void> thenAccept(Consumer<? super T> action)
收到結(jié)果后,對其進(jìn)行消費CompletableFuture<Void> thenRun(Runnable action)
收到結(jié)果后,執(zhí)行回調(diào),無法消費結(jié)果只能消費 這一事件
API較為簡單,不再代碼演示
顯然,通過鏈?zhǔn)秸{(diào)用可以組裝多個執(zhí)行過程。
有讀者可能會疑惑:Function
和 Consumer
也可以進(jìn)行鏈?zhǔn)浇M裝,是否存在冗余呢?
兩種的鏈?zhǔn)秸{(diào)用特性確實存在重疊,您可以自行選擇用法,但 thenRun
只能采用 CompletableFuture
的鏈?zhǔn)秸{(diào)用。
另外,前面提到,我們可以指定線程池執(zhí)行任務(wù),對于這三組API,同樣有相同的特性,通過 thenXXXXAsync
指定線程池,這是 Function
和 Consumer
的鏈?zhǔn)浇M裝所無法完成的。
class Demo { @Test public void testCompletableFutureApplyAsync() { ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2); ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor(); // 從線程池 newFixedThreadPool 獲取線程執(zhí)行任務(wù) CompletableFuture<Double> completableFuture = CompletableFuture.supplyAsync(() -> 1D, newFixedThreadPool) .thenApplyAsync(d -> d + 1D, newSingleThreadScheduledExecutor) .thenApplyAsync(d -> d + 2D); Double result = completableFuture.join(); assertEquals(4D, result); } }
聚合多個CompletableFuture
通過 聚合
多個 CompletableFuture
,可以組成更 復(fù)雜
的業(yè)務(wù)流,可以達(dá)到精細(xì)地控制粒度、聚焦單個節(jié)點的業(yè)務(wù)。
注意:操作符并不能完全的控制 CompletableFuture
任務(wù)執(zhí)行的時機(jī),您需要謹(jǐn)慎的選擇 CompletableFuture
的創(chuàng)建時機(jī)
thenCompose、thenComposeAsync
compose
原意為 組成
, 通過多個 CompletableFuture
構(gòu)建異步流。
在操作的 CompletableFuture
獲得結(jié)果時,將另一個 CompletableFuture
compose
到異步流中,compose的過程中,可以根據(jù)操作的 CompletableFuture
的結(jié)果編寫邏輯。
與 thenApply
相比,thenCompose
返回邏輯中提供的 CompletableFuture
而 thenApply
返回框架內(nèi)處理的新實例。
注意,這一特性在使用 FP編程范式
進(jìn)行編碼時,會顯得非常靈活,一定程度上提升了函數(shù)的復(fù)用性
API含義直觀,不再進(jìn)行代碼演示
thenCombine、thenCombineAsync
thenCombine
可以用于合并多個 獨立任務(wù) 的處理結(jié)果。
注意: thenCompose
進(jìn)行聚合時,下游可以使用上游的結(jié)果,在業(yè)務(wù)需求上一般表現(xiàn)為依賴上一步結(jié)果,而非兩者相互獨立。
例如,產(chǎn)品希望在博客詳情頁同時展示 "博客的詳情" 和 "作者主要信息" ,以避免內(nèi)容區(qū)抖動或割裂的骨架占位。這兩者 可以獨立獲取時 ,則可以使用 thenCombine
系列API,分別獲取,并合并結(jié)果。
combine
的特點是 被合并的兩個 CompletableFuture
可以并發(fā),等兩者都獲得結(jié)果后進(jìn)行合并。
但它依舊存在使用上的不便捷,合并超過2個 CompletableFuture
時,顯得不夠靈活??梢允褂?static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
API。
allOf
創(chuàng)建了 CompletableFuture<Void>
,并不會幫助我們合并結(jié)果,所以需要自行編寫業(yè)務(wù)代碼合并,故存在 Side Effects
。
runAfterBoth、runAfterBothAsync;runAfterEither、runAfterEitherAsync
runAfterBoth
系列API在兩個CompletableFuture
都獲得結(jié)果后執(zhí)行回調(diào)runAfterEither
系列API在兩個CompletableFuture
任意一個獲得結(jié)果后執(zhí)行回調(diào)
通過API,不難理解它們需要使用者自行處理結(jié)果
CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
;CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
同樣可以增加編碼靈活性,不再贅述。
applyToEither、applyToEitherAsync;
acceptEither、acceptEitherAsync;thenAcceptBoth、thenAcceptBothAsync
applyToEither
系列API表現(xiàn)如thenApply
和Either
的組合,兩個同類型的CompletableFuture
任意一個獲得結(jié)果后,可消費該結(jié)果并進(jìn)行改變,類似 thenApplyacceptEither
系列API表現(xiàn)如thenAccept
和Either
的組合,兩個同類型的CompletableFuture
任意一個獲得結(jié)果后,可消費該結(jié)果,類似 thenAcceptthenAcceptBoth
系列API表現(xiàn)如thenCombine
,但返回CompletableFuture<Void>
同樣可以增加編碼靈活性,不再贅述
結(jié)果處理
使用回調(diào)處理結(jié)果有兩種API,注意,除了正常獲得結(jié)果外還可能獲得異常,而這兩組API簇差異體現(xiàn)在對 異常
的處理中。
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
handle
使用 BiFunction
,無論是正常結(jié)果還是異常情況,均視作可被邏輯接受,消費后轉(zhuǎn)化
而 whenComplete
使用 BiConsumer
,僅可消費但不能轉(zhuǎn)化,異常情況被視作不可被邏輯接受,仍會拋出。
舉個例子,進(jìn)行網(wǎng)絡(luò)編程時會遇到 Exception
, 如果業(yè)務(wù)設(shè)計中使用的模型實體包含了 正常結(jié)果
、異常
兩種情況:
open class Result<T>(val t: T?) { open val isThr: Boolean = false } class FailResult<T>(val tr: Throwable) : Result<T>(null) { override val isThr: Boolean = true }
則適合使用 handle
API在底層處理。否則需要額外的異常處理,可依據(jù)項目的設(shè)計選擇處理方式,一般在依據(jù)FP范式設(shè)計的程序中,傾向于使用handle,避免增加side effect。
異常處理
在多線程背景下,異常處理并不容易。它不僅僅是使用 try-catch
捕獲異常,還包含程序異步流中,節(jié)點出現(xiàn)異常時流的業(yè)務(wù)走向。
在 CompletableFuture
中,節(jié)點出現(xiàn)異常將跳過后續(xù)節(jié)點,進(jìn)入異常處理。
_如果您不希望某個節(jié)點拋出異常導(dǎo)致后續(xù)流程中斷,則可在節(jié)點的處理中捕獲并包裝為結(jié)果、或者對子 CompletableFuture 節(jié)點采用 handle
、exceptionally
API轉(zhuǎn)換異常 _
除前文提到的 handle
whenComplete
,CompletableFuture
中還提供了 exceptionally
API用于處理異常
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
從表現(xiàn)結(jié)果看,它類似于 handle
API中對異常的處理,將異常轉(zhuǎn)換為目標(biāo)結(jié)果的一種特定情形。
以上就是學(xué)會CompletableFuture輕松駕馭異步編程的詳細(xì)內(nèi)容,更多關(guān)于CompletableFuture異步編程的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring?Cloud?Gateway遠(yuǎn)程命令執(zhí)行漏洞分析(CVE-2022-22947)
使用Spring Cloud Gateway的應(yīng)用程序在Actuator端點啟用、公開和不安全的情況下容易受到代碼注入的攻擊,攻擊者可以惡意創(chuàng)建允許在遠(yuǎn)程主機(jī)上執(zhí)行任意遠(yuǎn)程執(zhí)行的請求,這篇文章主要介紹了Spring?Cloud?Gateway遠(yuǎn)程命令執(zhí)行漏洞(CVE-2022-22947),需要的朋友可以參考下2023-03-03解決FontConfiguration.getVersion報空指針異常的問題
這篇文章主要介紹了解決FontConfiguration.getVersion報空指針異常的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-06-06Java學(xué)習(xí)之Lambda表達(dá)式的使用詳解
Lambda表達(dá)式是Java SE 8中一個重要的新特性,允許通過表達(dá)式來代替功能接口。本文將通過一些簡單的示例和大家講講Lamda表達(dá)式的使用,感興趣的可以了解一下2022-12-12基于springboot和redis實現(xiàn)單點登錄
這篇文章主要為大家詳細(xì)介紹了基于springboot和redis實現(xiàn)單點登錄,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-06-06SpringBoot集成Swagger2生成接口文檔的方法示例
我們提供Restful接口的時候,API文檔是尤為的重要,它承載著對接口的定義,描述等,本文主要介紹了SpringBoot集成Swagger2生成接口文檔的方法示例,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2018-12-12spring profile 多環(huán)境配置管理詳解
這篇文章主要介紹了 spring profile 多環(huán)境配置管理詳解的相關(guān)資料,需要的朋友可以參考下2017-01-01