欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Kotlin協(xié)程的線程調(diào)度示例詳解

 更新時(shí)間:2022年12月12日 08:58:50   作者:rencai  
這篇文章主要為大家介紹了Kotlin協(xié)程的線程調(diào)度示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

引言

在第一篇文章中我們分析了協(xié)程啟動(dòng)創(chuàng)建過程啟動(dòng)過程,在本文中,我們將著重剖析協(xié)程中協(xié)程調(diào)度的邏輯流程。主要是分析解答如下2個(gè)問題:

  • 涉及到協(xié)程方法器是如何將協(xié)程代碼調(diào)度到特定的線程執(zhí)行?
  • 子協(xié)程執(zhí)行完又是如何切換0回父協(xié)程的線程環(huán)境?

一、協(xié)程的分發(fā)器作用

1.1 測試代碼

GlobalScope.launch {
    //協(xié)程體1
    Log.d(TAG, "before suspend job.")
    withContext(Dispatchers.Main) {
        //協(xié)程體2
        Log.d(TAG, "print in Main thread.")
    }
    Log.d(TAG, "after suspend job.")
}
  • 此次的協(xié)程測試用例中,我們默認(rèn)的launch一個(gè)協(xié)程,我們簡單的將launch需要執(zhí)行的這外層邏輯為協(xié)程體1。
  • 在協(xié)程體1中,我們使用withContext將協(xié)程切換到主線程執(zhí)行,打印日志。我們將這里面執(zhí)行的協(xié)程邏輯為協(xié)程體2
  • 協(xié)程體2執(zhí)行完成后,切回協(xié)程體1中執(zhí)行并打印Log。
  • 注意,根據(jù)我們之前《協(xié)程的創(chuàng)建與啟動(dòng)》文章中分析的,Kotlin編譯器針對(duì)協(xié)程體1和協(xié)程體2分別生成一個(gè)繼承與SuspenLamabda的類型,比如:class MainActivity#onCreate$1 : SuspenLambda{...}。我們在講協(xié)程體時(shí),也同時(shí)代指這個(gè)類實(shí)例。

繼續(xù)跟蹤launch()函數(shù)執(zhí)行邏輯,這次跟蹤過程不同與《協(xié)程的創(chuàng)建與啟動(dòng)》篇章,我們會(huì)將側(cè)重點(diǎn)放在啟動(dòng)過程中協(xié)程調(diào)度器是如何起作用的?接下來見1.2

1.2 CoroutineScope.launch

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    //1. 見1.2.1
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    //2. 詳見1.3
    coroutine.start(start, coroutine, block)
    return coroutine
}
  • 這里會(huì)新建一個(gè)CoroutineContext,詳見1.2.1
  • 根據(jù)之前的分析,這個(gè)里最終會(huì)調(diào)用到startCoroutineCancellable()方法,詳見1.3流程。

1.2.1 newCoroutineContext

public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val combined = foldCopies(coroutineContext, context, true)
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return 
    if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default
    else 
    	debug
}

coroutineContextcoroutineContextCoroutineScope的成員變量,當(dāng)此時(shí)為GlobalScope.coroutineContext==EmptyCoroutineContext

context:由于調(diào)用launch時(shí)沒有指定Context,所以傳到此處也是EmptyCoroutineContext。foldCopies()函數(shù)將2個(gè)context相加并拷貝,最終combied==EmptyCoroutineContext

而在return這最后判斷返回的是debug+Dispatchers.Defatult,所以此時(shí)默認(rèn)的分發(fā)器為Dispatchers.Defatult。

這里涉及到的協(xié)程Context運(yùn)算不做深入剖析,簡單可以認(rèn)為協(xié)程重寫了“+”運(yùn)算,使得Context之間可以使用“+”來疊加,沒有的Element類型會(huì)被添加到Element集合,集合中已有的Element類型會(huì)被覆蓋。

1.3 startCoroutineCancellable

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
    	//1. 創(chuàng)建SuspendLambda協(xié)程體
        createCoroutineUnintercepted(receiver, completion)
            //2. 攔截:取出分發(fā)器,并構(gòu)建方法器Continuation。詳見1.3.1
            .intercepted()
            //3. 調(diào)用方法器Continuation的resume方法,詳見1.4
            .resumeCancellableWith(Result.success(Unit), onCancellation)
    }
  • 這里的構(gòu)建協(xié)程體在《協(xié)程的創(chuàng)建與啟動(dòng)》一節(jié)中已經(jīng)剖析,不再贅述。
  • 進(jìn)行攔截,注意:這里其實(shí)會(huì)根據(jù)方法器再構(gòu)建出一個(gè)DispatchedContinuation對(duì)象,它也是一個(gè)續(xù)體類型,這是對(duì)協(xié)程體的一次包裝。詳見1.3.1小節(jié)。
  • 調(diào)用攔截器續(xù)體的resumeCancellableWith()開始狀態(tài)機(jī)流轉(zhuǎn),執(zhí)行分發(fā)流程詳見1.4小節(jié)。

1.3.1 intercepted()

 public fun intercepted(): Continuation<Any?> =
        intercepted?: (
                //1. 取出攔截器
                context[ContinuationInterceptor]?
                    //2.構(gòu)建攔截器續(xù)體
                    .interceptContinuation(this)?: this)
                .also { intercepted = it }
  • 取出當(dāng)前上下文中的攔截器類型,根據(jù)之前1.2.1小節(jié)的分析,這里取出來的是Dispatchers.Defatult。
  • interceptContinuation(this)為構(gòu)建攔截器續(xù)體,注意這里傳入的this協(xié)程體1。 詳見1.3.2。

1.3.2 CoroutineDispatcher

//Base class to be extended by all coroutine dispatcher implementations.
public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
public final override fun <T> interceptContinuation(continuation: Continuation<T>):
        //詳見1.4
        Continuation<T> = DispatchedContinuation(this, continuation)
}

直接新建了一個(gè)DispatchedContinuation對(duì)象實(shí)例這里需要注意傳入的構(gòu)建參數(shù):

  • this:當(dāng)前Dispatcher,也就是Dispatchers.Defatult
  • continuation:協(xié)程體1。

1.3.3 小結(jié)

自此Continuation.intercepted()方法就分析結(jié)束,最終的結(jié)果是:用上下文中的Dispatcher和當(dāng)前Contination對(duì)象也就是協(xié)程體1,共同作為構(gòu)建參數(shù),新建了一個(gè)DispatchedContinuation對(duì)象。

接下來接著1.3中的第三點(diǎn),調(diào)用DispatchedContinuation.resumeCancellableWith()方法開始分析。

1.4 DispatchedContinuation

internal class DispatchedContinuation<in T>(
    //1. 分發(fā)器
    @JvmField val dispatcher: CoroutineDispatcher,
	//2. 注意這里將Continuation的實(shí)現(xiàn)委托給了continuation成員變量。
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED)
, CoroutineStackFrame,
Continuation<T> by continuation {
    	//3. 復(fù)寫屬性delegate為自己
	    override val delegate: Continuation<T>
        get() = this
    ...
    // We inline it to save an entry on the stack in cases where it shows (unconfined dispatcher)
    // It is used only in Continuation<T>.resumeCancellableWith
    @Suppress("NOTHING_TO_INLINE")
    inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
        //默認(rèn)為true
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            //4. 詳細(xì)見
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) {
                    resumeUndispatchedWith(result)
                }
            }
        }
    }
}

這里的dispatcher==Dispatchers.Defatult,所以接下來需要解析Dispatchers.Defatult到底是什么東西。詳見1.5

  • 成員變量dispatcher==Dispatchers.Default。
  • 成員變量continucation==協(xié)程體1(SuspenLambda類型實(shí)例)。同時(shí)DispatchedContinuation繼承于Continuation接口,它將Continuation接口的實(shí)現(xiàn)委托給了成員變量continuation
  • deleagte為復(fù)寫了DispatchedTask.delegate屬性,將其返回自己。
  • 調(diào)用分發(fā)器也就是Dispatchers.Defatultdispatch()方法,注意這里傳入的參數(shù):

context:來自Continuation接口的屬性,由于委托給了成員變量continuation,所以此context==continuation.context。

this:分發(fā)器本身Dispatchers.Defatult

自此這個(gè)方法的分析結(jié)束:調(diào)用分發(fā)器的進(jìn)行分發(fā),接下來分析就開始分析協(xié)程方法器CoroutineDispatcher

1.5 DefaultScheduler

//Dispathcer.kt
@JvmStatic
public actual val Default: CoroutineDispatcher = DefaultScheduler
//Dispathcer.kt
// Instance of Dispatchers.Default
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    CORE_POOL_SIZE, MAX_POOL_SIZE,
    IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
    ...
}

實(shí)際上是繼承 SchedulerCoroutineDispatcher類型。詳見1.5.1

1.5.1 SchedulerCoroutineDispatcher

internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
    override val executor: Executor
        get() = coroutineScheduler
    // This is variable for test purposes, so that we can reinitialize from clean state
    private var coroutineScheduler = createScheduler()
    private fun createScheduler() =
        //1. 詳見1.5.2
        CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    //2. 詳見1.5.2
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit 
    = coroutineScheduler.dispatch(block)
    ...
}
//Executors.kt
//2. 實(shí)際上是繼承ExecutorCoroutineDispatcher
public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable {
    ...
}
  • 可以看到實(shí)際上調(diào)用了CoroutineScheduler.dispatch方法。此時(shí)發(fā)現(xiàn),第二個(gè)參數(shù)是Runnable類型的,而在1.4小節(jié)中,我們知道傳入的是this也就是DispatchedContinuation,所以DispatchedContinuation繼承的父類中,必定有繼承了Runnable接口,而他的run方法的實(shí)現(xiàn)也在父類中,這塊我們暫時(shí)按下不表,接著看繼續(xù)跟蹤coroutineScheduler.dispatch(block)

1.5.2 CoroutineScheduler

internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
	... 
    override fun execute(command: Runnable) = dispatch(command)
    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        trackTask() // this is needed for virtual time support
        val task = createTask(block, taskContext)
        // try to submit the task to the local queue and act depending on the result
        val currentWorker = currentWorker()
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        if (notAdded != null) {
            if (!addToGlobalQueue(notAdded)) {
                // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        val skipUnpark = tailDispatch && currentWorker != null
        // Checking 'task' instead of 'notAdded' is completely okay
        if (task.mode == TASK_NON_BLOCKING) {
            if (skipUnpark) return
            signalCpuWork()
        } else {
            // Increment blocking tasks anyway
            signalBlockingWork(skipUnpark = skipUnpark)
        }
    }
}
  • 該類繼承了Executor類,而且它的構(gòu)建參數(shù)可看到是線程池的參數(shù),所以可以知道這個(gè)其實(shí)是Kotlin協(xié)程實(shí)現(xiàn)的一個(gè)線程池,具體就不跟進(jìn)去了。
  • execute()過程也是dispatch過程:將任務(wù)投遞到任務(wù)隊(duì)列,然后通知線程去取任務(wù)執(zhí)行,自此完成了線程切換動(dòng)作。
  • 而在新線程里執(zhí)行的Runnable為1.4中的調(diào)用代碼:dispatcher.dispatch(context, this)中的this,也就是DispatchedContinuationDispatchedContinuation.kt并沒有實(shí)現(xiàn)run方法,那么一定是他繼承的父類實(shí)現(xiàn)了Runnable接口并實(shí)現(xiàn),所以需要接著看它繼承的父類:DispatchedTask類。

1.6 DispatchedTask.run()

internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
	...
    internal abstract val delegate: Continuation<T>
    @Suppress("UNCHECKED_CAST")
    internal open fun <T> getSuccessfulResult(state: Any?): T =
        state as T
    internal open fun getExceptionalResult(state: Any?): Throwable? =
        (state as? CompletedExceptionally)?.cause
    public final override fun run() {
        assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
        val taskContext = this.taskContext
        var fatalException: Throwable? = null
        try {
            val delegate = delegate as DispatchedContinuation<T>
            //1. 取出代理商的續(xù)體
            val continuation = delegate.continuation
            withContinuationContext(continuation, delegate.countOrElement) {
                val context = continuation.context
                val state = takeState() // NOTE: Must take state in any case, even if cancelled
                val exception = getExceptionalResult(state)
                val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
                if (job != null && !job.isActive) {
                    val cause = job.getCancellationException()
                    cancelCompletedResult(state, cause)
                    continuation.resumeWithStackTrace(cause)
                } else {
                    if (exception != null) {
                        continuation.resumeWithException(exception)
                    } else {
                        //1. 被包裝的續(xù)體的resume方法,真正的開始出發(fā)其協(xié)程狀態(tài)機(jī)代碼。
                        continuation.resume(getSuccessfulResult(state))
                    }
                }
            }
        } catch (e: Throwable) {
            // This instead of runCatching to have nicer stacktrace and debug experience
            fatalException = e
        } finally {
            val result = runCatching { taskContext.afterTask() }
            handleFatalException(fatalException, result.exceptionOrNull())
        }
    }
}
  • delegate轉(zhuǎn)為DispatchedContinuation,應(yīng)該注意1.4 小節(jié)中DispatchedContinuation繼承DispatchTask時(shí),便對(duì)此delegate進(jìn)行了復(fù)寫:

override val delegate: Continuation

get() = this

而此delegate.continucation便是當(dāng)初newDispatchedContinuation(this)時(shí)傳入的this,此this就是Kotlin編譯器一開始為協(xié)程體生成的SuspendLambda類型對(duì)象。具體可以回看1.3小節(jié)。

  • 調(diào)用了continuation.resume()方法觸發(fā)了協(xié)程的狀態(tài)機(jī)進(jìn)而開始執(zhí)行協(xié)程業(yè)務(wù)邏輯代碼,結(jié)合之前1.5.2的分析可以知道,這個(gè)方法的調(diào)用已經(jīng)是被dispatch到特定線程,完成線程切換后執(zhí)行的。所以協(xié)程狀態(tài)機(jī)的代碼也是跑在新線程上的。

1.7 總結(jié)

至此,協(xié)程的線程調(diào)度分析結(jié)束,關(guān)鍵有如下幾個(gè)要點(diǎn):

  • 創(chuàng)建SuspendLambda時(shí),他的協(xié)程上下文對(duì)象來自于comletion.context,默認(rèn)就是Dispatcher.Default。
  • SuspendLambda啟動(dòng)時(shí)調(diào)用了intercept()進(jìn)行一層包裝,得到DispatchedContinuation,后續(xù)協(xié)程啟動(dòng)是啟動(dòng)的DispatchedContinuation協(xié)程。
  • DispatchedContinuation繼承于Runnable接口,協(xié)程啟動(dòng)時(shí)將自己投遞到分發(fā)器dispatcher執(zhí)行run方法,從而達(dá)到了線程切換效果。
  • DispatchedContinuationrun方法中,調(diào)用SuspendLambda.resume()啟動(dòng)狀態(tài)機(jī)。在新線程執(zhí)行協(xié)程狀態(tài)機(jī)代碼。

這一小節(jié)中,介紹了如何將協(xié)程調(diào)度到目的線程執(zhí)行,接下來分析如何做到隨意切換線程后,然后再恢復(fù)到原來線程的。

二、協(xié)程中的線程切換

在第一小節(jié)中,我們搞清楚了協(xié)程啟動(dòng)時(shí),協(xié)程調(diào)度器是如何在其中起作用的。這一小節(jié)旨在剖析在協(xié)程用分發(fā)器切換線程執(zhí)行新的掛起函數(shù)后,是如何切換會(huì)原來線程繼續(xù)執(zhí)行剩下的邏輯的。

為此,我們需要將1.1的測試代碼反編譯出來實(shí)際代碼進(jìn)而分析。

2.1 反編譯代碼

2.1.1 MainActivityonCreateonCreateonCreate1

final class MainActivity$onCreate$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
    ...
    @Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl
    public final Object invokeSuspend(Object $result) {
        Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (this.label) {
            case 0:
                ResultKt.throwOnFailure($result);
                Log.d(MainActivity.TAG, LiveLiterals$MainActivityKt.INSTANCE.m4147xf96cab04());
                this.label = 1;
                //1. 新建編譯器自動(dòng)生成的繼承于SuspendLambda的類型。
                AnonymousClass1 anonymousClass1 = new AnonymousClass1(null);
                //2. 調(diào)用withContext
            	Object res = BuildersKt.withContext(Dispatchers.getIO(), anonymousClass1, this);
                if (res != coroutine_suspended) {
                    break;
                } else {
                    //掛起
                    return coroutine_suspended;
                }
            case 1:
                ResultKt.throwOnFailure($result);
                break;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
        Log.d(MainActivity.TAG, LiveLiterals$MainActivityKt.INSTANCE.m4148xe0c1b328());
        return Unit.INSTANCE;
    }
}

根據(jù)之前的文章分析,這里suspend lambda 的類型都自動(dòng)生成繼承于SuspendLambda的類型。詳見2.1.2。

anonymousClass1傳入withContext,而且注意這里傳入了this==MainActivity$onCreate$1,詳見2.2。

2.1.2 AnonymousClass1

/* compiled from: MainActivity.kt */
public static final class AnonymousClass1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Integer>, Object> {
    int label
    ...
    @Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl
    public final Object invokeSuspend(Object obj) {
        IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (this.label) {
            case 0:
                ResultKt.throwOnFailure(obj);
                return Boxing.boxInt(Log.d(MainActivity.TAG, LiveLiterals$MainActivityKt.INSTANCE.m4146x7c0f011f()));
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
    }
}

2.2 withContext

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    //1. 獲取當(dāng)前協(xié)程, 注意這里的uCont就是當(dāng)前續(xù)體,也就是MainActivity$onCreate$1
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        //2. 計(jì)算獲的新的協(xié)程上下文
        val oldContext = uCont.context
        val newContext = oldContext + context
        //3. 快速判斷:新上下文和舊上下文一致的情況快速處理。
        // always check for cancellation of new context
        newContext.ensureActive()
        // FAST PATH #1 -- new context is the same as the old one
        if (newContext === oldContext) {
            val coroutine = ScopeCoroutine(newContext, uCont)
            return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
        }
        // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
        // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
        if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
            val coroutine = UndispatchedCoroutine(newContext, uCont)
            // There are changes in the context, so this thread needs to be updated
            withCoroutineContext(newContext, null) {
                return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
            }
        }
        // SLOW PATH -- use new dispatcher
        //4. 新建一個(gè)DispatchedCoroutine
        val coroutine = DispatchedCoroutine(newContext, uCont)
        //5. 啟動(dòng)協(xié)程
        block.startCoroutineCancellable(coroutine, coroutine)
        coroutine.getResult()
    }
}
  • suspendCoroutineUninterceptedOrReturn這個(gè)函數(shù)直接步進(jìn)是看不到實(shí)現(xiàn)的,它的實(shí)現(xiàn)是由Kotlin編譯器生成的,它的作用是用來獲取當(dāng)前續(xù)體的,并且通過uCont返回,這里就是MainActivity$onCreate$1
  • 將舊協(xié)程上下文和新的上下文一起。計(jì)算得到最終的上下文。這里的context==Dispatchers.getIO()。
  • 快速判斷,不用看。
  • 新建一個(gè)DispatchedCoroutine,注意這里傳入了新的協(xié)程上下文和當(dāng)前續(xù)體對(duì)象。
  • 調(diào)用startCoroutineCancellable()啟動(dòng)協(xié)程。這里的同1.3.2小節(jié)分析一樣,詳見 2.2.1

2.2.1 startCoroutineCancellable

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
    	//1. 創(chuàng)建SuspendLambda協(xié)程體
        createCoroutineUnintercepted(receiver, completion)
            //2. 攔截:取出分發(fā)器,并構(gòu)建方法器Continuation。詳見1.3.1
            .intercepted()
            //3. 調(diào)用方法器Continuation的resume方法,詳見1.4
            .resumeCancellableWith(Result.success(Unit), onCancellation)
    }

此方法在之前1.3小節(jié)已經(jīng)分析過,針對(duì)此此次調(diào)用,其中的改變是協(xié)程上下文中的分發(fā)器已經(jīng)被設(shè)置為Dispatchers.Main

  • 創(chuàng)建了SuspendLambda對(duì)象,此對(duì)象的CoroutineContextcompletion.context。而其中的ContinuationInterceptor類型Element就是我們之前傳入的Dispatchers.Main。
  • 創(chuàng)建一個(gè)DispatchedContinuation。
  • 將協(xié)程SuspendLambda的狀態(tài)機(jī)邏輯通過Dispatcher.Main調(diào)度到主線程執(zhí)行,調(diào)度過程參考第一下節(jié)。分發(fā)邏輯詳見2.7小節(jié)。
  • 當(dāng)SuspendLambda的狀態(tài)機(jī)invokeSuspend()邏輯執(zhí)行完成后,會(huì)返回到BaseContinuationImpl.resumeWith(),我們需要接此方法分析,來得到協(xié)程在切換到主線程執(zhí)行后,又是怎么切回協(xié)程體1的執(zhí)行線程的,詳見2.3。

2.3 resumeWith

public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    //1. 進(jìn)入此判斷
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

當(dāng)狀態(tài)機(jī)執(zhí)行完后, 后進(jìn)入到completion的類型判斷,由2.2和2.2.1可以知道,當(dāng)初傳入的completion是DispatchedCoroutine類型,所以加入到else分支,調(diào)用了DispatchedCoroutine.resumeWith(),接下來分析此方法。

在此之前,我們需要看下DispatchedCoroutine的繼承關(guān)系,詳見2.4.1。如果想直接跟蹤流程,可以直接看2.4.2。

2.4 DispatchedCoroutine

2.4.1 DispatchedCoroutine 的繼承關(guān)系

internal class DispatchedCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
}

繼承于ScopeCoroutine

internal open class ScopeCoroutine<in T>(
    context: CoroutineContext,
    @JvmField val uCont: Continuation<T> // unintercepted continuation
) : AbstractCoroutine<T>(context, true, true), CoroutineStackFrame {
}

繼承于AbstractCoroutine

public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
}

2.5 協(xié)程線程的恢復(fù)

2.5.1 AbstractCoroutine.resumeWith()

    public final override fun resumeWith(result: Result<T>) {
        val state = makeCompletingOnce(result.toState())
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }

調(diào)用了afterResume方法,此方法在DispatchedCoroutine類型有具體實(shí)現(xiàn)。見2.5.2

2.5.2 afterResume

//DispatchedCoroutine
override fun afterResume(state: Any?) {
        if (tryResume()) return // completed before getResult invocation -- bail out
        // Resume in a cancellable way because we have to switch back to the original dispatcher
        uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
  • 取出當(dāng)前續(xù)體uCont,這個(gè)續(xù)體根據(jù)之前的分析:2.2小節(jié),可以知道它等于MainActivity$onCreate$1。
  • intercepted():取出其分發(fā)攔截器
  • resumeCancellableWith:使用方法攔截器協(xié)程體,將uCont續(xù)體的狀態(tài)機(jī)邏輯調(diào)度到相對(duì)應(yīng)的線程環(huán)境執(zhí)行,這里就是之前的Dispatcher.Default。注意其注釋:“將其切換到原先的分發(fā)器”。2?而這一過程其實(shí)和1.3小節(jié)的過程一致。
  • 恢復(fù)到Dispatcher.Default繼續(xù)執(zhí)行狀態(tài)機(jī)時(shí),由于label已經(jīng)被更新,所以會(huì)往下繼續(xù)執(zhí)行,打印最后一句log。

2.6 總結(jié)

withContext(Dispatcher.Main)啟動(dòng)的協(xié)程時(shí),取得當(dāng)前協(xié)程續(xù)體uCount也就是MainActivity$onCreate$1,會(huì)計(jì)算出新的協(xié)程context,然后用它們創(chuàng)建一個(gè)DispatchedCoroutine

AnonymousClass1協(xié)程啟動(dòng)時(shí),用DispatchedCoroutine作為completion參數(shù),然后啟動(dòng),此時(shí)會(huì)調(diào)度主線程執(zhí)行協(xié)程。

當(dāng)協(xié)程執(zhí)行完成后,AnonymousClass1.resumeWith()方法會(huì)調(diào)用completion.resumeWith()

DispatchedCoroutine.resumeWith()方法會(huì)調(diào)用uCount.intercepted().resumeCancellableWith(),使得父協(xié)程進(jìn)行調(diào)度并接著執(zhí)行狀態(tài)機(jī)邏輯。

2.7 Dispatchers.Main

    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() 
= MainDispatcherLoader.dispatcher

直接詳見2.7.1

2.7.1 MainDispatcherLoader

internal object MainDispatcherLoader {
    private val FAST_SERVICE_LOADER_ENABLED = systemProp(FAST_SERVICE_LOADER_PROPERTY_NAME, true)
    @JvmField
    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
    private fun loadMainDispatcher(): MainCoroutineDispatcher {
        return try {
            val factories = if (FAST_SERVICE_LOADER_ENABLED) {
                FastServiceLoader.loadMainDispatcherFactory()
            } else {
                // We are explicitly using the
                // `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
                // form of the ServiceLoader call to enable R8 optimization when compiled on Android.
                // 1.獲得MainDispatcherFactory的實(shí)現(xiàn)類
                ServiceLoader.load(
                        MainDispatcherFactory::class.java,
                        MainDispatcherFactory::class.java.classLoader
                ).iterator().asSequence().toList()
            }
            @Suppress("ConstantConditionIf")
            factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
                ?: createMissingDispatcher()
        } catch (e: Throwable) {
            // Service loader can throw an exception as well
            createMissingDispatcher(e)
        }
    }
}
  • 通過ServiceLoad機(jī)制獲取MainDispatcherFactory的實(shí)現(xiàn)類,而在源碼里面,其實(shí)現(xiàn)類為AndroidDispatcherFactory
  • 調(diào)用tryCreateDispatcher()創(chuàng)建分發(fā)器,詳見2.7.2。

2.7.2 AndroidDispatcherFactory

internal class AndroidDispatcherFactory : MainDispatcherFactory {
    override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
        HandlerContext(Looper.getMainLooper().asHandler(async = true))
    override fun hintOnError(): String = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"
    override val loadPriority: Int
        get() = Int.MAX_VALUE / 2
}

根據(jù)createDispatcher分發(fā),主線程分發(fā)器的實(shí)現(xiàn)類為HandlerContext類型,傳入用MainLooper構(gòu)建的Handler。詳見2.7.3。

2.7.3 HandlerContext

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    /**
     * Creates [CoroutineDispatcher] for the given Android [handler].
     *
     * @param handler a handler.
     * @param name an optional name for debugging.
     */
    constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)
    @Volatile
    private var _immediate: HandlerContext? = if (invokeImmediately) this else null
    override val immediate: HandlerContext = _immediate ?:
        HandlerContext(handler, name, true).also { _immediate = it }
    override fun isDispatchNeeded(context: CoroutineContext): Boolean {
        return !invokeImmediately || Looper.myLooper() != handler.looper
    }
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        if (!handler.post(block)) {
            cancelOnRejection(context, block)
        }
    }
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val block = Runnable {
            with(continuation) { resumeUndispatched(Unit) }
        }
        if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) {
            continuation.invokeOnCancellation { handler.removeCallbacks(block) }
        } else {
            cancelOnRejection(continuation.context, block)
        }
    }
   ...
}

HandlerContext繼承于HandlerDispatcher,而他的dispatch方法,可以看到,就是將block丟到設(shè)置MainLooperhandler執(zhí)行。所以續(xù)體將會(huì)在主線程執(zhí)行狀態(tài)機(jī),達(dá)到切換到主線程執(zhí)行協(xié)程的目的。

以上就是Kotlin協(xié)程的線程調(diào)度示例詳解的詳細(xì)內(nèi)容,更多關(guān)于Kotlin協(xié)程的線程調(diào)度的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 接口對(duì)象的實(shí)例化在接口回調(diào)中的使用方法

    接口對(duì)象的實(shí)例化在接口回調(diào)中的使用方法

    下面小編就為大家?guī)硪黄涌趯?duì)象的實(shí)例化在接口回調(diào)中的使用方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-02-02
  • 從0快速搭建一個(gè)實(shí)用的MVVM框架(超詳細(xì))

    從0快速搭建一個(gè)實(shí)用的MVVM框架(超詳細(xì))

    這篇文章主要介紹了從0搭建一個(gè)實(shí)用的MVVM框架,結(jié)合Jetpack,構(gòu)建快速開發(fā)的MVVM框架,支持快速生成ListActivity、ListFragment,主要是基于MVVM進(jìn)行快速開發(fā)上手即用,需要的朋友可以參考下
    2022-03-03
  • 在Kotlin開發(fā)中如何使用集合詳解

    在Kotlin開發(fā)中如何使用集合詳解

    這篇文章主要給大家介紹了關(guān)于在Kotlin開發(fā)中如何使用集合的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。
    2018-03-03
  • Android實(shí)現(xiàn)短視頻畫心效果

    Android實(shí)現(xiàn)短視頻畫心效果

    這篇文章主要為大家詳細(xì)介紹了Android實(shí)現(xiàn)短視頻畫心效果,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-05-05
  • Android 利用ViewPager實(shí)現(xiàn)圖片可以左右循環(huán)滑動(dòng)效果附代碼下載

    Android 利用ViewPager實(shí)現(xiàn)圖片可以左右循環(huán)滑動(dòng)效果附代碼下載

    本文通過一個(gè)小demo給大家展示一段代碼實(shí)現(xiàn)viewpage圖片左右循環(huán)滑動(dòng)效果,對(duì)viewgager循環(huán)滑動(dòng)相關(guān)知識(shí)感興趣的朋友一起學(xué)習(xí)吧
    2015-11-11
  • Android 中 WebView 的基本用法詳解

    Android 中 WebView 的基本用法詳解

    這篇文章主要介紹了Android 中 WebView 的基本用法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-01-01
  • Android中Parcelable的使用詳解

    Android中Parcelable的使用詳解

    Serializable是Java為我們提供的一個(gè)標(biāo)準(zhǔn)化的序列化接口。而Parcelable是Android為我們提供的序列化的接口。 這篇文章主要介紹了Android中Parcelable的使用 ,需要的朋友可以參考下
    2019-06-06
  • Android Studio 引用外部依賴時(shí)報(bào)錯(cuò)的解決方法

    Android Studio 引用外部依賴時(shí)報(bào)錯(cuò)的解決方法

    這篇文章主要介紹了Android Studio報(bào)錯(cuò)Unable to resolve dependency for':app@release/compileClasspath':無法引用任何外部依賴的解決辦法,需要的朋友可以參考下
    2018-01-01
  • Android開發(fā)之完全隱藏軟鍵盤的方法

    Android開發(fā)之完全隱藏軟鍵盤的方法

    這篇文章主要介紹了Android開發(fā)之完全隱藏軟鍵盤的方法的相關(guān)資料,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下
    2016-06-06
  • Android PopupMenu彈出菜單的實(shí)現(xiàn)

    Android PopupMenu彈出菜單的實(shí)現(xiàn)

    這篇文章主要介紹了 Android PopupMenu彈出菜單的實(shí)現(xiàn)的相關(guān)資料,希望通過本文能幫助到大家,實(shí)現(xiàn)這樣的功能,需要的朋友可以參考下
    2017-10-10

最新評(píng)論