欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Kotlin協(xié)程flowOn與線程切換超詳細(xì)示例介紹

 更新時(shí)間:2022年09月05日 14:52:19   作者:LeeDuo.  
這篇文章主要介紹了Kotlin協(xié)程flowOn與線程切換,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

示例代碼

本文分析示例代碼如下:

launch(Dispatchers.Main) {
  flow {
    emit(1)
    emit(2)
  }.flowOn(Dispatchers.IO).collect {
    delay(1000)
    withContext(Dispatchers.IO) {
      Log.d("liduo", "$it")
    }
    Log.d("liduo", "$it")
  }
}

一.flowOn方法

flowOn方法用于將上游的流切換到指定協(xié)程上下文的調(diào)度器中執(zhí)行,同時(shí)不會把協(xié)程上下文暴露給下游的流,即flowOn方法中協(xié)程上下文的調(diào)度器不會對下游的流生效。如下面這段代碼所示:

launch(Dispatchers.Main) {
  flow {
    emit(2) // 執(zhí)行在IO線程池
  }.flowOn(Dispatchers.IO).map {
    it + 1 // 執(zhí)行在Default線程池
  }.flowOn(Dispatchers.Default).collect {
    Log.d("liduo", "$it") //執(zhí)行在主線程
  }
}

接下來,分析一下flowOn方法,代碼如下:

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
    // 檢查當(dāng)前協(xié)程沒有執(zhí)行結(jié)束
    checkFlowContext(context)
    return when {
        // 為空,則返回自身
        context == EmptyCoroutineContext -> this
        // 如果是可融合的Flow,則嘗試融合操作,獲取新的流
        this is FusibleFlow -> fuse(context = context)
        // 其他情況,包裝成可融合的Flow
        else -> ChannelFlowOperatorImpl(this, context = context)
    }
}
// 確保Job不為空
private fun checkFlowContext(context: CoroutineContext) {
    require(context[Job] == null) {
        "Flow context cannot contain job in it. Had $context"
    }
}

在flowOn方法中,首先會檢查方法所在的協(xié)程是否執(zhí)行結(jié)束。如果沒有結(jié)束,則會執(zhí)行判斷語句,這里flowOn方法傳入的上下文不是空上下文,且通過flow方法構(gòu)建出的Flow對象也不是FusibleFlow類型的對象,因此這里會走到else分支,將上游flow方法創(chuàng)建的Flow對象和上下文包裝成ChannelFlowOperatorImpl類型的對象。

1.ChannelFlowOperatorImpl類

ChannelFlowOperatorImpl類繼承自ChannelFlowOperator類,用于將上游的流包裝成一個(gè)ChannelFlow對象,它的繼承關(guān)系如下圖所示:

通過上圖可以知道,ChannelFlowOperatorImpl類最終繼承了ChannelFlow類,代碼如下:

internal class ChannelFlowOperatorImpl<T>(
    flow: Flow<T>,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.OPTIONAL_CHANNEL,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) {
    // 用于流融合時(shí)創(chuàng)建新的流
    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
        ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow)
    // 若當(dāng)前的流不需要通過Channel即可實(shí)現(xiàn)正常工作時(shí),會調(diào)用此方法
    override fun dropChannelOperators(): Flow<T>? = flow
    // 觸發(fā)對下一級流進(jìn)行收集
    override suspend fun flowCollect(collector: FlowCollector<T>) =
        flow.collect(collector)
}

二.collect方法

在Kotlin協(xié)程:Flow基礎(chǔ)原理中講到,當(dāng)執(zhí)行collect方法時(shí),內(nèi)部會調(diào)用最后產(chǎn)生的Flow對象的collect方法,代碼如下:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector<T> {<!--{C}%3C!%2D%2D%20%2D%2D%3E--> override suspend fun emit(value: T) = action(value) })public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

這個(gè)最后產(chǎn)生的Flow對象就是ChannelFlowOperatorImpl類對象。

1.ChannelFlowOperator類的collect方法

ChannelFlowOperatorImpl類沒有重寫collect方法,因此調(diào)用的是它的父類ChannelFlowOperator類的collect方法,代碼如下:

override suspend fun collect(collector: FlowCollector<T>) {
    // OPTIONAL_CHANNEL為默認(rèn)值,這里滿足條件,之后會詳細(xì)講解
    if (capacity == Channel.OPTIONAL_CHANNEL) {
        // 獲取當(dāng)前協(xié)程的上下文
        val collectContext = coroutineContext
        // 計(jì)算新的上下文
        val newContext = collectContext + context
        // 如果前后上下文沒有發(fā)生變化
        if (newContext == collectContext)
            // 直接觸發(fā)對下一級流的收集
            return flowCollect(collector)
        // 如果上下文發(fā)生變化,但不需要切換線程
        if (newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor])
            // 切換協(xié)程上下文,調(diào)用flowCollect方法觸發(fā)下一級流的收集
            return collectWithContextUndispatched(collector, newContext)
    }
    // 調(diào)用父類的collect方法
    super.collect(collector)
}
// 獲取當(dāng)前協(xié)程的上下文,該方法會被編譯器處理
@SinceKotlin("1.3")
@Suppress("WRONG_MODIFIER_TARGET")
@InlineOnly
public suspend inline val coroutineContext: CoroutineContext
    get() {
        throw NotImplementedError("Implemented as intrinsic")
    }

ChannelFlowOperator類的collect方法在設(shè)計(jì)上與協(xié)程的withContext方法設(shè)計(jì)思路是一致的:在方法內(nèi)根據(jù)上下文的不同情況進(jìn)行判斷,在必要時(shí)才會切換線程去執(zhí)行任務(wù)。

通過flowOn方法創(chuàng)建的ChannelFlowOperatorImpl類對象,參數(shù)capacity為默認(rèn)值OPTIONAL_CHANNEL。因此代碼在執(zhí)行時(shí)會進(jìn)入到判斷中,但因?yàn)槲覀冎付松舷挛臑镈ispatchers.IO,因此上下文發(fā)生了變化,同時(shí)攔截器也發(fā)生了變化,所以最后會調(diào)用ChannelFlowOperator類的父類的collect方法,也就是ChannelFlow類的collect方法。

2.ChannelFlow類的collect方法

ChannelFlow類的代碼如下:

override suspend fun collect(collector: FlowCollector<T>): Unit =
    coroutineScope {
        collector.emitAll(produceImpl(this))
    }

在ChannelFlow類的collect方法中,首先通過coroutineScope方法創(chuàng)建了一個(gè)作用域協(xié)程,接著調(diào)用了produceImpl方法,代碼如下:

public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
    scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)

produceImpl方法內(nèi)部調(diào)用了produce方法,并且傳入了待執(zhí)行的任務(wù)collectToFun。

produce方法在Kotlin協(xié)程:協(xié)程的基礎(chǔ)與使用中曾提到過,它是官方提供的啟動協(xié)程的四個(gè)方法之一,另外三個(gè)方法為launch方法、async方法、actor方法。代碼如下:

internal fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
    // 根據(jù)容量與溢出策略創(chuàng)建Channel對象
    val channel = Channel<E>(capacity, onBufferOverflow)
    // 計(jì)算新的上下文
    val newContext = newCoroutineContext(context)
    // 創(chuàng)建協(xié)程
    val coroutine = ProducerCoroutine(newContext, channel)
    // 監(jiān)聽完成事件
    if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
    // 啟動協(xié)程
    coroutine.start(start, coroutine, block)
    return coroutine
}

在produce方法內(nèi)部,首先創(chuàng)建了一個(gè)Channel類型的對象,接著創(chuàng)建了類型為ProducerCoroutine的協(xié)程,并且傳入Channel對象作為參數(shù)。最后,produce方法返回了一個(gè)ReceiveChannel接口指向的對象,當(dāng)協(xié)程執(zhí)行完畢后,會通過Channel對象將結(jié)果通過send方法發(fā)送出來。

至此,可以知道flowOn方法的實(shí)現(xiàn)實(shí)際上是利用了協(xié)程攔截器的攔截功能。

在這里之后,代碼邏輯分成了兩部分,一部分是block在ProducerCoroutine協(xié)程中的執(zhí)行,另一部分是通過ReceiveChannel對象獲取執(zhí)行的結(jié)果。

3.flow方法中代碼的執(zhí)行

在produceImpl方法中,調(diào)用了produce方法,并且傳入了collectToFun對象,這個(gè)對象將會在produce方法創(chuàng)建的協(xié)程中執(zhí)行,代碼如下:

internal val collectToFun: suspend (ProducerScope<T>) -> Unit
    get() = { collectTo(it) }

當(dāng)調(diào)用collectToFun對象的invoke方法時(shí),會觸發(fā)collectTo方法的執(zhí)行,該方法在ChannelFlowOperator類中被重寫,代碼如下:

protected override suspend fun collectTo(scope: ProducerScope<T>) =
    flowCollect(SendingCollector(scope))

在collectTo方法中,首先將參數(shù)scope封裝成SendingCollector類型的對象,接著調(diào)用了flowCollect方法,該方法在ChannelFlowOperatorImpl類中被重寫,代碼如下:

override suspend fun flowCollect(collector: FlowCollector<T>) =
    flow.collect(collector)

ChannelFlowOperatorImpl類的flowCollect方法內(nèi)部調(diào)用了flow對象的collect方法,這個(gè)flow對象就是最初通過flow方法構(gòu)建的對象。根據(jù)Kotlin協(xié)程:Flow基礎(chǔ)原理的分析,這個(gè)flow對象類型為SafeFlow,最后會通過collectSafely方法,觸發(fā)flow方法中的block執(zhí)行。代碼如下:

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        // 觸發(fā)執(zhí)行
        collector.block()
    }
}

當(dāng)flow方法在執(zhí)行過程中需要向下游發(fā)出值時(shí),會調(diào)用emit方法。根據(jù)上面flowCollect方法和collectTo方法可以知道,collectSafely方法的collector對象就是collectTo方法中創(chuàng)建的SendingCollector類型的對象,代碼如下:

@InternalCoroutinesApi
public class SendingCollector<T>(
    private val channel: SendChannel<T>
) : FlowCollector<T> {
    // 通過Channel類對象發(fā)送值
    override suspend fun emit(value: T): Unit = channel.send(value)
}

當(dāng)調(diào)用SendingCollector類型的對象的emit方法時(shí),會通過調(diào)用類型為Channel的對象的send方法,將值發(fā)送出去。

接下來,將分析下游如何接收上游發(fā)出的值。

4.接收flow方法發(fā)出的值

回到ChannelFlow類的collect方法,之前提到collect方法中調(diào)用produceImpl方法,開啟了一個(gè)新的協(xié)程去執(zhí)行任務(wù),并且返回了一個(gè)ReceiveChannel接口指向的對象。代碼如下:

override suspend fun collect(collector: FlowCollector<T>): Unit =
    coroutineScope {
        collector.emitAll(produceImpl(this))
    }

在調(diào)用完produceImpl方法后,接著調(diào)用了emitAll方法,將ReceiveChannel接口指向的對象作為emitAll方法的參數(shù),代碼如下:

public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Unit =
    emitAllImpl(channel, consume = true)

emitAll方法是FlowCollector接口的擴(kuò)展方法,內(nèi)部調(diào)用了emitAllImpl方法對參數(shù)channel進(jìn)行封裝,代碼如下:

private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
    // 用于保存異常
    var cause: Throwable? = null
    try {
        // 死循環(huán)
        while (true) {
            // 掛起,等待接收Channel結(jié)果或Channel關(guān)閉
            val result = run { channel.receiveOrClosed() }
            // 如果Channel關(guān)閉了
            if (result.isClosed) {
                // 如果有異常,則拋出
                result.closeCause?.let { throw it }
                // 沒有異常,則跳出循環(huán)
                break
            }
            // 獲取并發(fā)送值
            emit(result.value)
        }
    } catch (e: Throwable) {
        // 捕獲到異常時(shí)拋出
        cause = e
        throw e
    } finally {
         // 執(zhí)行結(jié)束關(guān)閉Channel
        if (consume) channel.cancelConsumed(cause)
    }
}

emitAllImpl方法是FlowCollector接口的擴(kuò)展方法,而這里的FlowCollector接口指向的對象,就是collect方法中創(chuàng)建的匿名對象,代碼如下:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

在emitAllImpl方法中,當(dāng)通過receiveOrClosed方法獲取到上游發(fā)出的值時(shí),會調(diào)用emit方法通知下游,這時(shí)就會觸發(fā)collect方法中block的執(zhí)行,最終實(shí)現(xiàn)值從流的上游傳遞到了下游。

三.flowOn方法與流的融合

假設(shè)對一個(gè)流連續(xù)調(diào)用兩次flowOn方法,那么流最終會在哪個(gè)flowOn方法指定的調(diào)度器中執(zhí)行呢?代碼如下:

launch(Dispatchers.Main) {
    flow {
        emit(2)
      // emit方法是在IO線程執(zhí)行還是在主線程執(zhí)行呢?  
    }.flowOn(Dispatchers.IO).flowOn(Dispatchers.Main).collect {
        Log.d("liduo", "$it")
    }
}

答案是在IO線程執(zhí)行,為什么呢?

根據(jù)本篇上面的分析,當(dāng)?shù)谝淮握{(diào)用flowOn方法時(shí),上游的流會被包裹成ChannelFlowOperatorImpl對象,代碼如下:

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
    // 檢查當(dāng)前協(xié)程沒有執(zhí)行結(jié)束
    checkFlowContext(context)
    return when {
        // 為空,則返回自身
        context == EmptyCoroutineContext -> this
        // 如果是可融合的Flow,則嘗試融合操作,獲取新的流
        this is FusibleFlow -> fuse(context = context)
        // 其他情況,包裝成可融合的Flow
        else -> ChannelFlowOperatorImpl(this, context = context)
    }
}

而當(dāng)?shù)诙握{(diào)用flowOn方法時(shí),由于此時(shí)上游的流——ChannelFlowOperatorImpl類型的對象,實(shí)現(xiàn)了FusibleFlow接口,因此,這里會觸發(fā)流的融合,直接調(diào)用上游的流的fuse方法,并傳入新的上下文。這里容量和溢出策略均為默認(rèn)值。

根據(jù)Kotlin協(xié)程:Flow的融合、Channel容量、溢出策略的分析,這里會調(diào)用ChannelFlow類的fuse方法。相關(guān)代碼如下:

public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {
    ...
    // 計(jì)算融合后流的上下文
    // context為下游的上下文,this.context為上游的上下文 
    val newContext = context + this.context
    ...
}

再根據(jù)之前在Kotlin協(xié)程:協(xié)程上下文與上下文元素中的分析,當(dāng)兩個(gè)上下文進(jìn)行相加時(shí),后一個(gè)上下文中的攔截器會覆蓋前一個(gè)上下文中的攔截器。在上面的代碼中,后一個(gè)上下文為上游的流的上下文,因此會優(yōu)先使用上游的攔截器。代碼如下:

public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other

四.總結(jié)

粉線為使用時(shí)代碼編寫順序,綠線為下游觸發(fā)上游的調(diào)用順序,紅線為上游向下游發(fā)送值的調(diào)用順序,藍(lán)線為線程切換的位置。

到此這篇關(guān)于Kotlin協(xié)程flowOn與線程切換超詳細(xì)示例介紹的文章就介紹到這了,更多相關(guān)Kotlin flowOn與線程切換內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論