Kotlin協(xié)程之Flow基礎(chǔ)原理示例解析
引言
本文分析示例代碼如下:
launch(Dispatchers.Main) { flow { emit(1) emit(2) }.collect { delay(1000) withContext(Dispatchers.IO) { Log.d("liduo", "$it") } Log.d("liduo", "$it") } }
一.Flow的創(chuàng)建
在協(xié)程中,可以通過(guò)flow方法創(chuàng)建一個(gè)Flow對(duì)象,一個(gè)Flow對(duì)象代表一個(gè)冷流。其中參數(shù)block是FlowCollector的擴(kuò)展方法,并且可掛起。代碼入下:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
FlowCollector是一個(gè)接口,用于收集上游的流發(fā)出的值,代碼如下:
public interface FlowCollector<in T> { // 可掛起,非線程安全 public suspend fun emit(value: T) }
調(diào)用flow方法,會(huì)返回一個(gè)Flow接口指向的對(duì)象,代碼如下:
public interface Flow<out T> { @InternalCoroutinesApi public suspend fun collect(collector: FlowCollector<T>) }
這里flow方法的返回對(duì)象是一個(gè)SafeFlow類型的對(duì)象。至此Flow就創(chuàng)建完畢了。
二.Flow的消費(fèi)
在協(xié)程中,當(dāng)需要消費(fèi)流時(shí),會(huì)調(diào)用collect方法,觸發(fā)流的消費(fèi),代碼如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector<T> { override suspend fun emit(value: T) = action(value) })
這里的collect方法不是Flow接口定義的方法,而是Flow的擴(kuò)展方法,內(nèi)部創(chuàng)建了一個(gè)匿名的FlowCollector對(duì)象,并且把a(bǔ)ction封裝到了FlowCollector對(duì)象的emit方法中,最后將FlowCollector對(duì)象作為參數(shù)傳入到了另一個(gè)collect方法,這個(gè)collect方法才是Flow接口定義的方法。
1.SafeFlow類
根據(jù)上面的分析,F(xiàn)low對(duì)象最后返回的是一個(gè)SafeFlow類型的對(duì)象。因此,這里調(diào)用的另一個(gè)collect方法,就是SafeFlow類中的collect方法,代碼如下:
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() { override suspend fun collectSafely(collector: FlowCollector<T>) { collector.block() } }
SafeFlow類繼承自AbstractFlow類,類中重寫(xiě)了collectSafely方法。因此調(diào)用的collect方法實(shí)際上是AbstractFlow類的方法。
2.AbstractFlow類
AbstractFlow類是一個(gè)抽象類,實(shí)現(xiàn)了Flow接口和CancellableFlow接口。實(shí)際上CancellableFlow接口繼承自Flow接口,因此AbstractFlow類只重寫(xiě)了collect方法,代碼如下:
@FlowPreview public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> { // 核心方法 @InternalCoroutinesApi public final override suspend fun collect(collector: FlowCollector<T>) { // 創(chuàng)建SafeCollector對(duì)象,對(duì)collector進(jìn)行包裹 val safeCollector = SafeCollector(collector, coroutineContext) try { // 調(diào)用collectSafely方法 collectSafely(safeCollector) } finally { // 釋放攔截的續(xù)體 safeCollector.releaseIntercepted() } } public abstract suspend fun collectSafely(collector: FlowCollector<T>) }
collect方法內(nèi)部調(diào)用了collectSafely方法,collectSafely方法在SafeFlow中被重寫(xiě)。collectSafely方法中會(huì)調(diào)用flow中的block,并提供一個(gè)SafeCollector類的環(huán)境。
3. SafeCollector類
當(dāng)flow方法中的代碼在執(zhí)行時(shí),會(huì)調(diào)用emit方法發(fā)射數(shù)據(jù),這時(shí)由于block執(zhí)行在SafeCollector類的環(huán)境中,因此調(diào)用的emit方法是SafeCollector類的方法。
SafeCollector類實(shí)現(xiàn)了FlowCollector接口并且繼承自ContinuationImpl類,代碼如下:
internal actual class SafeCollector<T> actual constructor( @JvmField internal actual val collector: FlowCollector<T>, @JvmField internal actual val collectContext: CoroutineContext ) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame { ... // 保存上下文中元素?cái)?shù)量,用于檢查上下文是否變化 @JvmField internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 } // 保存上一次的上下文 private var lastEmissionContext: CoroutineContext? = null // 執(zhí)行結(jié)束后的續(xù)體 private var completion: Continuation<Unit>? = null // 協(xié)程上下文 override val context: CoroutineContext get() = completion?.context ?: EmptyCoroutineContext // 掛起的核心方法 override fun invokeSuspend(result: Result<Any?>): Any? { result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) } completion?.resumeWith(result as Result<Unit>) return COROUTINE_SUSPENDED } // 釋放攔截的續(xù)體 public actual override fun releaseIntercepted() { super.releaseIntercepted() } // 發(fā)射數(shù)據(jù) override suspend fun emit(value: T) { // 獲取當(dāng)前suspend方法續(xù)體 return suspendCoroutineUninterceptedOrReturn sc@{ uCont -> try { // 調(diào)用重載的方法 emit(uCont, value) } catch (e: Throwable) { // 出現(xiàn)異常時(shí),將異常封裝成上下文,保存到lastEmissionContext lastEmissionContext = DownstreamExceptionElement(e) // 拋出異常 throw e } } } // 重載的emit方法 private fun emit(uCont: Continuation<Unit>, value: T): Any? { // 從續(xù)體中獲取上下文 val currentContext = uCont.context // 保證當(dāng)前協(xié)程的Job是active的 currentContext.ensureActive() // 獲取上次的上下文 val previousContext = lastEmissionContext // 如果前后上下文發(fā)生變化 if (previousContext !== currentContext) { // 檢查上下文是否發(fā)生異常 checkContext(currentContext, previousContext, value) } // 保存續(xù)體 completion = uCont // 調(diào)用emitFun方法,傳入collector,value,continuation return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>) } // 檢查上下文變化,防止并發(fā) private fun checkContext( currentContext: CoroutineContext, previousContext: CoroutineContext?, value: T ) { // 如果上次執(zhí)行過(guò)程中發(fā)生了異常 if (previousContext is DownstreamExceptionElement) { // 拋出異常 exceptionTransparencyViolated(previousContext, value) } // 檢查上下文是否發(fā)生變化,如果變化,則拋出異常 checkContext(currentContext) lastEmissionContext = currentContext } // 用于拋出異常 private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) { error(""" Flow exception transparency is violated: Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected. Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead. For a more detailed explanation, please refer to Flow documentation. """.trimIndent()) } }
emit方法最終會(huì)調(diào)用emitFun方法方法,代碼如下:
private val emitFun = FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
emitFun是一個(gè)lambda表達(dá)式,它將只有一個(gè)參數(shù)的emit方法轉(zhuǎn)換成三個(gè)參數(shù)的方法。emitFun方法在編譯時(shí)會(huì)被編譯器處理,反編譯后的代碼邏輯大致如下:
@Nullable public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) { InlineMarker.mark(0); // 核心執(zhí)行 Object var10000 = p1.emit(p2, continuation); InlineMarker.mark(2); InlineMarker.mark(1); return var10000; }
可以看到,emitFun方法內(nèi)部會(huì)調(diào)用FlowCollector類對(duì)象的emit方法,同時(shí)傳入value和continuation作為參數(shù)。
而這個(gè)FlowCollector類對(duì)象就是一開(kāi)始的collect方法封裝的匿名類對(duì)象,代碼如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector<T> { override suspend fun emit(value: T) = action(value) })
調(diào)用它的emit方法,會(huì)直接調(diào)用action的invoke方法,并傳入發(fā)射的數(shù)據(jù),流在這里被最終消費(fèi)。
通過(guò)上面的分析,可以知道消費(fèi)的過(guò)程是在emit方法中被調(diào)用的,如果在消費(fèi)的過(guò)程,沒(méi)有發(fā)生掛起,那么emit方法執(zhí)行完畢后,會(huì)繼續(xù)執(zhí)行flow方法里剩下的代碼,而如果在消費(fèi)的過(guò)程中發(fā)生了掛起,情況會(huì)稍有不同。
4.消費(fèi)過(guò)程中的掛起
如果消費(fèi)過(guò)程中發(fā)生掛起,那么emit方法會(huì)返回一個(gè)COROUTINE_SUSPENDED對(duì)象,suspendCoroutineUninterceptedOrReturn方法在收到COROUTINE_SUSPENDED對(duì)象后,會(huì)掛起當(dāng)前協(xié)程。代碼如下:
override suspend fun emit(value: T) { // 獲取當(dāng)前suspend方法續(xù)體 return suspendCoroutineUninterceptedOrReturn sc@{ uCont -> try { // 調(diào)用重載的方法 emit(uCont, value) } catch (e: Throwable) { // 出現(xiàn)異常時(shí),將異常封裝成上下文,保存到lastEmissionContext lastEmissionContext = DownstreamExceptionElement(e) // 拋出異常 throw e } } }
當(dāng)消費(fèi)過(guò)程執(zhí)行完畢時(shí),會(huì)通過(guò)傳入的續(xù)體喚起外部協(xié)程恢復(fù)掛起狀態(tài)。根據(jù)emitFun可以知道,這里傳入的續(xù)體為this,也就是當(dāng)前的SafeCollector類對(duì)象,代碼如下:
emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
恢復(fù)掛起需要調(diào)用續(xù)體的resumeWith方法,上面提到SafeCollector類繼承自ContinuationImpl類,SafeCollector類中沒(méi)有重寫(xiě)resumeWith方法,而ContinuationImpl類中也沒(méi)有重寫(xiě)resumeWith方法,因此實(shí)際調(diào)用的是ContinuationImpl類的父類BaseContinuationImpl類的resumeWith方法。如下圖所示:
在Kotlin協(xié)程:創(chuàng)建、啟動(dòng)、掛起、恢復(fù)中提到過(guò),調(diào)用BaseContinuationImpl類的resumeWith方法,內(nèi)部會(huì)調(diào)用invokeSuspend方法,而SafeCollector類重寫(xiě)了invokeSuspend方法,代碼如下:
override fun invokeSuspend(result: Result<Any?>): Any? { // 嘗試獲取異常 result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) } // 如果沒(méi)有異常,則恢復(fù)flow方法續(xù)體的執(zhí)行 completion?.resumeWith(result as Result<Unit>) // 返回掛起標(biāo)識(shí),這里掛起的是消費(fèi)過(guò)程 return COROUTINE_SUSPENDED }
在invokeSuspend方法中,會(huì)調(diào)用resumeWith方法恢復(fù)生產(chǎn)過(guò)程——flow方法的執(zhí)行,同時(shí)掛起消費(fèi)過(guò)程的執(zhí)行。全部過(guò)程如下圖所示:
以上就是Kotlin協(xié)程之Flow基礎(chǔ)原理示例解析的詳細(xì)內(nèi)容,更多關(guān)于Kotlin協(xié)程Flow原理的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Kotlin Flow常見(jiàn)場(chǎng)景下的使用實(shí)例
- Kotlin Flow封裝類SharedFlow StateFlow LiveData使用對(duì)比
- Kotlin Flow常用封裝類StateFlow使用詳解
- Kotlin Flow操作符及基本使用詳解
- Kotlin + Flow 實(shí)現(xiàn)Android 應(yīng)用初始化任務(wù)啟動(dòng)庫(kù)
- Android使用Kotlin和RxJava 2.×實(shí)現(xiàn)短信驗(yàn)證碼倒計(jì)時(shí)效果
- Kotlin使用flow實(shí)現(xiàn)倒計(jì)時(shí)功能(示例詳解)
相關(guān)文章
Android控制閃光燈的方法(打開(kāi)與關(guān)閉)
這篇文章主要介紹了Android控制閃光燈的方法,可實(shí)現(xiàn)閃光燈打開(kāi)與關(guān)閉的效果,涉及Android操作Camera拍照閃光燈的相關(guān)技巧,需要的朋友可以參考下2016-01-01Android學(xué)習(xí)項(xiàng)目之簡(jiǎn)易版微信為例(二)
這篇文章主要以簡(jiǎn)易版微信為例,實(shí)現(xiàn)簡(jiǎn)易版微信的登陸、注冊(cè)界面的編寫(xiě)與簡(jiǎn)單交互,感興趣的小伙伴們可以參考一下2016-06-06Android JNI 調(diào)用時(shí)緩存字段和方法ID示例
這篇文章主要介紹了Android JNI 調(diào)用時(shí)緩存字段和方法ID示例,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-07-07Android多渠道打包時(shí)獲取當(dāng)前渠道的方法
這篇文章主要介紹了Android多渠道打包時(shí)獲取當(dāng)前渠道的方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-01-01Android Studio下載更新Android SDK網(wǎng)絡(luò)異常或無(wú)法下載
這篇文章主要介紹了Android Studio下載更新Android SDK網(wǎng)絡(luò)異?;驘o(wú)法下載的相關(guān)資料,需要的朋友可以參考下2017-04-04老生常談Listview中onItemClick中的各個(gè)參數(shù)(推薦)
下面小編就為大家?guī)?lái)一篇老生常談Listview中onItemClick中的各個(gè)參數(shù)(推薦)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-04-04Android 定位系統(tǒng)(GPS)開(kāi)發(fā)詳解
GPS定位是智能手機(jī)上一個(gè)比較有意思的功能,LBS等服務(wù)都有效的利用了GPS定位功能,本文就跟大家分享下Android開(kāi)發(fā)中的GPS定位知識(shí)2016-07-07Android自定義View繪制的方法及過(guò)程(二)
這篇文章主要解析了Android自定義View繪制的方法及過(guò)程,介紹了onSizeChanged、onDraw、onMeasure順序,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-03-03OpenHarmony如何調(diào)用電話服務(wù)API撥打電話
OpenHarmony3.1版本標(biāo)準(zhǔn)系統(tǒng)增加了通話相關(guān)的聯(lián)系人應(yīng)用,來(lái)電應(yīng)用等,在系統(tǒng)服務(wù)層面電話相關(guān)功能也比較完善,這篇文章主要介紹了OpenHarmony如何調(diào)用電話服務(wù)API撥打電話2022-11-11Android編程實(shí)現(xiàn)抽屜效果的方法詳解
這篇文章主要介紹了Android編程實(shí)現(xiàn)抽屜效果的方法,結(jié)合具體實(shí)例形式分析了Android實(shí)現(xiàn)抽屜效果的具體步驟與相關(guān)操作技巧,需要的朋友可以參考下2017-05-05