Java?CompletableFuture實現(xiàn)原理分析詳解
簡介
前面的一篇文章你知道Java8并發(fā)新特性CompletableFuture嗎?介紹了CompletableFuture的特性以及一些使用方法,今天我們主要來聊一聊CompletableFuture的回調(diào)功能以及異步工作原理是如何實現(xiàn)的。
CompletableFuture類結(jié)構(gòu)
1.CompletableFuture類結(jié)構(gòu)主要有兩個屬性
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { volatile Object result; // Either the result or boxed AltResult volatile Completion stack; // Top of Treiber stack of dependent actions ... }
- result:存儲CompletableFuture的返回,或者存儲異常對象AltResult。
- stack:是CompletableFuture.Completion對象,表示操作數(shù)棧棧頂,在進行CompletableFuture鏈式調(diào)用的過程中,所有鏈式調(diào)用的CompletableFuture任務(wù)都會被壓入該stack中,在任務(wù)調(diào)用的過程按后進先出的順序出棧執(zhí)行完所有任務(wù)。
2.stack屬性棧結(jié)構(gòu)
abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask { volatile Completion next; // Treiber stack link ... }
next:存儲下一個任務(wù)鏈式調(diào)用棧。
3.UniCompletion的內(nèi)部結(jié)構(gòu)
abstract static class UniCompletion<T,V> extends Completion { Executor executor; // executor to use (null if none) CompletableFuture<V> dep; // the dependent to complete CompletableFuture<T> src; // source for action ... }
UniCompletion繼承Completion類,包含以下幾個參數(shù):
- executor:異步任務(wù)執(zhí)行器,如果為空則有主線程執(zhí)行任務(wù)不進行異步執(zhí)行。
- dep:指向當(dāng)前任務(wù)構(gòu)建的CompletabueFuture
- src:指向源CompletableFuture任務(wù)
CompletableFuture回調(diào)原理
這里為了方便講解,我們用以下簡短的代碼來進行分析:
public static void main(String[] args) { CompletableFuture<String> baseFuture = CompletableFuture.completedFuture("Base Future"); log.info(baseFuture.thenApply((r) -> r + " Then Apply").join()); baseFuture.thenAccept((r) -> log.info(r)).thenAccept((Void) -> log.info("Void")); }
上面的代碼我們通過創(chuàng)建一個簡單的CompletableFuture
對象,再執(zhí)行baseFuture.thenApply()
調(diào)用后會進行一個入棧操作,如下圖baseFuture
引用的CompletableFuture
的stack
屬性將會指向baseFuture.thenApply()
結(jié)果返回的新CompletableFuture
對象,而新CompletableFuture
對象的src
屬性將指向來源CompletableFuture
即baseFuture
所引用的對象。
2.在上一步的基礎(chǔ)上再執(zhí)行下一行代碼,結(jié)果的引用關(guān)系圖如下圖:
在執(zhí)行完baseFuture.thenAccept()
的時候,thenAccept
返回的任務(wù)將被壓入棧頂,next
指向上一個代碼段的返回對象,在thenAccept
返回的新CompletableFuture
對象中在進行一次thenAccept
的調(diào)用,就再產(chǎn)生一個新的CompletableFuture
對象,dept
屬性就指向最新的CompletableFuture
對象。
thenApply實現(xiàn)源碼分析
public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn) { return uniApplyStage(null, fn); } private <V> CompletableFuture<V> uniApplyStage( Executor e, Function<? super T,? extends V> f) { if (f == null) throw new NullPointerException(); // 創(chuàng)建一個新的CompletableFuture對象 CompletableFuture<V> d = new CompletableFuture<V>(); // e:如果是異步調(diào)用直接執(zhí)行代碼塊 // !d.uniApply:執(zhí)行任務(wù),如果返回false則任務(wù)未執(zhí)行需入棧 if (e != null || !d.uniApply(this, f, null)) { UniApply<T,V> c = new UniApply<T,V>(e, d, this, f); // 創(chuàng)建出新的UniApply對象進行入棧 push(c); // 嘗試執(zhí)行任務(wù) c.tryFire(SYNC); } return d; } final <S> boolean uniApply(CompletableFuture<S> a, Function<? super S,? extends T> f, UniApply<S,T> c) { Object r; Throwable x; // 任務(wù)未完成結(jié)果為null直接返回false if (a == null || (r = a.result) == null || f == null) return false; // 驗證是否出現(xiàn)異常結(jié)果,如有則任務(wù)執(zhí)行結(jié)束 tryComplete: if (result == null) { if (r instanceof AltResult) { if ((x = ((AltResult)r).ex) != null) { completeThrowable(x, r); break tryComplete; } r = null; } try { // 異步執(zhí)行任務(wù) if (c != null && !c.claim()) // 任務(wù)未執(zhí)行返回false return false; @SuppressWarnings("unchecked") S s = (S) r; // 任務(wù)執(zhí)行完成將結(jié)果寫入result completeValue(f.apply(s)); } catch (Throwable ex) { completeThrowable(ex); } } return true; }
以上代碼片段主要描述了CompletableFuture
在執(zhí)行任務(wù)時會創(chuàng)建出新的CompletableFuture
對象,使用新對象執(zhí)行任務(wù)并獲取結(jié)果使用CAS
寫入到result
屬性,如果任務(wù)未執(zhí)行將壓入棧頂,再重新嘗試任務(wù)執(zhí)行,在CompletableFuture
其他方法的調(diào)用也都大同小異,這里不在逐一分析,可自行打開源碼閱讀便于理解。
CompletableFuture異步原理
需要進行CompletableFuture異步調(diào)用則要使用Async
結(jié)尾的方法執(zhí)行任務(wù),這里我們就拿thenAcceptAsync()
的源碼進行分析。
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) { return uniAcceptStage(asyncPool, action); } private CompletableFuture<Void> uniAcceptStage(Executor e, Consumer<? super T> f) { if (f == null) throw new NullPointerException(); CompletableFuture<Void> d = new CompletableFuture<Void>(); // 如果是異步任務(wù),這里的參數(shù)e不會為空,也就是會將任務(wù)執(zhí)行壓入棧頂 if (e != null || !d.uniAccept(this, f, null)) { UniAccept<T> c = new UniAccept<T>(e, d, this, f); push(c); // 重點還是這個入口 c.tryFire(SYNC); } return d; } static final class UniAccept<T> extends UniCompletion<T,Void> { Consumer<? super T> fn; UniAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Consumer<? super T> fn) { super(executor, dep, src); this.fn = fn; } final CompletableFuture<Void> tryFire(int mode) { CompletableFuture<Void> d; CompletableFuture<T> a; // dep為空即任務(wù)已被執(zhí)行過,直接返回null // uniAccept()結(jié)果為false,可能是任務(wù)執(zhí)行中未完成,也可能是由線程池中的其他任務(wù)執(zhí)行完成 if ((d = dep) == null || !d.uniAccept(a = src, fn, mode > 0 ? null : this)) return null; dep = null; src = null; fn = null; // 說明當(dāng)前線程執(zhí)行了該任務(wù),返回結(jié)果繼續(xù)執(zhí)行前一個任務(wù) return d.postFire(a, mode); } } final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) { if (a != null && a.stack != null) { // postComplete調(diào)用過來的,或者上一個任務(wù)執(zhí)行完成,清空棧數(shù)據(jù),否則調(diào)用postComplete完成任務(wù) if (mode < 0 || a.result == null) a.cleanStack(); else // 完成任務(wù)執(zhí)行并進行出棧 a.postComplete(); } if (result != null && stack != null) { if (mode < 0) // postComplete調(diào)用過來的任務(wù)已完成 return this; else // 完成任務(wù)執(zhí)行并進行出棧 postComplete(); } return null; }
CompletableFuture
進行異步主要是通過將任務(wù)壓入棧頂后tryFire
方法進行異步處理,如果任務(wù)未被執(zhí)行則會通過postFire
方法有線程池中的線程進行任務(wù)執(zhí)行,任務(wù)執(zhí)行結(jié)果再使用CAS
將結(jié)果返回到result
,其他線程即可得知任務(wù)是否被執(zhí)行過,如果當(dāng)前現(xiàn)場判斷當(dāng)前任務(wù)為被執(zhí)行,則調(diào)用postComplete()
執(zhí)行完成任務(wù)。
總結(jié)
CompletableFuture
通過異步回調(diào)的方式,解決了開發(fā)過程中異步調(diào)用獲取結(jié)果的難點。開發(fā)人員只需接觸到CompletableFuture
對象,以及CompletableFuture
任務(wù)的執(zhí)行結(jié)果,無需設(shè)計具體異步回調(diào)的實現(xiàn),并可通過自定義線程池進一步優(yōu)化任務(wù)的異步調(diào)用。
到此這篇關(guān)于Java CompletableFuture實現(xiàn)原理分析詳解的文章就介紹到這了,更多相關(guān)Java CompletableFuture內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
MyBatisPlus 一對多、多對一、多對多的完美解決方案
這篇文章主要介紹了MyBatisPlus 一對多、多對一、多對多的完美解決方案,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-11-11Spring?Boot?如何通過ServletRequestHandledEvent事件實現(xiàn)接口請求的性能監(jiān)控
在Spring框架中,監(jiān)控接口請求的性能可以通過ServletRequestHandledEvent事件實現(xiàn),這篇文章給大家介紹Spring?Boot?如何通過ServletRequestHandledEvent事件實現(xiàn)接口請求的性能監(jiān)控,感興趣的朋友跟隨小編一起看看吧2024-08-08SpringBoot生成jar/war包的布局應(yīng)用
在 Spring Boot 中,"布局應(yīng)用"(Application Layout)指的是打包生成的可執(zhí)行 jar 或 war 文件中的內(nèi)容組織結(jié)構(gòu),本文給大家介紹了SpringBoot生成jar/war包的布局應(yīng)用,需要的朋友可以參考下2024-02-02Java并發(fā)編程之代碼實現(xiàn)兩玩家交換裝備
這篇文章主要介紹了Java并發(fā)編程之代碼實現(xiàn)兩玩家交換裝備,文中有非常詳細的代碼示例,對正在學(xué)習(xí)java的小伙伴們有一定的幫助,需要的朋友可以參考下2021-09-09Java實現(xiàn)微信公眾號自定義菜單的創(chuàng)建方法示例
這篇文章主要介紹了Java實現(xiàn)微信公眾號自定義菜單的創(chuàng)建方法,結(jié)合實例形式分析了java創(chuàng)建微信公眾號自定義菜單的具體步驟、實現(xiàn)方法及相關(guān)操作注意事項,需要的朋友可以參考下2019-10-10idea運行程序報錯java程序包org.junit不存在解決辦法
這篇文章主要給大家介紹了關(guān)于idea運行程序報錯java程序包org.junit不存在的解決辦法, 當(dāng)出現(xiàn)程序包org.junit不存在的問題時,可以通過使用適當(dāng)?shù)腏Unit版本、添加依賴或重新下載程序包等方式進行解決,需要的朋友可以參考下2024-02-02Java開發(fā)如何把數(shù)據(jù)庫里的未付款訂單改成已付款
這篇文章主要介紹了Java開發(fā)如何把數(shù)據(jù)庫里的未付款訂單改成已付款,先介紹MD5算法,簡單的來說,MD5能把任意大小、長度的數(shù)據(jù)轉(zhuǎn)換成固定長度的一串字符,實現(xiàn)思路非常簡單需要的朋友可以參考下2022-11-11