Kotlin協(xié)程flowOn與線程切換超詳細(xì)示例介紹
示例代碼
本文分析示例代碼如下:
launch(Dispatchers.Main) { flow { emit(1) emit(2) }.flowOn(Dispatchers.IO).collect { delay(1000) withContext(Dispatchers.IO) { Log.d("liduo", "$it") } Log.d("liduo", "$it") } }
一.flowOn方法
flowOn方法用于將上游的流切換到指定協(xié)程上下文的調(diào)度器中執(zhí)行,同時(shí)不會把協(xié)程上下文暴露給下游的流,即flowOn方法中協(xié)程上下文的調(diào)度器不會對下游的流生效。如下面這段代碼所示:
launch(Dispatchers.Main) { flow { emit(2) // 執(zhí)行在IO線程池 }.flowOn(Dispatchers.IO).map { it + 1 // 執(zhí)行在Default線程池 }.flowOn(Dispatchers.Default).collect { Log.d("liduo", "$it") //執(zhí)行在主線程 } }
接下來,分析一下flowOn方法,代碼如下:
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> { // 檢查當(dāng)前協(xié)程沒有執(zhí)行結(jié)束 checkFlowContext(context) return when { // 為空,則返回自身 context == EmptyCoroutineContext -> this // 如果是可融合的Flow,則嘗試融合操作,獲取新的流 this is FusibleFlow -> fuse(context = context) // 其他情況,包裝成可融合的Flow else -> ChannelFlowOperatorImpl(this, context = context) } } // 確保Job不為空 private fun checkFlowContext(context: CoroutineContext) { require(context[Job] == null) { "Flow context cannot contain job in it. Had $context" } }
在flowOn方法中,首先會檢查方法所在的協(xié)程是否執(zhí)行結(jié)束。如果沒有結(jié)束,則會執(zhí)行判斷語句,這里flowOn方法傳入的上下文不是空上下文,且通過flow方法構(gòu)建出的Flow對象也不是FusibleFlow類型的對象,因此這里會走到else分支,將上游flow方法創(chuàng)建的Flow對象和上下文包裝成ChannelFlowOperatorImpl類型的對象。
1.ChannelFlowOperatorImpl類
ChannelFlowOperatorImpl類繼承自ChannelFlowOperator類,用于將上游的流包裝成一個(gè)ChannelFlow對象,它的繼承關(guān)系如下圖所示:
通過上圖可以知道,ChannelFlowOperatorImpl類最終繼承了ChannelFlow類,代碼如下:
internal class ChannelFlowOperatorImpl<T>( flow: Flow<T>, context: CoroutineContext = EmptyCoroutineContext, capacity: Int = Channel.OPTIONAL_CHANNEL, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) { // 用于流融合時(shí)創(chuàng)建新的流 override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> = ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow) // 若當(dāng)前的流不需要通過Channel即可實(shí)現(xiàn)正常工作時(shí),會調(diào)用此方法 override fun dropChannelOperators(): Flow<T>? = flow // 觸發(fā)對下一級流進(jìn)行收集 override suspend fun flowCollect(collector: FlowCollector<T>) = flow.collect(collector) }
二.collect方法
在Kotlin協(xié)程:Flow基礎(chǔ)原理中講到,當(dāng)執(zhí)行collect方法時(shí),內(nèi)部會調(diào)用最后產(chǎn)生的Flow對象的collect方法,代碼如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector<T> {<!--{C}%3C!%2D%2D%20%2D%2D%3E--> override suspend fun emit(value: T) = action(value) })public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector<T> { override suspend fun emit(value: T) = action(value) })
這個(gè)最后產(chǎn)生的Flow對象就是ChannelFlowOperatorImpl類對象。
1.ChannelFlowOperator類的collect方法
ChannelFlowOperatorImpl類沒有重寫collect方法,因此調(diào)用的是它的父類ChannelFlowOperator類的collect方法,代碼如下:
override suspend fun collect(collector: FlowCollector<T>) { // OPTIONAL_CHANNEL為默認(rèn)值,這里滿足條件,之后會詳細(xì)講解 if (capacity == Channel.OPTIONAL_CHANNEL) { // 獲取當(dāng)前協(xié)程的上下文 val collectContext = coroutineContext // 計(jì)算新的上下文 val newContext = collectContext + context // 如果前后上下文沒有發(fā)生變化 if (newContext == collectContext) // 直接觸發(fā)對下一級流的收集 return flowCollect(collector) // 如果上下文發(fā)生變化,但不需要切換線程 if (newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor]) // 切換協(xié)程上下文,調(diào)用flowCollect方法觸發(fā)下一級流的收集 return collectWithContextUndispatched(collector, newContext) } // 調(diào)用父類的collect方法 super.collect(collector) } // 獲取當(dāng)前協(xié)程的上下文,該方法會被編譯器處理 @SinceKotlin("1.3") @Suppress("WRONG_MODIFIER_TARGET") @InlineOnly public suspend inline val coroutineContext: CoroutineContext get() { throw NotImplementedError("Implemented as intrinsic") }
ChannelFlowOperator類的collect方法在設(shè)計(jì)上與協(xié)程的withContext方法設(shè)計(jì)思路是一致的:在方法內(nèi)根據(jù)上下文的不同情況進(jìn)行判斷,在必要時(shí)才會切換線程去執(zhí)行任務(wù)。
通過flowOn方法創(chuàng)建的ChannelFlowOperatorImpl類對象,參數(shù)capacity為默認(rèn)值OPTIONAL_CHANNEL。因此代碼在執(zhí)行時(shí)會進(jìn)入到判斷中,但因?yàn)槲覀冎付松舷挛臑镈ispatchers.IO,因此上下文發(fā)生了變化,同時(shí)攔截器也發(fā)生了變化,所以最后會調(diào)用ChannelFlowOperator類的父類的collect方法,也就是ChannelFlow類的collect方法。
2.ChannelFlow類的collect方法
ChannelFlow類的代碼如下:
override suspend fun collect(collector: FlowCollector<T>): Unit = coroutineScope { collector.emitAll(produceImpl(this)) }
在ChannelFlow類的collect方法中,首先通過coroutineScope方法創(chuàng)建了一個(gè)作用域協(xié)程,接著調(diào)用了produceImpl方法,代碼如下:
public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> = scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)
produceImpl方法內(nèi)部調(diào)用了produce方法,并且傳入了待執(zhí)行的任務(wù)collectToFun。
produce方法在Kotlin協(xié)程:協(xié)程的基礎(chǔ)與使用中曾提到過,它是官方提供的啟動協(xié)程的四個(gè)方法之一,另外三個(gè)方法為launch方法、async方法、actor方法。代碼如下:
internal fun <E> CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, start: CoroutineStart = CoroutineStart.DEFAULT, onCompletion: CompletionHandler? = null, @BuilderInference block: suspend ProducerScope<E>.() -> Unit ): ReceiveChannel<E> { // 根據(jù)容量與溢出策略創(chuàng)建Channel對象 val channel = Channel<E>(capacity, onBufferOverflow) // 計(jì)算新的上下文 val newContext = newCoroutineContext(context) // 創(chuàng)建協(xié)程 val coroutine = ProducerCoroutine(newContext, channel) // 監(jiān)聽完成事件 if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion) // 啟動協(xié)程 coroutine.start(start, coroutine, block) return coroutine }
在produce方法內(nèi)部,首先創(chuàng)建了一個(gè)Channel類型的對象,接著創(chuàng)建了類型為ProducerCoroutine的協(xié)程,并且傳入Channel對象作為參數(shù)。最后,produce方法返回了一個(gè)ReceiveChannel接口指向的對象,當(dāng)協(xié)程執(zhí)行完畢后,會通過Channel對象將結(jié)果通過send方法發(fā)送出來。
至此,可以知道flowOn方法的實(shí)現(xiàn)實(shí)際上是利用了協(xié)程攔截器的攔截功能。
在這里之后,代碼邏輯分成了兩部分,一部分是block在ProducerCoroutine協(xié)程中的執(zhí)行,另一部分是通過ReceiveChannel對象獲取執(zhí)行的結(jié)果。
3.flow方法中代碼的執(zhí)行
在produceImpl方法中,調(diào)用了produce方法,并且傳入了collectToFun對象,這個(gè)對象將會在produce方法創(chuàng)建的協(xié)程中執(zhí)行,代碼如下:
internal val collectToFun: suspend (ProducerScope<T>) -> Unit get() = { collectTo(it) }
當(dāng)調(diào)用collectToFun對象的invoke方法時(shí),會觸發(fā)collectTo方法的執(zhí)行,該方法在ChannelFlowOperator類中被重寫,代碼如下:
protected override suspend fun collectTo(scope: ProducerScope<T>) = flowCollect(SendingCollector(scope))
在collectTo方法中,首先將參數(shù)scope封裝成SendingCollector類型的對象,接著調(diào)用了flowCollect方法,該方法在ChannelFlowOperatorImpl類中被重寫,代碼如下:
override suspend fun flowCollect(collector: FlowCollector<T>) = flow.collect(collector)
ChannelFlowOperatorImpl類的flowCollect方法內(nèi)部調(diào)用了flow對象的collect方法,這個(gè)flow對象就是最初通過flow方法構(gòu)建的對象。根據(jù)Kotlin協(xié)程:Flow基礎(chǔ)原理的分析,這個(gè)flow對象類型為SafeFlow,最后會通過collectSafely方法,觸發(fā)flow方法中的block執(zhí)行。代碼如下:
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() { override suspend fun collectSafely(collector: FlowCollector<T>) { // 觸發(fā)執(zhí)行 collector.block() } }
當(dāng)flow方法在執(zhí)行過程中需要向下游發(fā)出值時(shí),會調(diào)用emit方法。根據(jù)上面flowCollect方法和collectTo方法可以知道,collectSafely方法的collector對象就是collectTo方法中創(chuàng)建的SendingCollector類型的對象,代碼如下:
@InternalCoroutinesApi public class SendingCollector<T>( private val channel: SendChannel<T> ) : FlowCollector<T> { // 通過Channel類對象發(fā)送值 override suspend fun emit(value: T): Unit = channel.send(value) }
當(dāng)調(diào)用SendingCollector類型的對象的emit方法時(shí),會通過調(diào)用類型為Channel的對象的send方法,將值發(fā)送出去。
接下來,將分析下游如何接收上游發(fā)出的值。
4.接收flow方法發(fā)出的值
回到ChannelFlow類的collect方法,之前提到collect方法中調(diào)用produceImpl方法,開啟了一個(gè)新的協(xié)程去執(zhí)行任務(wù),并且返回了一個(gè)ReceiveChannel接口指向的對象。代碼如下:
override suspend fun collect(collector: FlowCollector<T>): Unit = coroutineScope { collector.emitAll(produceImpl(this)) }
在調(diào)用完produceImpl方法后,接著調(diào)用了emitAll方法,將ReceiveChannel接口指向的對象作為emitAll方法的參數(shù),代碼如下:
public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Unit = emitAllImpl(channel, consume = true)
emitAll方法是FlowCollector接口的擴(kuò)展方法,內(nèi)部調(diào)用了emitAllImpl方法對參數(shù)channel進(jìn)行封裝,代碼如下:
private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) { // 用于保存異常 var cause: Throwable? = null try { // 死循環(huán) while (true) { // 掛起,等待接收Channel結(jié)果或Channel關(guān)閉 val result = run { channel.receiveOrClosed() } // 如果Channel關(guān)閉了 if (result.isClosed) { // 如果有異常,則拋出 result.closeCause?.let { throw it } // 沒有異常,則跳出循環(huán) break } // 獲取并發(fā)送值 emit(result.value) } } catch (e: Throwable) { // 捕獲到異常時(shí)拋出 cause = e throw e } finally { // 執(zhí)行結(jié)束關(guān)閉Channel if (consume) channel.cancelConsumed(cause) } }
emitAllImpl方法是FlowCollector接口的擴(kuò)展方法,而這里的FlowCollector接口指向的對象,就是collect方法中創(chuàng)建的匿名對象,代碼如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector<T> { override suspend fun emit(value: T) = action(value) })
在emitAllImpl方法中,當(dāng)通過receiveOrClosed方法獲取到上游發(fā)出的值時(shí),會調(diào)用emit方法通知下游,這時(shí)就會觸發(fā)collect方法中block的執(zhí)行,最終實(shí)現(xiàn)值從流的上游傳遞到了下游。
三.flowOn方法與流的融合
假設(shè)對一個(gè)流連續(xù)調(diào)用兩次flowOn方法,那么流最終會在哪個(gè)flowOn方法指定的調(diào)度器中執(zhí)行呢?代碼如下:
launch(Dispatchers.Main) { flow { emit(2) // emit方法是在IO線程執(zhí)行還是在主線程執(zhí)行呢? }.flowOn(Dispatchers.IO).flowOn(Dispatchers.Main).collect { Log.d("liduo", "$it") } }
答案是在IO線程執(zhí)行,為什么呢?
根據(jù)本篇上面的分析,當(dāng)?shù)谝淮握{(diào)用flowOn方法時(shí),上游的流會被包裹成ChannelFlowOperatorImpl對象,代碼如下:
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> { // 檢查當(dāng)前協(xié)程沒有執(zhí)行結(jié)束 checkFlowContext(context) return when { // 為空,則返回自身 context == EmptyCoroutineContext -> this // 如果是可融合的Flow,則嘗試融合操作,獲取新的流 this is FusibleFlow -> fuse(context = context) // 其他情況,包裝成可融合的Flow else -> ChannelFlowOperatorImpl(this, context = context) } }
而當(dāng)?shù)诙握{(diào)用flowOn方法時(shí),由于此時(shí)上游的流——ChannelFlowOperatorImpl類型的對象,實(shí)現(xiàn)了FusibleFlow接口,因此,這里會觸發(fā)流的融合,直接調(diào)用上游的流的fuse方法,并傳入新的上下文。這里容量和溢出策略均為默認(rèn)值。
根據(jù)Kotlin協(xié)程:Flow的融合、Channel容量、溢出策略的分析,這里會調(diào)用ChannelFlow類的fuse方法。相關(guān)代碼如下:
public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> { ... // 計(jì)算融合后流的上下文 // context為下游的上下文,this.context為上游的上下文 val newContext = context + this.context ... }
再根據(jù)之前在Kotlin協(xié)程:協(xié)程上下文與上下文元素中的分析,當(dāng)兩個(gè)上下文進(jìn)行相加時(shí),后一個(gè)上下文中的攔截器會覆蓋前一個(gè)上下文中的攔截器。在上面的代碼中,后一個(gè)上下文為上游的流的上下文,因此會優(yōu)先使用上游的攔截器。代碼如下:
public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other
四.總結(jié)
粉線為使用時(shí)代碼編寫順序,綠線為下游觸發(fā)上游的調(diào)用順序,紅線為上游向下游發(fā)送值的調(diào)用順序,藍(lán)線為線程切換的位置。
到此這篇關(guān)于Kotlin協(xié)程flowOn與線程切換超詳細(xì)示例介紹的文章就介紹到這了,更多相關(guān)Kotlin flowOn與線程切換內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
讓Android中RadioGroup不顯示在輸入法上面的辦法
在Android開發(fā)中,發(fā)現(xiàn)一個(gè)問題,打開輸入法導(dǎo)致下面的radioGroup的位置發(fā)生了變化,被頂?shù)搅溯斎敕ǖ纳厦?,那么該如何解決呢?下面來看看。2016-08-08Android 動畫之ScaleAnimation應(yīng)用詳解
本節(jié)講解ScaleAnimation 動畫在應(yīng)用中的實(shí)現(xiàn),有需要的朋友可以參考下2012-12-12Android熱更新開源項(xiàng)目Tinker集成實(shí)踐總結(jié)
最近項(xiàng)目集成了Tinker,開始認(rèn)為集成會比較簡單,但是在實(shí)際操作的過程中還是遇到了一些問題,本文就會介紹在集成過程大家基本會遇到的主要問題。下面跟著小編一起來看下吧2017-01-01Android WebView與JS交互全面詳解(小結(jié))
本篇文章主要介紹了Android WebView與JS交互全面詳解(小結(jié)),實(shí)現(xiàn)了Android客戶端與Web網(wǎng)頁交互,具有一定的參考價(jià)值,有興趣的可以了解一下2017-11-11Android使用Intent.ACTION_SEND分享圖片和文字內(nèi)容的示例代碼
這篇文章主要介紹了Android使用Intent.ACTION_SEND分享圖片和文字內(nèi)容的示例代碼的實(shí)例代碼,具有很好的參考價(jià)值,希望對大家有所幫助,一起跟隨小編過來看看吧2018-05-05Android ListView 實(shí)例講解清晰易懂
這篇文章主要通過實(shí)例介紹了Android ListView,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-09-09Android Bluetooth藍(lán)牙技術(shù)使用流程詳解
這篇文章主要介紹了Android Bluetooth藍(lán)牙技術(shù)使用流程詳解的相關(guān)資料,需要的朋友可以參考下2016-02-02