Kotlin協(xié)程之Flow基礎原理示例解析
引言
本文分析示例代碼如下:
launch(Dispatchers.Main) { flow { emit(1) emit(2) }.collect { delay(1000) withContext(Dispatchers.IO) { Log.d("liduo", "$it") } Log.d("liduo", "$it") } }
一.Flow的創(chuàng)建
在協(xié)程中,可以通過flow方法創(chuàng)建一個Flow對象,一個Flow對象代表一個冷流。其中參數(shù)block是FlowCollector的擴展方法,并且可掛起。代碼入下:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
FlowCollector是一個接口,用于收集上游的流發(fā)出的值,代碼如下:
public interface FlowCollector<in T> { // 可掛起,非線程安全 public suspend fun emit(value: T) }
調用flow方法,會返回一個Flow接口指向的對象,代碼如下:
public interface Flow<out T> { @InternalCoroutinesApi public suspend fun collect(collector: FlowCollector<T>) }
這里flow方法的返回對象是一個SafeFlow類型的對象。至此Flow就創(chuàng)建完畢了。
二.Flow的消費
在協(xié)程中,當需要消費流時,會調用collect方法,觸發(fā)流的消費,代碼如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector<T> { override suspend fun emit(value: T) = action(value) })
這里的collect方法不是Flow接口定義的方法,而是Flow的擴展方法,內部創(chuàng)建了一個匿名的FlowCollector對象,并且把action封裝到了FlowCollector對象的emit方法中,最后將FlowCollector對象作為參數(shù)傳入到了另一個collect方法,這個collect方法才是Flow接口定義的方法。
1.SafeFlow類
根據(jù)上面的分析,F(xiàn)low對象最后返回的是一個SafeFlow類型的對象。因此,這里調用的另一個collect方法,就是SafeFlow類中的collect方法,代碼如下:
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() { override suspend fun collectSafely(collector: FlowCollector<T>) { collector.block() } }
SafeFlow類繼承自AbstractFlow類,類中重寫了collectSafely方法。因此調用的collect方法實際上是AbstractFlow類的方法。
2.AbstractFlow類
AbstractFlow類是一個抽象類,實現(xiàn)了Flow接口和CancellableFlow接口。實際上CancellableFlow接口繼承自Flow接口,因此AbstractFlow類只重寫了collect方法,代碼如下:
@FlowPreview public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> { // 核心方法 @InternalCoroutinesApi public final override suspend fun collect(collector: FlowCollector<T>) { // 創(chuàng)建SafeCollector對象,對collector進行包裹 val safeCollector = SafeCollector(collector, coroutineContext) try { // 調用collectSafely方法 collectSafely(safeCollector) } finally { // 釋放攔截的續(xù)體 safeCollector.releaseIntercepted() } } public abstract suspend fun collectSafely(collector: FlowCollector<T>) }
collect方法內部調用了collectSafely方法,collectSafely方法在SafeFlow中被重寫。collectSafely方法中會調用flow中的block,并提供一個SafeCollector類的環(huán)境。
3. SafeCollector類
當flow方法中的代碼在執(zhí)行時,會調用emit方法發(fā)射數(shù)據(jù),這時由于block執(zhí)行在SafeCollector類的環(huán)境中,因此調用的emit方法是SafeCollector類的方法。
SafeCollector類實現(xiàn)了FlowCollector接口并且繼承自ContinuationImpl類,代碼如下:
internal actual class SafeCollector<T> actual constructor( @JvmField internal actual val collector: FlowCollector<T>, @JvmField internal actual val collectContext: CoroutineContext ) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame { ... // 保存上下文中元素數(shù)量,用于檢查上下文是否變化 @JvmField internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 } // 保存上一次的上下文 private var lastEmissionContext: CoroutineContext? = null // 執(zhí)行結束后的續(xù)體 private var completion: Continuation<Unit>? = null // 協(xié)程上下文 override val context: CoroutineContext get() = completion?.context ?: EmptyCoroutineContext // 掛起的核心方法 override fun invokeSuspend(result: Result<Any?>): Any? { result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) } completion?.resumeWith(result as Result<Unit>) return COROUTINE_SUSPENDED } // 釋放攔截的續(xù)體 public actual override fun releaseIntercepted() { super.releaseIntercepted() } // 發(fā)射數(shù)據(jù) override suspend fun emit(value: T) { // 獲取當前suspend方法續(xù)體 return suspendCoroutineUninterceptedOrReturn sc@{ uCont -> try { // 調用重載的方法 emit(uCont, value) } catch (e: Throwable) { // 出現(xiàn)異常時,將異常封裝成上下文,保存到lastEmissionContext lastEmissionContext = DownstreamExceptionElement(e) // 拋出異常 throw e } } } // 重載的emit方法 private fun emit(uCont: Continuation<Unit>, value: T): Any? { // 從續(xù)體中獲取上下文 val currentContext = uCont.context // 保證當前協(xié)程的Job是active的 currentContext.ensureActive() // 獲取上次的上下文 val previousContext = lastEmissionContext // 如果前后上下文發(fā)生變化 if (previousContext !== currentContext) { // 檢查上下文是否發(fā)生異常 checkContext(currentContext, previousContext, value) } // 保存續(xù)體 completion = uCont // 調用emitFun方法,傳入collector,value,continuation return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>) } // 檢查上下文變化,防止并發(fā) private fun checkContext( currentContext: CoroutineContext, previousContext: CoroutineContext?, value: T ) { // 如果上次執(zhí)行過程中發(fā)生了異常 if (previousContext is DownstreamExceptionElement) { // 拋出異常 exceptionTransparencyViolated(previousContext, value) } // 檢查上下文是否發(fā)生變化,如果變化,則拋出異常 checkContext(currentContext) lastEmissionContext = currentContext } // 用于拋出異常 private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) { error(""" Flow exception transparency is violated: Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected. Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead. For a more detailed explanation, please refer to Flow documentation. """.trimIndent()) } }
emit方法最終會調用emitFun方法方法,代碼如下:
private val emitFun = FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
emitFun是一個lambda表達式,它將只有一個參數(shù)的emit方法轉換成三個參數(shù)的方法。emitFun方法在編譯時會被編譯器處理,反編譯后的代碼邏輯大致如下:
@Nullable public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) { InlineMarker.mark(0); // 核心執(zhí)行 Object var10000 = p1.emit(p2, continuation); InlineMarker.mark(2); InlineMarker.mark(1); return var10000; }
可以看到,emitFun方法內部會調用FlowCollector類對象的emit方法,同時傳入value和continuation作為參數(shù)。
而這個FlowCollector類對象就是一開始的collect方法封裝的匿名類對象,代碼如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector<T> { override suspend fun emit(value: T) = action(value) })
調用它的emit方法,會直接調用action的invoke方法,并傳入發(fā)射的數(shù)據(jù),流在這里被最終消費。
通過上面的分析,可以知道消費的過程是在emit方法中被調用的,如果在消費的過程,沒有發(fā)生掛起,那么emit方法執(zhí)行完畢后,會繼續(xù)執(zhí)行flow方法里剩下的代碼,而如果在消費的過程中發(fā)生了掛起,情況會稍有不同。
4.消費過程中的掛起
如果消費過程中發(fā)生掛起,那么emit方法會返回一個COROUTINE_SUSPENDED對象,suspendCoroutineUninterceptedOrReturn方法在收到COROUTINE_SUSPENDED對象后,會掛起當前協(xié)程。代碼如下:
override suspend fun emit(value: T) { // 獲取當前suspend方法續(xù)體 return suspendCoroutineUninterceptedOrReturn sc@{ uCont -> try { // 調用重載的方法 emit(uCont, value) } catch (e: Throwable) { // 出現(xiàn)異常時,將異常封裝成上下文,保存到lastEmissionContext lastEmissionContext = DownstreamExceptionElement(e) // 拋出異常 throw e } } }
當消費過程執(zhí)行完畢時,會通過傳入的續(xù)體喚起外部協(xié)程恢復掛起狀態(tài)。根據(jù)emitFun可以知道,這里傳入的續(xù)體為this,也就是當前的SafeCollector類對象,代碼如下:
emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
恢復掛起需要調用續(xù)體的resumeWith方法,上面提到SafeCollector類繼承自ContinuationImpl類,SafeCollector類中沒有重寫resumeWith方法,而ContinuationImpl類中也沒有重寫resumeWith方法,因此實際調用的是ContinuationImpl類的父類BaseContinuationImpl類的resumeWith方法。如下圖所示:
在Kotlin協(xié)程:創(chuàng)建、啟動、掛起、恢復中提到過,調用BaseContinuationImpl類的resumeWith方法,內部會調用invokeSuspend方法,而SafeCollector類重寫了invokeSuspend方法,代碼如下:
override fun invokeSuspend(result: Result<Any?>): Any? { // 嘗試獲取異常 result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) } // 如果沒有異常,則恢復flow方法續(xù)體的執(zhí)行 completion?.resumeWith(result as Result<Unit>) // 返回掛起標識,這里掛起的是消費過程 return COROUTINE_SUSPENDED }
在invokeSuspend方法中,會調用resumeWith方法恢復生產(chǎn)過程——flow方法的執(zhí)行,同時掛起消費過程的執(zhí)行。全部過程如下圖所示:
以上就是Kotlin協(xié)程之Flow基礎原理示例解析的詳細內容,更多關于Kotlin協(xié)程Flow原理的資料請關注腳本之家其它相關文章!
相關文章
Android Studio下載更新Android SDK網(wǎng)絡異?;驘o法下載
這篇文章主要介紹了Android Studio下載更新Android SDK網(wǎng)絡異常或無法下載的相關資料,需要的朋友可以參考下2017-04-04老生常談Listview中onItemClick中的各個參數(shù)(推薦)
下面小編就為大家?guī)硪黄仙U凩istview中onItemClick中的各個參數(shù)(推薦)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-04-04Android 定位系統(tǒng)(GPS)開發(fā)詳解
GPS定位是智能手機上一個比較有意思的功能,LBS等服務都有效的利用了GPS定位功能,本文就跟大家分享下Android開發(fā)中的GPS定位知識2016-07-07