欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Kotlin協(xié)程之Flow基礎(chǔ)原理示例解析

 更新時(shí)間:2022年09月01日 09:44:38   作者:李蕭蝶  
這篇文章主要為大家介紹了Kotlin協(xié)程之Flow基礎(chǔ)原理示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

引言

本文分析示例代碼如下:

launch(Dispatchers.Main) {
    flow {
        emit(1)
        emit(2)
    }.collect {
        delay(1000)

        withContext(Dispatchers.IO) {
            Log.d("liduo", "$it")
        }

        Log.d("liduo", "$it")
    }
}

一.Flow的創(chuàng)建

在協(xié)程中,可以通過(guò)flow方法創(chuàng)建一個(gè)Flow對(duì)象,一個(gè)Flow對(duì)象代表一個(gè)冷流。其中參數(shù)block是FlowCollector的擴(kuò)展方法,并且可掛起。代碼入下:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

FlowCollector是一個(gè)接口,用于收集上游的流發(fā)出的值,代碼如下:

public interface FlowCollector<in T> {
    // 可掛起,非線程安全
    public suspend fun emit(value: T)
}

調(diào)用flow方法,會(huì)返回一個(gè)Flow接口指向的對(duì)象,代碼如下:

public interface Flow<out T> {
   
    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}

這里flow方法的返回對(duì)象是一個(gè)SafeFlow類型的對(duì)象。至此Flow就創(chuàng)建完畢了。

二.Flow的消費(fèi)

在協(xié)程中,當(dāng)需要消費(fèi)流時(shí),會(huì)調(diào)用collect方法,觸發(fā)流的消費(fèi),代碼如下:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

這里的collect方法不是Flow接口定義的方法,而是Flow的擴(kuò)展方法,內(nèi)部創(chuàng)建了一個(gè)匿名的FlowCollector對(duì)象,并且把a(bǔ)ction封裝到了FlowCollector對(duì)象的emit方法中,最后將FlowCollector對(duì)象作為參數(shù)傳入到了另一個(gè)collect方法,這個(gè)collect方法才是Flow接口定義的方法。

1.SafeFlow類

根據(jù)上面的分析,F(xiàn)low對(duì)象最后返回的是一個(gè)SafeFlow類型的對(duì)象。因此,這里調(diào)用的另一個(gè)collect方法,就是SafeFlow類中的collect方法,代碼如下:

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

SafeFlow類繼承自AbstractFlow類,類中重寫(xiě)了collectSafely方法。因此調(diào)用的collect方法實(shí)際上是AbstractFlow類的方法。

2.AbstractFlow類

AbstractFlow類是一個(gè)抽象類,實(shí)現(xiàn)了Flow接口和CancellableFlow接口。實(shí)際上CancellableFlow接口繼承自Flow接口,因此AbstractFlow類只重寫(xiě)了collect方法,代碼如下:

@FlowPreview
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {

     // 核心方法 
    @InternalCoroutinesApi
    public final override suspend fun collect(collector: FlowCollector<T>) {
        // 創(chuàng)建SafeCollector對(duì)象,對(duì)collector進(jìn)行包裹
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            // 調(diào)用collectSafely方法
            collectSafely(safeCollector)
        } finally {
            // 釋放攔截的續(xù)體
            safeCollector.releaseIntercepted()
        }
    }
    
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

collect方法內(nèi)部調(diào)用了collectSafely方法,collectSafely方法在SafeFlow中被重寫(xiě)。collectSafely方法中會(huì)調(diào)用flow中的block,并提供一個(gè)SafeCollector類的環(huán)境。

3. SafeCollector類

當(dāng)flow方法中的代碼在執(zhí)行時(shí),會(huì)調(diào)用emit方法發(fā)射數(shù)據(jù),這時(shí)由于block執(zhí)行在SafeCollector類的環(huán)境中,因此調(diào)用的emit方法是SafeCollector類的方法。

SafeCollector類實(shí)現(xiàn)了FlowCollector接口并且繼承自ContinuationImpl類,代碼如下:

internal actual class SafeCollector<T> actual constructor(
    @JvmField internal actual val collector: FlowCollector<T>,
    @JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
    
    ...
    // 保存上下文中元素?cái)?shù)量,用于檢查上下文是否變化
    @JvmField
    internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
    // 保存上一次的上下文
    private var lastEmissionContext: CoroutineContext? = null
    // 執(zhí)行結(jié)束后的續(xù)體
    private var completion: Continuation<Unit>? = null

    // 協(xié)程上下文
    override val context: CoroutineContext
        get() = completion?.context ?: EmptyCoroutineContext

    // 掛起的核心方法
    override fun invokeSuspend(result: Result<Any?>): Any? {
        result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
        completion?.resumeWith(result as Result<Unit>)
        return COROUTINE_SUSPENDED
    }

    // 釋放攔截的續(xù)體
    public actual override fun releaseIntercepted() {
        super.releaseIntercepted()
    }

    // 發(fā)射數(shù)據(jù)
    override suspend fun emit(value: T) {
        // 獲取當(dāng)前suspend方法續(xù)體
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
            try {
                // 調(diào)用重載的方法
                emit(uCont, value)
            } catch (e: Throwable) {
                 // 出現(xiàn)異常時(shí),將異常封裝成上下文,保存到lastEmissionContext
                lastEmissionContext = DownstreamExceptionElement(e)
                // 拋出異常
                throw e
            }
        }
    }

    // 重載的emit方法
    private fun emit(uCont: Continuation<Unit>, value: T): Any? {
        // 從續(xù)體中獲取上下文
        val currentContext = uCont.context
        // 保證當(dāng)前協(xié)程的Job是active的
        currentContext.ensureActive()
        // 獲取上次的上下文
        val previousContext = lastEmissionContext
        // 如果前后上下文發(fā)生變化
        if (previousContext !== currentContext) {
            // 檢查上下文是否發(fā)生異常
            checkContext(currentContext, previousContext, value)
        }
        // 保存續(xù)體
        completion = uCont
        // 調(diào)用emitFun方法,傳入collector,value,continuation
        return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    }

    // 檢查上下文變化,防止并發(fā)
    private fun checkContext(
        currentContext: CoroutineContext,
        previousContext: CoroutineContext?,
        value: T
    ) {
        // 如果上次執(zhí)行過(guò)程中發(fā)生了異常
        if (previousContext is DownstreamExceptionElement) {
            // 拋出異常
            exceptionTransparencyViolated(previousContext, value)
        }
        // 檢查上下文是否發(fā)生變化,如果變化,則拋出異常
        checkContext(currentContext)
        lastEmissionContext = currentContext
    }
    
    // 用于拋出異常
    private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) {
        error("""
            Flow exception transparency is violated:
                Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
                Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
                For a more detailed explanation, please refer to Flow documentation.
            """.trimIndent())
    }
}

emit方法最終會(huì)調(diào)用emitFun方法方法,代碼如下:

private val emitFun =
    FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>

emitFun是一個(gè)lambda表達(dá)式,它將只有一個(gè)參數(shù)的emit方法轉(zhuǎn)換成三個(gè)參數(shù)的方法。emitFun方法在編譯時(shí)會(huì)被編譯器處理,反編譯后的代碼邏輯大致如下:

@Nullable
public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) {
   InlineMarker.mark(0);
   // 核心執(zhí)行
   Object var10000 = p1.emit(p2, continuation);
   InlineMarker.mark(2);
   InlineMarker.mark(1);
   return var10000;
}

可以看到,emitFun方法內(nèi)部會(huì)調(diào)用FlowCollector類對(duì)象的emit方法,同時(shí)傳入value和continuation作為參數(shù)。

而這個(gè)FlowCollector類對(duì)象就是一開(kāi)始的collect方法封裝的匿名類對(duì)象,代碼如下:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

調(diào)用它的emit方法,會(huì)直接調(diào)用action的invoke方法,并傳入發(fā)射的數(shù)據(jù),流在這里被最終消費(fèi)。

通過(guò)上面的分析,可以知道消費(fèi)的過(guò)程是在emit方法中被調(diào)用的,如果在消費(fèi)的過(guò)程,沒(méi)有發(fā)生掛起,那么emit方法執(zhí)行完畢后,會(huì)繼續(xù)執(zhí)行flow方法里剩下的代碼,而如果在消費(fèi)的過(guò)程中發(fā)生了掛起,情況會(huì)稍有不同。

4.消費(fèi)過(guò)程中的掛起

如果消費(fèi)過(guò)程中發(fā)生掛起,那么emit方法會(huì)返回一個(gè)COROUTINE_SUSPENDED對(duì)象,suspendCoroutineUninterceptedOrReturn方法在收到COROUTINE_SUSPENDED對(duì)象后,會(huì)掛起當(dāng)前協(xié)程。代碼如下:

override suspend fun emit(value: T) {
    // 獲取當(dāng)前suspend方法續(xù)體
    return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
        try {
            // 調(diào)用重載的方法
            emit(uCont, value)
        } catch (e: Throwable) {
            // 出現(xiàn)異常時(shí),將異常封裝成上下文,保存到lastEmissionContext
            lastEmissionContext = DownstreamExceptionElement(e)
            // 拋出異常
            throw e
        }
    }
}

當(dāng)消費(fèi)過(guò)程執(zhí)行完畢時(shí),會(huì)通過(guò)傳入的續(xù)體喚起外部協(xié)程恢復(fù)掛起狀態(tài)。根據(jù)emitFun可以知道,這里傳入的續(xù)體為this,也就是當(dāng)前的SafeCollector類對(duì)象,代碼如下:

emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)

恢復(fù)掛起需要調(diào)用續(xù)體的resumeWith方法,上面提到SafeCollector類繼承自ContinuationImpl類,SafeCollector類中沒(méi)有重寫(xiě)resumeWith方法,而ContinuationImpl類中也沒(méi)有重寫(xiě)resumeWith方法,因此實(shí)際調(diào)用的是ContinuationImpl類的父類BaseContinuationImpl類的resumeWith方法。如下圖所示:

Kotlin協(xié)程:創(chuàng)建、啟動(dòng)、掛起、恢復(fù)中提到過(guò),調(diào)用BaseContinuationImpl類的resumeWith方法,內(nèi)部會(huì)調(diào)用invokeSuspend方法,而SafeCollector類重寫(xiě)了invokeSuspend方法,代碼如下:

override fun invokeSuspend(result: Result<Any?>): Any? {
    // 嘗試獲取異常
    result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
    // 如果沒(méi)有異常,則恢復(fù)flow方法續(xù)體的執(zhí)行
    completion?.resumeWith(result as Result<Unit>)
    // 返回掛起標(biāo)識(shí),這里掛起的是消費(fèi)過(guò)程
    return COROUTINE_SUSPENDED
}

在invokeSuspend方法中,會(huì)調(diào)用resumeWith方法恢復(fù)生產(chǎn)過(guò)程——flow方法的執(zhí)行,同時(shí)掛起消費(fèi)過(guò)程的執(zhí)行。全部過(guò)程如下圖所示:

以上就是Kotlin協(xié)程之Flow基礎(chǔ)原理示例解析的詳細(xì)內(nèi)容,更多關(guān)于Kotlin協(xié)程Flow原理的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論