Java8 CompletableFuture詳解
Java 8來(lái)了,是時(shí)候?qū)W一下新的東西了。Java 7和Java 6只不過(guò)是稍作修改的版本,而Java 8將會(huì)發(fā)生重大的改進(jìn)?;蛟S是Java 8太大了吧?今天我會(huì)給你徹底地解釋JDK 8中的新的抽象 – CompletableFuture。眾所周知,Java 8不到一年就會(huì)發(fā)布,因此這篇文章是基于JDK 8 build 88 with lambda support的。CompletableFuture extends Future提供了方法,一元操作符和促進(jìn)異步性以及事件驅(qū)動(dòng)編程模型,它并不止步于舊版本的Java中。如果你打開(kāi)JavaDoc of CompletableFuture你一定會(huì)感到震驚。大約有五十種方法(?。?,而且它們中的一些非常有意思而且不好理解,例如:
CompletableFuture<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn,
Executor executor)
不必?fù)?dān)心,繼續(xù)讀下去。CompletableFuture收集了所有ListenableFuture in Guava 和 SettableFuture的特征。此外,內(nèi)置的lambda表達(dá)式使它更接近于Scala/Akka futures。這聽(tīng)起來(lái)好得令人難以置信,但是請(qǐng)繼續(xù)讀下去。CompletableFuture有兩個(gè)主要的方面優(yōu)于ol中的Future – 異步回調(diào)/轉(zhuǎn)換,這能使得從任何時(shí)刻的任何線程都可以設(shè)置CompletableFuture的值。
一、提取、修改包裝的值
通常futures代表其它線程中運(yùn)行的代碼,但事實(shí)并非總是如此。有時(shí)你想要?jiǎng)?chuàng)造一個(gè)Future來(lái)表示你知道將會(huì)發(fā)生什么,例如JMS message arrival。所以你有Future但是未來(lái)并沒(méi)有潛在的異步工作。你只是想在未來(lái)JMS消息到達(dá)時(shí)簡(jiǎn)單地完成(解決),這是由一個(gè)事件驅(qū)動(dòng)的。在這種情況下,你可以簡(jiǎn)單地創(chuàng)建CompletableFuture來(lái)返還給你的客戶端,只要你認(rèn)為你的結(jié)果是可用的,僅僅通過(guò)complete()就能解鎖所有等待Future的客戶端。
首先你可以簡(jiǎn)單地創(chuàng)建新的CompletableFuture并且給你的客戶端:
final CompletableFuture<String> future = new CompletableFuture<>();
//...
return future;
}
注意這個(gè)future和Callable沒(méi)有任何聯(lián)系,沒(méi)有線程池也不是異步工作。如果現(xiàn)在客戶端代碼調(diào)用ask().get()它將永遠(yuǎn)阻塞。如果寄存器完成回調(diào),它們就永遠(yuǎn)不會(huì)生效了。所以關(guān)鍵是什么?現(xiàn)在你可以說(shuō):
…此時(shí)此刻所有客戶端Future.get()將得到字符串的結(jié)果,同時(shí)完成回調(diào)以后將會(huì)立即生效。當(dāng)你想代表Future的任務(wù)時(shí)是非常方便的,而且沒(méi)有必要去計(jì)算一些執(zhí)行線程的任務(wù)上。CompletableFuture.complete()只能調(diào)用一次,后續(xù)調(diào)用將被忽略。但也有一個(gè)后門叫做CompletableFuture.obtrudeValue(…)覆蓋一個(gè)新Future之前的價(jià)值,請(qǐng)小心使用。
有時(shí)你想要看到信號(hào)發(fā)生故障的情況,如你所知Future對(duì)象可以處理它所包含的結(jié)果或異常。如果你想進(jìn)一步傳遞一些異常,可以用CompletableFuture.completeExceptionally(ex) (或者用obtrudeException(ex)這樣更強(qiáng)大的方法覆蓋前面的異常)。 completeExceptionally()也能解鎖所有等待的客戶端,但這一次從get()拋出異常。說(shuō)到get(),也有CompletableFuture.join()方法在錯(cuò)誤處理方面有著細(xì)微的變動(dòng)。但總體上,它們都是一樣的。最后也有CompletableFuture.getNow(valueIfAbsent)方法沒(méi)有阻塞但是如果Future還沒(méi)完成將返回默認(rèn)值,這使得當(dāng)構(gòu)建那種我們不想等太久的健壯系統(tǒng)時(shí)非常有用。
最后static的方法是用completedFuture(value)來(lái)返回已經(jīng)完成Future的對(duì)象,當(dāng)測(cè)試或者寫一些適配器層時(shí)可能非常有用。
二、創(chuàng)造和獲取CompletableFuture
好了,那么手動(dòng)地創(chuàng)建CompletableFuture是我們唯一的選擇嗎?不一定。就像一般的Futures,我們可以關(guān)聯(lián)存在的任務(wù),同時(shí)CompletableFuture使用工廠方法:
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
無(wú)參方法Executor是以…Async結(jié)尾同時(shí)將會(huì)使用ForkJoinPool.commonPool()(全局的,在JDK8中介紹的通用池),這適用于CompletableFuture類中的大多數(shù)的方法。runAsync()易于理解,注意它需要Runnable,因此它返回CompletableFuture<Void>作為Runnable不返回任何值。如果你需要處理異步操作并返回結(jié)果,使用Supplier<U>:
final CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
//...long running...
return "42";
}
}, executor);
但是別忘了,Java 8里面還有l(wèi)ambdas表達(dá)式呢!
finalCompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
//...long running...
return "42";
}, executor);
或者:
final CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> longRunningTask(params), executor);
雖然這篇文章不是關(guān)于Lambda的,但是我會(huì)相當(dāng)頻繁地使用lambda表達(dá)式。
三、轉(zhuǎn)換和作用于CompletableFuture(thenApply)
我說(shuō)過(guò)CompletableFuture優(yōu)于Future但是你還不知道為什么嗎?簡(jiǎn)單說(shuō),因?yàn)镃ompletableFuture是一個(gè)原子也是一個(gè)因子。我說(shuō)的這句話沒(méi)什么幫助嗎?Scala和JavaScript都允許future完成時(shí)允許注冊(cè)異步回調(diào),直到它準(zhǔn)備好我們才要等待和阻止它。我們可以簡(jiǎn)單地說(shuō):運(yùn)行這個(gè)函數(shù)時(shí)就出現(xiàn)了結(jié)果。此外,我們可以疊加這些功能,把多個(gè)future組合在一起等。例如如果我們從String轉(zhuǎn)為Integer,我們可以轉(zhuǎn)為在不關(guān)聯(lián)的前提下從CompletableFuture到 CompletableFuture<Integer。這是通過(guò)thenApply()的方法:
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);<p></p>
<p>如前所述...Async版本提供對(duì)CompletableFuture的大多數(shù)操作,因此我將在后面的部分中跳過(guò)它們。記住,第一個(gè)方法將在future完成的相同線程中調(diào)用該方法,而剩下的兩個(gè)將在不同的線程池中異步地調(diào)用它。
讓我們來(lái)看看thenApply()的工作流程:</p>
<p><pre class="brush: java; gutter: true; first-line: 1; highlight: []; html-script: false">
CompletableFuture<String> f1 = //...
CompletableFuture<Integer> f2 = f1.thenApply(Integer::parseInt);
CompletableFuture<Double> f3 = f2.thenApply(r -> r * r * Math.PI);
</p>
或在一個(gè)聲明中:
CompletableFuture<Double> f3 =
f1.thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);
這里,你會(huì)看到一個(gè)序列的轉(zhuǎn)換,從String到Integer再到Double。但最重要的是,這些轉(zhuǎn)換既不立即執(zhí)行也不停止。這些轉(zhuǎn)換既不立即執(zhí)行也不停止。他們只是記得,當(dāng)原始f1完成他們所執(zhí)行的程序。如果某些轉(zhuǎn)換非常耗時(shí),你可以提供你自己的Executor來(lái)異步地運(yùn)行他們。注意,此操作相當(dāng)于Scala中的一元map。
四、運(yùn)行完成的代碼(thenAccept/thenRun)
CompletableFuture<Void> thenAccept(Consumer<? super T> block);
CompletableFuture<Void> thenRun(Runnable action);
在future的管道里有兩種典型的“最終”階段方法。他們?cè)谀闶褂胒uture的值的時(shí)候做好準(zhǔn)備,當(dāng) thenAccept()提供最終的值時(shí),thenRun執(zhí)行 Runnable,這甚至沒(méi)有方法去計(jì)算值。例如:
future.thenAcceptAsync(dbl -> log.debug("Result: {}", dbl), executor);
log.debug("Continuing");
…Async變量也可用兩種方法,隱式和顯式執(zhí)行器,我不會(huì)過(guò)多強(qiáng)調(diào)這個(gè)方法。
thenAccept()/thenRun()方法并沒(méi)有發(fā)生阻塞(即使沒(méi)有明確的executor)。它們像一個(gè)事件偵聽(tīng)器/處理程序,你連接到一個(gè)future時(shí),這將執(zhí)行一段時(shí)間?!盋ontinuing”消息將立即出現(xiàn),盡管future甚至沒(méi)有完成。
五、單個(gè)CompletableFuture的錯(cuò)誤處理
到目前為止,我們只討論計(jì)算的結(jié)果。那么異常呢?我們可以異步地處理它們嗎?當(dāng)然!
CompletableFuture<String> safe =
future.exceptionally(ex -> "We have a problem: " + ex.getMessage());
exceptionally()接受一個(gè)函數(shù)時(shí),將調(diào)用原始future來(lái)拋出一個(gè)異常。我們會(huì)有機(jī)會(huì)將此異常轉(zhuǎn)換為和Future類型的兼容的一些值來(lái)進(jìn)行恢復(fù)。safe進(jìn)一步的轉(zhuǎn)換將不再產(chǎn)生一個(gè)異常而是從提供功能的函數(shù)返回一個(gè)String值。
一個(gè)更加靈活的方法是handle()接受一個(gè)函數(shù),它接收正確的結(jié)果或異常:
CompletableFuture<Integer> safe = future.handle((ok, ex) -> {
if (ok != null) {
return Integer.parseInt(ok);
} else {
log.warn("Problem", ex);
return -1;
}
});
handle()總是被調(diào)用,結(jié)果和異常都非空,這是個(gè)一站式全方位的策略。
六、一起結(jié)合兩個(gè)CompletableFuture
異步處理過(guò)程之一的CompletableFuture非常不錯(cuò)但是當(dāng)多個(gè)這樣的futures以各種方式組合在一起時(shí)確實(shí)顯示了它的強(qiáng)大。
七、結(jié)合(鏈接)這兩個(gè)futures(thenCompose())
有時(shí)你想運(yùn)行一些future的值(當(dāng)它準(zhǔn)備好了),但這個(gè)函數(shù)也返回了future。CompletableFuture足夠靈活地明白我們的函數(shù)結(jié)果現(xiàn)在應(yīng)該作為頂級(jí)的future,對(duì)比CompletableFuture<CompletableFuture>。方法 thenCompose()相當(dāng)于Scala的flatMap:
<U> CompletableFuture<U> thenCompose(Function<? super T,CompletableFuture<U>> fn);
…Async變化也是可用的,在下面的事例中,仔細(xì)觀察thenApply()(map)和thenCompose()(flatMap)的類型和差異,當(dāng)應(yīng)用calculateRelevance()方法返回CompletableFuture:
CompletableFuture<Document> docFuture = //...
CompletableFuture<CompletableFuture<Double>> f =
docFuture.thenApply(this::calculateRelevance);
CompletableFuture<Double> relevanceFuture =
docFuture.thenCompose(this::calculateRelevance);
//...
private CompletableFuture<Double> calculateRelevance(Document doc) //...
thenCompose()是一個(gè)重要的方法允許構(gòu)建健壯的和異步的管道,沒(méi)有阻塞和等待的中間步驟。
八、兩個(gè)futures的轉(zhuǎn)換值(thenCombine())
當(dāng)thenCompose()用于鏈接一個(gè)future時(shí)依賴另一個(gè)thenCombine,當(dāng)他們都完成之后就結(jié)合兩個(gè)獨(dú)立的futures:
<U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
…Async變量也是可用的,假設(shè)你有兩個(gè)CompletableFuture,一個(gè)加載Customer另一個(gè)加載最近的Shop。他們彼此完全獨(dú)立,但是當(dāng)他們完成時(shí),您想要使用它們的值來(lái)計(jì)算Route。這是一個(gè)可剝奪的例子:
CompletableFuture<Customer> customerFuture = loadCustomerDetails(123);
CompletableFuture<Shop> shopFuture = closestShop();
CompletableFuture<Route> routeFuture =
customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop));
//...
private Route findRoute(Customer customer, Shop shop) //...
請(qǐng)注意,在Java 8中可以用(cust, shop) -> findRoute(cust, shop)簡(jiǎn)單地代替this::findRoute方法的引用:
customerFuture.thenCombine(shopFuture, this::findRoute);
你也知道,我們有customerFuture 和 shopFuture。那么routeFuture包裝它們?nèi)缓蟆暗却彼鼈兺瓿?。?dāng)他們準(zhǔn)備好了,它會(huì)運(yùn)行我們提供的函數(shù)來(lái)結(jié)合所有的結(jié)果(findRoute())。當(dāng)兩個(gè)基本的futures完成并且 findRoute()也完成時(shí),這樣routeFuture將會(huì)完成。
九、等待所有的 CompletableFutures 完成
如果不是產(chǎn)生新的CompletableFuture連接這兩個(gè)結(jié)果,我們只是希望當(dāng)完成時(shí)得到通知,我們可以使用thenAcceptBoth()/runAfterBoth()系列的方法,(…Async 變量也是可用的)。它們的工作方式與thenAccept() 和 thenRun()類似,但是是等待兩個(gè)futures而不是一個(gè):
<U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other, BiConsumer<? super T,? super U> block)
CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, Runnable action)
想象一下上面的例子,這不是產(chǎn)生新的 CompletableFuture,你只是想要立刻發(fā)送一些事件或刷新GUI。這可以很容易地實(shí)現(xiàn):thenAcceptBoth():
customerFuture.thenAcceptBoth(shopFuture, (cust, shop) -> {
final Route route = findRoute(cust, shop);
//refresh GUI with route
});
我希望我是錯(cuò)的,但也許有些人會(huì)問(wèn)自己一個(gè)問(wèn)題:為什么我不能簡(jiǎn)單地阻塞這兩個(gè)futures呢? 就像:
Future<Customer> customerFuture = loadCustomerDetails(123);
Future<Shop> shopFuture = closestShop();
findRoute(customerFuture.get(), shopFuture.get());
好了,你當(dāng)然可以這么做。但是最關(guān)鍵的一點(diǎn)是CompletableFuture是允許異步的,它是事件驅(qū)動(dòng)的編程模型而不是阻塞并急切地等待著結(jié)果。所以在功能上,上面兩部分代碼是等價(jià)的,但后者沒(méi)有必要占用一個(gè)線程來(lái)執(zhí)行。
十、等待第一個(gè) CompletableFuture 來(lái)完成任務(wù)
另一個(gè)有趣的事是CompletableFutureAPI可以等待第一個(gè)(與所有相反)完成的future。當(dāng)你有兩個(gè)相同類型任務(wù)的結(jié)果時(shí)就顯得非常方便,你只要關(guān)心響應(yīng)時(shí)間就行了,沒(méi)有哪個(gè)任務(wù)是優(yōu)先的。API方法(…Async變量也是可用的):
CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other, Consumer<? super T> block)
CompletableFuture<Void> runAfterEither(CompletableFuture<?> other, Runnable action)
作為一個(gè)例子,你有兩個(gè)系統(tǒng)可以集成。一個(gè)具有較小的平均響應(yīng)時(shí)間但是擁有高的標(biāo)準(zhǔn)差,另一個(gè)一般情況下較慢,但是更加容易預(yù)測(cè)。為了兩全其美(性能和可預(yù)測(cè)性)你可以在同一時(shí)間調(diào)用兩個(gè)系統(tǒng)并等著誰(shuí)先完成。通常這會(huì)是第一個(gè)系統(tǒng),但是在進(jìn)度變得緩慢時(shí),第二個(gè)系統(tǒng)就可以在可接受的時(shí)間內(nèi)完成:
CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
fast.acceptEither(predictable, s -> {
System.out.println("Result: " + s);
});
s代表了從fetchFast()或是fetchPredictably()得到的String。我們不必知道也無(wú)需關(guān)心。
十一、完整地轉(zhuǎn)換第一個(gè)系統(tǒng)
applyToEither()算是 acceptEither()的前輩了。當(dāng)兩個(gè)futures快要完成時(shí),后者只是簡(jiǎn)單地調(diào)用一些代碼片段,applyToEither()將會(huì)返回一個(gè)新的future。當(dāng)這兩個(gè)最初的futures完成時(shí),新的future也會(huì)完成。API有點(diǎn)類似于(…Async 變量也是可用的):
這個(gè)額外的fn功能在第一個(gè)future被調(diào)用時(shí)能完成。我不確定這個(gè)專業(yè)化方法的目的是什么,畢竟一個(gè)人可以簡(jiǎn)單地使用:fast.applyToEither(predictable).thenApply(fn)。因?yàn)槲覀儓?jiān)持用這個(gè)API,但我們的確不需要額外功能的應(yīng)用程序,我會(huì)簡(jiǎn)單地使用Function.identity()占位符:
CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
CompletableFuture<String> firstDone =
fast.applyToEither(predictable, Function.<String>identity());
第一個(gè)完成的future可以通過(guò)運(yùn)行。請(qǐng)注意,從客戶的角度來(lái)看,兩個(gè)futures實(shí)際上是在firstDone的后面而隱藏的??蛻舳酥皇堑却鴉uture來(lái)完成并且通過(guò)applyToEither()使得當(dāng)最先的兩個(gè)任務(wù)完成時(shí)通知客戶端。
十二、多種結(jié)合的CompletableFuture
我們現(xiàn)在知道如何等待兩個(gè)future來(lái)完成(使用thenCombine())并第一個(gè)完成(applyToEither())。但它可以擴(kuò)展到任意數(shù)量的futures嗎?的確,使用static輔助方法:
static CompletableFuture<Void< allOf(CompletableFuture<?<... cfs)
static CompletableFuture<Object< anyOf(CompletableFuture<?<... cfs)
allOf()當(dāng)所有的潛在futures完成時(shí),使用了一個(gè)futures數(shù)組并且返回一個(gè)future(等待所有的障礙)。另一方面anyOf()將會(huì)等待最快的潛在futures,請(qǐng)看一下返回futures的一般類型,這不是你所期望的嗎?我們會(huì)在接下來(lái)的文章中關(guān)注一下這個(gè)問(wèn)題。
總結(jié)
我們探索了整個(gè)CompletableFuture API。我確信這樣就能戰(zhàn)無(wú)不勝了,所以在下一篇文章中我們將研究另一個(gè)簡(jiǎn)單的web爬蟲(chóng)程序的實(shí)現(xiàn),使用CompletableFuture方法和Java 8 lambda表達(dá)式,我們也會(huì)看看CompletableFuture的
- Java8?CompletableFuture?runAsync學(xué)習(xí)總結(jié)submit()?execute()等
- Java?CompletableFuture實(shí)現(xiàn)多線程異步編排
- 詳解Java8?CompletableFuture的并行處理用法
- Java8 使用工廠方法supplyAsync創(chuàng)建CompletableFuture實(shí)例
- Java8 自定義CompletableFuture的原理解析
- Java8 CompletableFuture 異步執(zhí)行操作
- Java并發(fā) CompletableFuture異步編程的實(shí)現(xiàn)
- Java8新的異步編程方式CompletableFuture實(shí)現(xiàn)
- Java中的CompletableFuture原理與用法
相關(guān)文章
Java 實(shí)戰(zhàn)項(xiàng)目之倉(cāng)庫(kù)管理系統(tǒng)的實(shí)現(xiàn)流程
讀萬(wàn)卷書(shū)不如行萬(wàn)里路,只學(xué)書(shū)上的理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實(shí)戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+SSM+jsp+mysql+maven實(shí)現(xiàn)一個(gè)倉(cāng)庫(kù)管理系統(tǒng),大家可以在過(guò)程中查缺補(bǔ)漏,提升水平2021-11-11Springboot3整合Mybatis-plus3.5.3報(bào)錯(cuò)問(wèn)題解決
在日常學(xué)習(xí)springboot3相關(guān)的代碼時(shí),在使用 SpringBoot3 整合 MyBatisplus 時(shí)出現(xiàn)了一些問(wèn)題,花了不少時(shí)間處理,這篇文章主要介紹了Springboot3整合Mybatis-plus3.5.3報(bào)錯(cuò)問(wèn)題解決,需要的朋友可以參考下2023-11-11java動(dòng)態(tài)代理和cglib動(dòng)態(tài)代理示例分享
這篇文章主要介紹了java動(dòng)態(tài)代理和cglib動(dòng)態(tài)代理示例,JDK1.3之后,Java提供了動(dòng)態(tài)代理的技術(shù),允許開(kāi)發(fā)者在運(yùn)行期間創(chuàng)建接口的代理實(shí)例,下面我們使用示例學(xué)習(xí)一下2014-03-03MyBatis如何進(jìn)行雙重foreach循環(huán)
這篇文章主要介紹了MyBatis如何進(jìn)行雙重foreach循環(huán),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02