欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Kotlin協(xié)程操作之創(chuàng)建啟動掛起恢復(fù)詳解

 更新時間:2022年08月01日 14:19:06   作者:LeeDuo.  
本文的定位是協(xié)程的創(chuàng)建、啟動、掛起、恢復(fù),也會示例一些簡單的使用,這里不對suspend講解,,也不對協(xié)程的高級用法做闡述(熱數(shù)據(jù)通道Channel、冷數(shù)據(jù)流Flow...),本文主要講協(xié)程稍微深入的全面知識

下面以launch方法為例進行分析。

一.協(xié)程的創(chuàng)建

launch方法的代碼如下:

// CoroutineScope的擴展方法
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    // 根據(jù)當前上下文,計算得到新的上下文
    val newContext = newCoroutineContext(context)
    // 根據(jù)啟動模式,創(chuàng)建不同的續(xù)體
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    // 啟動協(xié)程
    coroutine.start(start, coroutine, block)
    return coroutine
}

newCoroutineContext用于計算新的上下文,代碼如下:

public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    // coroutineContext為CoroutineScope中保存的全局變量
    // 對上下文進行相加
    val combined = coroutineContext + context
    // 用于debug
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    // 如果上下文中沒有調(diào)度器,則添加一個默認的調(diào)度器
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}

1.start方法

在不指定協(xié)程啟動模式的情況下,協(xié)程將按照DEFAULT模式啟動,在上述代碼中,會調(diào)用StandaloneCoroutine對象的start方法。StandaloneCoroutine的代碼如下:

private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}

StandaloneCoroutine類中僅重寫了handleJobException方法,用于處理父協(xié)程不處理的異常。因此這里調(diào)用的start方法實際是父類AbstractCoroutine的方法,AbstractCoroutine類的start方法代碼如下:

public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    // 該方法用于完成父協(xié)程與子協(xié)程的綁定關(guān)聯(lián),同時確保父協(xié)程啟動
    initParentJob()
    // 該方法的寫法等同于start.invoke(block, receiver, this)
    // 因此調(diào)用的CoroutineStart類的方法
    start(block, receiver, this)
}

AbstractCoroutine類的start方法內(nèi),調(diào)用了CoroutineStart類的invoke方法。

2.CoroutineStart類

CoroutineStart是一個枚舉類,用于根據(jù)不同的啟動模式去啟動協(xié)程,代碼如下:

public enum class CoroutineStart {
    // 四種啟動模式
    DEFAULT,
    LAZY,
    // 具有實驗性,慎用
    @ExperimentalCoroutinesApi
    ATOMIC,
    // 具有實驗性,慎用
    @ExperimentalCoroutinesApi
    UNDISPATCHED;
    // 根據(jù)不同的啟動策略,啟動協(xié)程,執(zhí)行block
    @InternalCoroutinesApi
    public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(completion)
            ATOMIC -> block.startCoroutine(completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(completion)
            LAZY -> Unit // 該模式不主動啟動,等待用戶調(diào)用start方法
        }
    // 根據(jù)不同的啟動策略,啟動協(xié)程,執(zhí)行block
    @InternalCoroutinesApi
    public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            ATOMIC -> block.startCoroutine(receiver, completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            LAZY -> Unit
        }
    // 當前的啟動模式是否為懶啟動
    @InternalCoroutinesApi
    public val isLazy: Boolean get() = this === LAZY
}

CoroutineStart類中有兩個invoke方法,其中一個參數(shù)中有receiver,另一個沒有receiver。在Kotlin協(xié)程中,很多方法都重載了帶有receiver的方法和不帶有receiver的方法。

receiver用于為block執(zhí)行提供一個環(huán)境。Kotlin中提供的啟動協(xié)程的方法都是通過帶receiver參數(shù)的start方法實現(xiàn)。通過receiver環(huán)境,可以更方便的實現(xiàn)一些操作,比如在launch啟動的協(xié)程中再次調(diào)用launch啟動新的協(xié)程。在沒有receiver的環(huán)境下執(zhí)行block,則更像是在suspend方法中執(zhí)行,如果需要啟動其他的協(xié)程,需要自己提供環(huán)境。

3.startCoroutineCancellable方法

startCoroutineCancellable是一個擴展方法,用來創(chuàng)建一個可以取消的協(xié)程,代碼如下:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        // createCoroutineUnintercepted:創(chuàng)建協(xié)程
        // intercepted:攔截調(diào)度
        // resumeCancellableWith:恢復(fù)執(zhí)行
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
    }
// 如果創(chuàng)建的過程發(fā)生異常,則通知續(xù)體恢復(fù)后續(xù)代碼的執(zhí)行
private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) {
    try {
        block()
    } catch (e: Throwable) {
        completion.resumeWith(Result.failure(e))
    }
}

4.createCoroutineUnintercepted方法

createCoroutineUnintercepted方法用于創(chuàng)建一個新的、可掛起的、不受干擾的協(xié)程。

public expect fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit>

在Kotlin中有很多被expect關(guān)鍵字標記的接口方法,需要找到對應(yīng)平臺下被actual標記的實現(xiàn)方法。

public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    // 用于debug
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(receiver, probeCompletion)
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}

createCoroutineUnintercepted方法創(chuàng)建的協(xié)程需要手動調(diào)用resumeWith方法才可以啟動,但重復(fù)的調(diào)用resumeWith方法可能會導(dǎo)致狀態(tài)機發(fā)生異常。同時,參數(shù)中傳入的completion可能會在任意的上下文中被調(diào)用。

正常情況下,我們編寫的lambda表達式——block,在編譯器編譯時,會自動生成一個類,并繼承SuspendLambda類,實現(xiàn)Continuation等接口。因為SuspendLambda繼承自ContinuationImpl,ContinuationImpl繼承自BaseContinuationImpl,所以才有了上述代碼中的判斷邏輯。

如果當前的block對象的類型為BaseContinuationImpl,則調(diào)用create方法,這里的create方法是編譯器生成的類里的重寫方法,它的內(nèi)部就是通過我們傳入的參數(shù),創(chuàng)建并返回根據(jù)blcok生成的類的一個實例對象。

如果當前的block對象的類型不為BaseContinuationImpl,則需要通過createCoroutineFromSuspendFunction方法創(chuàng)建協(xié)程。這里假設(shè)lambda表達式的類型不是BaseContinuationImpl。

5.createCoroutineFromSuspendFunction方法

該方法用于在createCoroutineUnintercepted方法中使用,當一個被suspend修飾的lambda表達式?jīng)]有繼承BaseContinuationImpl類時,則通過此方法創(chuàng)建協(xié)程。

有兩種情況會調(diào)用該方法創(chuàng)建協(xié)程:第一種情況是lambda表達式中調(diào)用了其他的掛起方法;第二種情況是掛起方法是通過Java實現(xiàn)的。

createCoroutineFromSuspendFunction方法的代碼如下:

private inline fun <T> createCoroutineFromSuspendFunction(
    completion: Continuation<T>,
    crossinline block: (Continuation<T>) -> Any?
): Continuation<Unit> {
    val context = completion.context
    // 如果上下文為空
    return if (context === EmptyCoroutineContext)
        // 創(chuàng)建一個受限協(xié)程
        object : RestrictedContinuationImpl(completion as Continuation<Any?>) {
            private var label = 0

            override fun invokeSuspend(result: Result<Any?>): Any? =
                when (label) {
                    0 -> {
                        label = 1
                        result.getOrThrow()
                        block(this)
                    }
                    1 -> {
                        label = 2
                        result.getOrThrow()
                    }
                    else -> error("This coroutine had already completed")
                }
        }
    else // 不為空,則創(chuàng)建一個正常的協(xié)程
        object : ContinuationImpl(completion as Continuation<Any?>, context) {
            private var label = 0

            override fun invokeSuspend(result: Result<Any?>): Any? =
                when (label) {
                    0 -> {
                        label = 1
                        result.getOrThrow()
                        block(this)
                    }
                    1 -> {
                        label = 2
                        result.getOrThrow()
                    }
                    else -> error("This coroutine had already completed")
                }
        }
}

受限協(xié)程是指協(xié)程在運行過程中的,只能調(diào)用協(xié)程作用域中提供的掛起方法發(fā)生掛起,其他掛起方法不能調(diào)用,因為在掛起方法會對續(xù)體進行攔截,可能導(dǎo)致后續(xù)代碼的執(zhí)行變得無法預(yù)測。

典型的例子就是sequence方法,它創(chuàng)建的協(xié)程就是受限協(xié)程,只能通過調(diào)用yield方法或者yieldAll方法才能發(fā)生掛起。由于受限協(xié)程中不能進行協(xié)程調(diào)度,因此其上下文是空的。

這里launch方法的上下文有一個默認調(diào)度器,因此會創(chuàng)建一個ContinuationImpl對象。

到這里,協(xié)程完成了創(chuàng)建。

二.協(xié)程的啟動

再次回到startCoroutineCancellable方法,當調(diào)用createCoroutineUnintercepted創(chuàng)建好協(xié)程后,會調(diào)用intercepted方法,代碼如下:

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this

intercepted方法是Continuation接口的擴展方法,內(nèi)部調(diào)用了ContinuationImpl類的intercepted方法。

1.ContinuationImpl類

internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
    public override val context: CoroutineContext
        get() = _context!!
    @Transient
    private var intercepted: Continuation<Any?>? = null
    // 如果沒有緩存,則從上下文中獲取攔截器,調(diào)用interceptContinuation進行攔截,
    // 將攔截的續(xù)體保存到全局變量
    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
    protected override fun releaseIntercepted() {
        val intercepted = intercepted
        if (intercepted != null && intercepted !== this) {
            context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
        }
        this.intercepted = CompletedContinuation // just in case
    }
}

這里的ContinuationInterceptor指的就是在newCoroutineContext方法中傳入的Dispatchers.Default調(diào)度器。CoroutineDispatcher類的interceptContinuation方法的代碼如下:

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
     ...
    // 將續(xù)體包裹成DispatchedContinuation,并傳入當前調(diào)度器 
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
    ...
}

2.resumeCancellableWith方法

再次回到startCoroutineCancellable方法,當調(diào)用intercepted方法進行攔截后,會調(diào)用resumeCancellableWith方法,代碼如下:

public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result)
    else -> resumeWith(result)
}

由于當前的Continuation對象的類型為DispatchedContinuation,因此調(diào)用DispatchedContinuation類的resumeCancellableWith方法,代碼如下:

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
    ...
    @Suppress("NOTHING_TO_INLINE")
    inline fun resumeCancellableWith(result: Result<T>) {
        val state = result.toState()
        // 是否進行調(diào)度
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            // 進行調(diào)度
            dispatcher.dispatch(context, this)
        } else {// Dispatcher.Unconfined調(diào)度器會走這里
            executeUnconfined(state, MODE_CANCELLABLE) {
                // 協(xié)程未被取消
                if (!resumeCancelled()) {
                    // 恢復(fù)執(zhí)行
                    resumeUndispatchedWith(result)
                }
            }
        }
    }
    // 恢復(fù)執(zhí)行前判斷協(xié)程是否已經(jīng)取消執(zhí)行
    @Suppress("NOTHING_TO_INLINE")
    inline fun resumeCancelled(): Boolean {
        // 獲取當前的協(xié)程任務(wù)
        val job = context[Job]
        // 如果不為空且不活躍
        if (job != null && !job.isActive) {
            // 拋出異常
            resumeWithException(job.getCancellationException())
            return true
        }
        return false
    }
    @Suppress("NOTHING_TO_INLINE")
    inline fun resumeUndispatchedWith(result: Result<T>) {
        // 該方法在指定的上下文中執(zhí)行,在執(zhí)行后同步協(xié)程上下文變化
        withCoroutineContext(context, countOrElement) {
            // 調(diào)用續(xù)體的resumeWith方法
            continuation.resumeWith(result)
        }
    }
    ...
}
// Dispatchers.Unconfined模式下的調(diào)度
private inline fun DispatchedContinuation<*>.executeUnconfined(
    contState: Any?, mode: Int, doYield: Boolean = false,
    block: () -> Unit
): Boolean {
    // 從ThreadLocal中獲取EventLoop
    val eventLoop = ThreadLocalEventLoop.eventLoop
    // doYield表示是否正在讓出執(zhí)行
    // 如果正在讓出執(zhí)行,并且執(zhí)行隊列還是空的,說明不需要執(zhí)行,返回false
    if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
    // 如果EventLoop當前還在被Unconfined調(diào)度器使用
    return if (eventLoop.isUnconfinedLoopActive) {
        _state = contState
        resumeMode = mode
        // 向隊列中添加當前的任務(wù)
        eventLoop.dispatchUnconfined(this)
        // 返回 true
        true
    } else {
        // 重新運行EventLoop
        runUnconfinedEventLoop(eventLoop, block = block)
        // 返回false
        false
    }
}

runUnconfinedEventLoop方法是一個擴展方法,用于啟動EventLoop,代碼如下:

internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
    eventLoop: EventLoop,
    block: () -> Unit
) {
    // 引用計數(shù)+1
    eventLoop.incrementUseCount(unconfined = true)
    try {
        // 先執(zhí)行當前的任務(wù)
        block()
        // 循環(huán)分發(fā)任務(wù)
        while (true) {
            // 全部執(zhí)行完畢,則退出分發(fā)
            if (!eventLoop.processUnconfinedEvent()) break
        }
    } catch (e: Throwable) {
        handleFatalException(e, null)
    } finally {
        // 引用計數(shù)+1
        eventLoop.decrementUseCount(unconfined = true)
    }
}

Dispatchers.Default調(diào)度器與Dispatchers.Unconfined調(diào)度器的調(diào)度邏輯基本都相同,最終都是調(diào)用Contination對象的resumeWith方法,同時傳入Result對象作為參數(shù)。

這里的Contination是createCoroutineUnintercepted方法創(chuàng)建的繼承ContinuationImpl的匿名內(nèi)部類對象。Result是resumeCancellableWith方法傳入的Result.success(Unit)對象,因為首次啟動,所以傳入類型為Unit。

調(diào)用匿名內(nèi)部類的resumeWith方法,實際調(diào)用的是父類BaseContinuationImpl的resumeWith方法。

3.BaseContinuationImpl類

BaseContinuationImpl類的resumeWith方法的代碼如下:

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        // 循環(huán)
        while (true) {
            // 用于debug
            probeCoroutineResumed(current)
            // current環(huán)境下
            with(current) {
                // completion用于續(xù)體執(zhí)行完的回調(diào),為空,則拋出異常
                // 這里的completion就是一開始創(chuàng)建的StandaloneCoroutine對象
                val completion = completion!! 
                // 獲取執(zhí)行后的結(jié)果
                val outcome: Result<Any?> =
                    try {
                        // 核心執(zhí)行
                        val outcome = invokeSuspend(param)
                        // 如果返回值為COROUTINE_SUSPENDED,說明協(xié)程掛起,退出循環(huán)
                        if (outcome === COROUTINE_SUSPENDED) return
                        // 返回結(jié)果成功
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        // 返回結(jié)果失敗
                        Result.failure(exception)
                    }
                // 釋放攔截的續(xù)體,狀態(tài)機終止
                releaseIntercepted() 
                // 這里沒有直接調(diào)用resume,而是通過循環(huán)代替遞歸
                // 這也是resumeWith方法聲明為final的原因
                if (completion is BaseContinuationImpl) {
                    // 這種情況一般為多個suspend方法按順序執(zhí)行
                    // 等待下一次循環(huán)
                    current = completion
                    param = outcome
                } else {
                    // 返回結(jié)果
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
    protected abstract fun invokeSuspend(result: Result<Any?>): Any?
    protected open fun releaseIntercepted() {
        // does nothing here, overridden in ContinuationImpl
    }
    public open fun create(completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Continuation) has not been overridden")
    }
    public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
    }
     ...
}

4.invokeSuspend方法

在上述代碼中,resumeWith方法內(nèi)部調(diào)用了invokeSuspend方法,這里的invokeSuspend方法實際就是createCoroutineFromSuspendFunction方法中創(chuàng)建的匿名內(nèi)部類的invokeSuspend方法。匿名內(nèi)部類的代碼如下:

object : ContinuationImpl(completion as Continuation<Any?>, context) {
    // 初始狀態(tài)
    private var label = 0
    override fun invokeSuspend(result: Result<Any?>): Any? =
            when (label) {
                0 -> {
                    label = 1
                    // 先去獲取一次結(jié)果,如果有異常,則直接拋出,避免執(zhí)行
                    // 比如在調(diào)度器中,如果發(fā)現(xiàn)協(xié)程已經(jīng)取消,
                    // 則調(diào)用resumeWithException方法,在這里直接被拋出
                    result.getOrThrow()
                    // 把當前續(xù)體傳入,執(zhí)行協(xié)程
                    // 可能發(fā)生掛起
                    block(this)
                }
                1 -> {
                    // 如果協(xié)程發(fā)生了掛起,那么恢復(fù)掛起后會走到這里
                    label = 2
                    // 獲取最終的執(zhí)行結(jié)果
                    result.getOrThrow()
                }
                else -> error("This coroutine had already completed")
            }
}

三.協(xié)程的掛起與恢復(fù)

通過上述代碼的分析,協(xié)程的掛起實際就是在協(xié)程返回結(jié)果時返回一個COROUTINE_SUSPENDED對象,在收到COROUTINE_SUSPENDED結(jié)果后直接返回,等待被再次調(diào)用resumeWith恢復(fù)。

COROUTINE_SUSPENDED對象定義在枚舉類CoroutineSingletons中,代碼如下:

internal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED }

該枚舉類代表了協(xié)程的三個狀態(tài),協(xié)程在創(chuàng)建后狀態(tài)為UNDECIDED,如果執(zhí)行過程中發(fā)生掛起,則狀態(tài)變?yōu)镃OROUTINE_SUSPENDED,最后掛起恢復(fù)后狀態(tài)變?yōu)镽ESUMED。

而協(xié)程的恢復(fù)實際就是在掛起方法執(zhí)行完成后,通過調(diào)用協(xié)程執(zhí)行時傳入的續(xù)體的resumeWith方法,恢復(fù)后續(xù)代碼的執(zhí)行。

到此這篇關(guān)于Kotlin協(xié)程操作之創(chuàng)建啟動掛起恢復(fù)詳解的文章就介紹到這了,更多相關(guān)Kotlin協(xié)程操作內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論