Java8通過CompletableFuture實現(xiàn)異步回調(diào)
前言:
java5為我們提供了Callable和Future,使我們可以很容易的完成異步任務(wù)結(jié)果的獲取,但是通過Future的get獲取異步任務(wù)結(jié)果會導(dǎo)致主線程的阻塞,這樣在某些場景下是非常消耗CPU資源的,進而Java8為我們提供了CompletableFuture,使我們可以輕松完成異步任務(wù)的回調(diào)。
1 什么是CompletableFuture?
CompletableFuture是Java 8 中新增的一個類,它是對Future接口的擴展。從下方的類繼承關(guān)系圖中我們看到其不僅實現(xiàn)了Future接口,還有CompletionStage接口,當Future需要顯示地完成時,可以使用CompletionStage接口去支持完成時觸發(fā)的函數(shù)和操作,當2個以上線程同時嘗試完成、異常完成、取消一個CompletableFuture時,只有一個能成功。
CompletableFuture主要作用就是簡化我們異步編程的復(fù)雜性,支持函數(shù)式編程,可以通過回調(diào)的方式處理計算結(jié)果。

2 為什么會有CompletableFuture ?
在java5中,JDK為我們提供了Callable和Future,使我們可以很容易的完成異步任務(wù)結(jié)果的獲取,但是通過Future的get獲取異步任務(wù)結(jié)果會導(dǎo)致主線程的阻塞,這樣在某些場景下是非常消耗CPU資源的,進而Java8為我們提供了CompletableFuture,使我們無需阻塞等待,而是通過回調(diào)的方式去處理結(jié)果,并且還支持流式處理、組合異步任務(wù)等操作。
如果不熟悉Callable和Future的,可以看小編之前更新的這篇文章Java從源碼看異步任務(wù)計算FutureTask
3 CompletableFuture 簡單使用
下面我們就CompletableFuture 的使用進行簡單分類:
創(chuàng)建任務(wù):
- supplyAsync/runAsync
異步回調(diào):
- thenApply/thenAccept/thenRun
- thenApplyAsync/thenAcceptAsync/thenRunAsync
- exceptionally
- handle/whenComplete
組合處理:
- thenCombine / thenAcceptBoth / runAfterBoth
- applyToEither / acceptEither / runAfterEither
- thenCompose
- allOf / anyOf
具體內(nèi)容請參照以下案例:
public static void main(String[] args) throws Exception {
// 1.帶返回值的異步任務(wù)(不指定線程池,默認ForkJoinPool.commonPool(),單核ThreadPerTaskExecutor)
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
return 1 + 1;
});
System.out.println("cf1 result: " + cf1.get());
// 2.無返回值的異步任務(wù)(不指定線程池,默認ForkJoinPool.commonPool(),單核ThreadPerTaskExecutor)
CompletableFuture cf2 = CompletableFuture.runAsync(() -> {
int a = 1 + 1;
});
System.out.println("cf2 result: " + cf2.get());
// 3.指定線程池的帶返回值的異步任務(wù),runAsync同理
CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(() -> {
return 1 + 1;
}, Executors.newCachedThreadPool());
System.out.println("cf3 result: " + cf3.get());
// 4.回調(diào),任務(wù)執(zhí)行完成后執(zhí)行的動作
CompletableFuture<Integer> cf4 = cf1.thenApply((result) -> {
System.out.println("cf4回調(diào)拿到cf1的結(jié)果 result : " + result);
return result + 1;
});
System.out.println("cf4 result: " + cf4.get());
// 5.異步回調(diào)(將回調(diào)任務(wù)提交到線程池),任務(wù)執(zhí)行完成后執(zhí)行的動作后異步執(zhí)行
CompletableFuture<Integer> cf5 = cf1.thenApplyAsync((result) -> {
System.out.println("cf5回調(diào)拿到cf1的結(jié)果 result : " + result);
return result + 1;
});
System.out.println("cf5 result: " + cf5.get());
// 6.回調(diào)(同thenApply但無返回結(jié)果),任務(wù)執(zhí)行完成后執(zhí)行的動作
CompletableFuture cf6 = cf1.thenAccept((result) -> {
System.out.println("cf6回調(diào)拿到cf1的結(jié)果 result : " + result);
});
System.out.println("cf6 result: " + cf6.get());
// 7.回調(diào)(同thenAccept但無入?yún)?,任務(wù)執(zhí)行完成后執(zhí)行的動作
CompletableFuture cf7 = cf1.thenRun(() -> {
});
System.out.println("cf7 result: " + cf7.get());
// 8.異?;卣{(diào),任務(wù)執(zhí)行出現(xiàn)異常后執(zhí)行的動作
CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("出現(xiàn)異常");
});
CompletableFuture<Integer> cf8 = cf.exceptionally((result) -> {
return -1;
});
System.out.println("cf8 result: " + cf8.get());
// 9.當某個任務(wù)執(zhí)行完成后執(zhí)行的回調(diào)方法,會將執(zhí)行結(jié)果或者執(zhí)行期間拋出的異常傳遞給回調(diào)方法
// 如果是正常執(zhí)行則異常為null,回調(diào)方法對應(yīng)的CompletableFuture的result和該任務(wù)一致;
// 如果該任務(wù)正常執(zhí)行,則get方法返回執(zhí)行結(jié)果,如果是執(zhí)行異常,則get方法拋出異常。
CompletableFuture<Integer> cf9 = cf1.handle((a, b) -> {
if (b != null) {
b.printStackTrace();
}
return a;
});
System.out.println("cf9 result: " + cf9.get());
// 10 與handle類似,無返回值
try {
CompletableFuture<Integer> cf10 = cf.whenComplete((a, b) -> {
if (b != null) {
b.printStackTrace();
}
});
System.out.println("cf10 result: " + cf10.get());
} catch (Exception e) {
System.out.println("cf10 出現(xiàn)異常!??!");
}
// 11 組合處理(兩個都完成,然后執(zhí)行)有入?yún)?,有返回?
CompletableFuture<Integer> cf11 = cf1.thenCombine(cf3, (r1, r2) -> {
return r1 + r2;
});
System.out.println("cf11 result: " + cf11.get());
// 12 組合處理(兩個都完成,然后執(zhí)行)有入?yún)?,無返回值
CompletableFuture cf12 = cf1.thenAcceptBoth(cf3, (r1, r2) -> {
});
System.out.println("cf12 result: " + cf12.get());
// 13 組合處理(兩個都完成,然后執(zhí)行)無入?yún)ⅲ瑹o返回值
CompletableFuture cf13 = cf1.runAfterBoth(cf3, () -> {
});
System.out.println("cf13 result: " + cf13.get());
// 14 組合處理(有一個完成,然后執(zhí)行)有入?yún)ⅲ蟹祷刂?
CompletableFuture<Integer> cf14 = cf1.applyToEither(cf3, (r) -> {
return r;
});
System.out.println("cf14 result: " + cf14.get());
// 15 組合處理(有一個完成,然后執(zhí)行)有入?yún)?,無返回值
CompletableFuture cf15 = cf1.acceptEither(cf3, (r) -> {
});
System.out.println("cf15 result: " + cf15.get());
// 16 組合處理(有一個完成,然后執(zhí)行)無入?yún)ⅲ瑹o返回值
CompletableFuture cf16 = cf1.runAfterEither(cf3, () -> {
});
System.out.println("cf16 result: " + cf16.get());
// 17 方法執(zhí)行后返回一個新的CompletableFuture
CompletableFuture<Integer> cf17 = cf1.thenCompose((r) -> {
return CompletableFuture.supplyAsync(() -> {
return 1 + 1;
});
});
System.out.println("cf17 result: " + cf17.get());
// 18 多個任務(wù)都執(zhí)行成功才會繼續(xù)執(zhí)行
CompletableFuture.allOf(cf1,cf2,cf3).whenComplete((r, t) -> {
System.out.println(r);
});
// 18 多個任務(wù)任意一個執(zhí)行成功就會繼續(xù)執(zhí)行
CompletableFuture.anyOf(cf1,cf2,cf3).whenComplete((r, t) -> {
System.out.println(r);
});
}4 CompletableFuture 源碼分析
首先我們可以從注釋中看到,它對CompletionStage、Future接口擴展的一些描述,這些也是它的一些重點。
除了直接操作狀態(tài)和結(jié)果的相關(guān)方法外,CompletableFuture還實現(xiàn)了CompletionStage接口的如下策略:
- (1)為非異步方法的依賴完成提供的操作,可以由完成當前
CompletableFuture的線程執(zhí)行,也可以由完成方法的任何其他調(diào)用方執(zhí)行。 - (2)所有沒有顯式Executor參數(shù)的異步方法都使用
ForkJoinPool.commonPool()執(zhí)行(除非它不支持至少兩個并行級別,在這種情況下,將創(chuàng)建一個新線程來運行每個任務(wù))。為了簡化監(jiān)視、調(diào)試和跟蹤,所有生成的異步任務(wù)都是CompletableFuture的實例,異步完成任務(wù)。
不了解ForkJoinPool的可以閱讀小編之前更新的這篇文章一文帶你了解Java中的ForkJoin。
- (3)所有
CompletionStage方法都是獨立于其他公共方法實現(xiàn)的,因此一個方法的行為不會受到子類中其他方法重寫的影響。
CompletableFuture實現(xiàn)了Future接口的如下策略:
- 因為(與FutureTask不同)這個類對導(dǎo)致它完成的計算沒有直接控制權(quán),所以取消被視為另一種形式的異常完成,所以cancel操作被視為是另一種異常完成形式(new CancellationException()具有相同的效果。)。方法
isCompletedExceptionally()可以用來確定一個CompletableFuture是否以任何異常的方式完成。 - 如果異常完成時出現(xiàn)
CompletionException,方法get()和get(long,TimeUnit)會拋出一個ExecutionException,其原因與相應(yīng)CompletionException中的原因相同。為了簡化在大多數(shù)上下文中的使用,該類還定義了join()和getNow()方法,在這些情況下直接拋出CompletionException。
4.1 創(chuàng)建異步任務(wù)
我們先看一下CompletableFuture是如何創(chuàng)建異步任務(wù)的,我們可以看到起創(chuàng)建異步任務(wù)的核心實現(xiàn)是兩個入?yún)?,一個入?yún)⑹荅xecutor,另一個入?yún)⑹荢upplier(函數(shù)式編程接口)。其中也提供了一個入?yún)⒌闹剌d,一個入?yún)⒌闹剌d方法會獲取默認的Executor,當系統(tǒng)是單核的會使用ThreadPerTaskExecutor,多核時使用ForkJoinPool.commonPool()。
注意:這里默認ForkJoinPool.commonPool()線程池,如果所有異步任務(wù)都使用該線程池話,出現(xiàn)問題不容易定位,如果長時間占用該線程池可能影響其他業(yè)務(wù)的正常操作,stream的并行流也是使用的該線程池。
其中還封裝了靜態(tài)內(nèi)部類AsyncSupply,該類代表這個異步任務(wù),實現(xiàn)了Runnable,重寫了run方法。
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
return d;
}
/**
* 靜態(tài)內(nèi)部類,繼承了ForkJoinTask<Void>、實現(xiàn)了Runnable、AsynchronousCompletionTask
*/
static final class AsyncSupply<T> extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
CompletableFuture<T> dep; Supplier<T> fn;
AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
this.dep = dep; this.fn = fn;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
public final boolean exec() { run(); return true; }
public void run() {
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
}Supplier類是一個函數(shù)式的接口,@FunctionalInterface注解就是函數(shù)式編程的標記。
package java.util.function;
@FunctionalInterface
public interface Supplier<T> {
T get();
}4.2 異步任務(wù)回調(diào)
異步任務(wù)回調(diào),我們以thenApply/thenApplyAsync為例來看一下其實現(xiàn)原理,方法名含有Async的會傳入asyncPool。uniApplyStage方法通過判斷e是否有值,來區(qū)分是從哪個方法進來的。thenApply不會傳入 Executor,它優(yōu)先讓當前線程來執(zhí)行后續(xù) stage 的任務(wù)。
- 當發(fā)現(xiàn)前一個 stage 已經(jīng)執(zhí)行完畢時,直接讓當前線程來執(zhí)行后續(xù) stage 的 task。
- 當發(fā)現(xiàn)前一個 stage 還沒執(zhí)行完畢時,則把當前 stage 包裝成一個 UniApply 對象,放到前一個 stage 的棧中。執(zhí)行前一個 stage 的線程,執(zhí)行完畢后,接著執(zhí)行后續(xù) stage 的 task。
thenApplyAsync會傳入一個 Executor,它總是讓 Executor 線程池里面的線程來執(zhí)行后續(xù) stage 的任務(wù)。
- 把當前 stage 包裝成一個 UniApply 對象,放到前一個 stage 的棧中,直接讓 Executor 來執(zhí)行。
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
// Async直接進入,不是Async執(zhí)行uniApply嘗試獲取結(jié)果
if (e != null || !d.uniApply(this, f, null)) {
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}
final <S> boolean uniApply(CompletableFuture<S> a,
Function<? super S,? extends T> f,
UniApply<S,T> c) {
Object r; Throwable x;
// 判斷當前CompletableFuture是否已完成,如果沒完成則返回false;如果完成了則執(zhí)行下面的邏輯。
if (a == null || (r = a.result) == null || f == null)
return false;
tryComplete: if (result == null) {
// 判斷任務(wù)結(jié)果是否是AltResult類型
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
try {
// 判斷當前任務(wù)是否可以執(zhí)行
if (c != null && !c.claim())
return false;
// 獲取任務(wù)結(jié)果
@SuppressWarnings("unchecked") S s = (S) r;
// 執(zhí)行
completeValue(f.apply(s));
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
static final class UniApply<T,V> extends UniCompletion<T,V> {
Function<? super T,? extends V> fn;
UniApply(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src,
Function<? super T,? extends V> fn) {
super(executor, dep, src); this.fn = fn;
}
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d; CompletableFuture<T> a;
if ((d = dep) == null ||
!d.uniApply(a = src, fn, mode > 0 ? null : this))
return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);
}
}
final void push(UniCompletion<?,?> c) {
if (c != null) {
while (result == null && !tryPushStack(c))
lazySetNext(c, null); // clear on failure
}
}
final boolean completeValue(T t) {
return UNSAFE.compareAndSwapObject(this, RESULT, null,
(t == null) ? NIL : t);
}4.3 異步任務(wù)組合
我們再thenCombine方法為例看一下CompletableFuture是如何處理組合任務(wù)的,我們可以看到thenCombine的源碼與thenApply的源碼基本上是一直的,只不過組合的時候不僅僅是判斷一個,需要集合具體場景,判斷多個CompletableFuture。
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
private <U,V> CompletableFuture<V> biApplyStage(
Executor e, CompletionStage<U> o,
BiFunction<? super T,? super U,? extends V> f) {
CompletableFuture<U> b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
if (e != null || !d.biApply(this, b, f, null)) {
BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
bipush(b, c);
c.tryFire(SYNC);
}
return d;
}
final <R,S> boolean biApply(CompletableFuture<R> a,
CompletableFuture<S> b,
BiFunction<? super R,? super S,? extends T> f,
BiApply<R,S,T> c) {
Object r, s; Throwable x;
// 此處不止要判斷a還得判斷b
if (a == null || (r = a.result) == null ||
b == null || (s = b.result) == null || f == null)
return false;
tryComplete: if (result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
// 這里不止判斷a的結(jié)果r還要判斷b的結(jié)果s
if (s instanceof AltResult) {
if ((x = ((AltResult)s).ex) != null) {
completeThrowable(x, s);
break tryComplete;
}
s = null;
}
// 最后將rr, ss傳入
try {
if (c != null && !c.claim())
return false;
@SuppressWarnings("unchecked") R rr = (R) r;
@SuppressWarnings("unchecked") S ss = (S) s;
completeValue(f.apply(rr, ss));
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
static final class BiApply<T,U,V> extends BiCompletion<T,U,V> {
BiFunction<? super T,? super U,? extends V> fn;
BiApply(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src, CompletableFuture<U> snd,
BiFunction<? super T,? super U,? extends V> fn) {
super(executor, dep, src, snd); this.fn = fn;
}
// tryFire方法也同樣的多可個b
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d;
CompletableFuture<T> a;
CompletableFuture<U> b;
if ((d = dep) == null ||
!d.biApply(a = src, b = snd, fn, mode > 0 ? null : this))
return null;
dep = null; src = null; snd = null; fn = null;
return d.postFire(a, b, mode);
}
}到此這篇關(guān)于Java8通過CompletableFuture實現(xiàn)異步回調(diào)的文章就介紹到這了,更多相關(guān)Java8異步回調(diào)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中ByteBuffer的allocate方法 和allocateDirect方法的區(qū)別和選用原則解析
在Java中,ByteBuffer是java.nio包中的一個類,用于處理字節(jié)數(shù)據(jù),ByteBuffer提供了兩種方式來分配內(nèi)存:allocate和allocateDirect,這篇文章主要介紹了Java中ByteBuffer的allocate方法 和allocateDirect方法的區(qū)別和選用原則 ,需要的朋友可以參考下2023-12-12
基于SpringBoot + Android實現(xiàn)登錄功能
在移動互聯(lián)網(wǎng)的今天,許多應(yīng)用需要通過移動端實現(xiàn)與服務(wù)器的交互功能,其中登錄是最常見且基礎(chǔ)的一種功能,本篇博客將詳細介紹如何使用 Spring Boot 和 Android 實現(xiàn)一個完整的登錄功能,需要的朋友可以參考下2024-11-11
使用Java開發(fā)實現(xiàn)OAuth安全認證的應(yīng)用
這篇文章主要介紹了使用Java開發(fā)實現(xiàn)OAuth安全認證的應(yīng)用的方法,OAuth安全認證經(jīng)常出現(xiàn)于社交網(wǎng)絡(luò)API應(yīng)用的相關(guān)開發(fā)中,需要的朋友可以參考下2015-11-11
ActiveMQ結(jié)合Spring收發(fā)消息的示例代碼
這篇文章主要介紹了ActiveMQ結(jié)合Spring收發(fā)消息的示例代碼,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-10-10
mybatis攔截器實現(xiàn)數(shù)據(jù)庫數(shù)據(jù)權(quán)限隔離方式
通過Mybatis攔截器,在執(zhí)行SQL前添加條件實現(xiàn)數(shù)據(jù)權(quán)限隔離,特別是對于存在用戶ID區(qū)分的表,攔截器會自動添加如user_id=#{userId}的條件,確保SQL在執(zhí)行時只能操作指定用戶的數(shù)據(jù),此方法主要應(yīng)用于Mybatis的四個階段2024-11-11
Java編程一維數(shù)組轉(zhuǎn)換成二維數(shù)組實例代碼
這篇文章主要介紹了Java編程一維數(shù)組轉(zhuǎn)換成二維數(shù)組,分享了相關(guān)代碼示例,小編覺得還是挺不錯的,具有一定借鑒價值,需要的朋友可以參考下2018-01-01

