Kotlin協(xié)程操作之創(chuàng)建啟動掛起恢復(fù)詳解
下面以launch方法為例進行分析。
一.協(xié)程的創(chuàng)建
launch方法的代碼如下:
// CoroutineScope的擴展方法 public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { // 根據(jù)當前上下文,計算得到新的上下文 val newContext = newCoroutineContext(context) // 根據(jù)啟動模式,創(chuàng)建不同的續(xù)體 val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) // 啟動協(xié)程 coroutine.start(start, coroutine, block) return coroutine }
newCoroutineContext用于計算新的上下文,代碼如下:
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext { // coroutineContext為CoroutineScope中保存的全局變量 // 對上下文進行相加 val combined = coroutineContext + context // 用于debug val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined // 如果上下文中沒有調(diào)度器,則添加一個默認的調(diào)度器 return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) debug + Dispatchers.Default else debug }
1.start方法
在不指定協(xié)程啟動模式的情況下,協(xié)程將按照DEFAULT模式啟動,在上述代碼中,會調(diào)用StandaloneCoroutine對象的start方法。StandaloneCoroutine的代碼如下:
private open class StandaloneCoroutine( parentContext: CoroutineContext, active: Boolean ) : AbstractCoroutine<Unit>(parentContext, active) { override fun handleJobException(exception: Throwable): Boolean { handleCoroutineException(context, exception) return true } }
StandaloneCoroutine類中僅重寫了handleJobException方法,用于處理父協(xié)程不處理的異常。因此這里調(diào)用的start方法實際是父類AbstractCoroutine的方法,AbstractCoroutine類的start方法代碼如下:
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) { // 該方法用于完成父協(xié)程與子協(xié)程的綁定關(guān)聯(lián),同時確保父協(xié)程啟動 initParentJob() // 該方法的寫法等同于start.invoke(block, receiver, this) // 因此調(diào)用的CoroutineStart類的方法 start(block, receiver, this) }
AbstractCoroutine類的start方法內(nèi),調(diào)用了CoroutineStart類的invoke方法。
2.CoroutineStart類
CoroutineStart是一個枚舉類,用于根據(jù)不同的啟動模式去啟動協(xié)程,代碼如下:
public enum class CoroutineStart { // 四種啟動模式 DEFAULT, LAZY, // 具有實驗性,慎用 @ExperimentalCoroutinesApi ATOMIC, // 具有實驗性,慎用 @ExperimentalCoroutinesApi UNDISPATCHED; // 根據(jù)不同的啟動策略,啟動協(xié)程,執(zhí)行block @InternalCoroutinesApi public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit = when (this) { DEFAULT -> block.startCoroutineCancellable(completion) ATOMIC -> block.startCoroutine(completion) UNDISPATCHED -> block.startCoroutineUndispatched(completion) LAZY -> Unit // 該模式不主動啟動,等待用戶調(diào)用start方法 } // 根據(jù)不同的啟動策略,啟動協(xié)程,執(zhí)行block @InternalCoroutinesApi public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit = when (this) { DEFAULT -> block.startCoroutineCancellable(receiver, completion) ATOMIC -> block.startCoroutine(receiver, completion) UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion) LAZY -> Unit } // 當前的啟動模式是否為懶啟動 @InternalCoroutinesApi public val isLazy: Boolean get() = this === LAZY }
CoroutineStart類中有兩個invoke方法,其中一個參數(shù)中有receiver,另一個沒有receiver。在Kotlin協(xié)程中,很多方法都重載了帶有receiver的方法和不帶有receiver的方法。
receiver用于為block執(zhí)行提供一個環(huán)境。Kotlin中提供的啟動協(xié)程的方法都是通過帶receiver參數(shù)的start方法實現(xiàn)。通過receiver環(huán)境,可以更方便的實現(xiàn)一些操作,比如在launch啟動的協(xié)程中再次調(diào)用launch啟動新的協(xié)程。在沒有receiver的環(huán)境下執(zhí)行block,則更像是在suspend方法中執(zhí)行,如果需要啟動其他的協(xié)程,需要自己提供環(huán)境。
3.startCoroutineCancellable方法
startCoroutineCancellable是一個擴展方法,用來創(chuàng)建一個可以取消的協(xié)程,代碼如下:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) = runSafely(completion) { // createCoroutineUnintercepted:創(chuàng)建協(xié)程 // intercepted:攔截調(diào)度 // resumeCancellableWith:恢復(fù)執(zhí)行 createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit)) } // 如果創(chuàng)建的過程發(fā)生異常,則通知續(xù)體恢復(fù)后續(xù)代碼的執(zhí)行 private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) { try { block() } catch (e: Throwable) { completion.resumeWith(Result.failure(e)) } }
4.createCoroutineUnintercepted方法
createCoroutineUnintercepted方法用于創(chuàng)建一個新的、可掛起的、不受干擾的協(xié)程。
public expect fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted( receiver: R, completion: Continuation<T> ): Continuation<Unit>
在Kotlin中有很多被expect關(guān)鍵字標記的接口方法,需要找到對應(yīng)平臺下被actual標記的實現(xiàn)方法。
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted( receiver: R, completion: Continuation<T> ): Continuation<Unit> { // 用于debug val probeCompletion = probeCoroutineCreated(completion) return if (this is BaseContinuationImpl) create(receiver, probeCompletion) else { createCoroutineFromSuspendFunction(probeCompletion) { (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it) } } }
createCoroutineUnintercepted方法創(chuàng)建的協(xié)程需要手動調(diào)用resumeWith方法才可以啟動,但重復(fù)的調(diào)用resumeWith方法可能會導(dǎo)致狀態(tài)機發(fā)生異常。同時,參數(shù)中傳入的completion可能會在任意的上下文中被調(diào)用。
正常情況下,我們編寫的lambda表達式——block,在編譯器編譯時,會自動生成一個類,并繼承SuspendLambda類,實現(xiàn)Continuation等接口。因為SuspendLambda繼承自ContinuationImpl,ContinuationImpl繼承自BaseContinuationImpl,所以才有了上述代碼中的判斷邏輯。
如果當前的block對象的類型為BaseContinuationImpl,則調(diào)用create方法,這里的create方法是編譯器生成的類里的重寫方法,它的內(nèi)部就是通過我們傳入的參數(shù),創(chuàng)建并返回根據(jù)blcok生成的類的一個實例對象。
如果當前的block對象的類型不為BaseContinuationImpl,則需要通過createCoroutineFromSuspendFunction方法創(chuàng)建協(xié)程。這里假設(shè)lambda表達式的類型不是BaseContinuationImpl。
5.createCoroutineFromSuspendFunction方法
該方法用于在createCoroutineUnintercepted方法中使用,當一個被suspend修飾的lambda表達式?jīng)]有繼承BaseContinuationImpl類時,則通過此方法創(chuàng)建協(xié)程。
有兩種情況會調(diào)用該方法創(chuàng)建協(xié)程:第一種情況是lambda表達式中調(diào)用了其他的掛起方法;第二種情況是掛起方法是通過Java實現(xiàn)的。
createCoroutineFromSuspendFunction方法的代碼如下:
private inline fun <T> createCoroutineFromSuspendFunction( completion: Continuation<T>, crossinline block: (Continuation<T>) -> Any? ): Continuation<Unit> { val context = completion.context // 如果上下文為空 return if (context === EmptyCoroutineContext) // 創(chuàng)建一個受限協(xié)程 object : RestrictedContinuationImpl(completion as Continuation<Any?>) { private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 result.getOrThrow() block(this) } 1 -> { label = 2 result.getOrThrow() } else -> error("This coroutine had already completed") } } else // 不為空,則創(chuàng)建一個正常的協(xié)程 object : ContinuationImpl(completion as Continuation<Any?>, context) { private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 result.getOrThrow() block(this) } 1 -> { label = 2 result.getOrThrow() } else -> error("This coroutine had already completed") } } }
受限協(xié)程是指協(xié)程在運行過程中的,只能調(diào)用協(xié)程作用域中提供的掛起方法發(fā)生掛起,其他掛起方法不能調(diào)用,因為在掛起方法會對續(xù)體進行攔截,可能導(dǎo)致后續(xù)代碼的執(zhí)行變得無法預(yù)測。
典型的例子就是sequence方法,它創(chuàng)建的協(xié)程就是受限協(xié)程,只能通過調(diào)用yield方法或者yieldAll方法才能發(fā)生掛起。由于受限協(xié)程中不能進行協(xié)程調(diào)度,因此其上下文是空的。
這里launch方法的上下文有一個默認調(diào)度器,因此會創(chuàng)建一個ContinuationImpl對象。
到這里,協(xié)程完成了創(chuàng)建。
二.協(xié)程的啟動
再次回到startCoroutineCancellable方法,當調(diào)用createCoroutineUnintercepted創(chuàng)建好協(xié)程后,會調(diào)用intercepted方法,代碼如下:
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> = (this as? ContinuationImpl)?.intercepted() ?: this
intercepted方法是Continuation接口的擴展方法,內(nèi)部調(diào)用了ContinuationImpl類的intercepted方法。
1.ContinuationImpl類
internal abstract class ContinuationImpl( completion: Continuation<Any?>?, private val _context: CoroutineContext? ) : BaseContinuationImpl(completion) { constructor(completion: Continuation<Any?>?) : this(completion, completion?.context) public override val context: CoroutineContext get() = _context!! @Transient private var intercepted: Continuation<Any?>? = null // 如果沒有緩存,則從上下文中獲取攔截器,調(diào)用interceptContinuation進行攔截, // 將攔截的續(xù)體保存到全局變量 public fun intercepted(): Continuation<Any?> = intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this) .also { intercepted = it } protected override fun releaseIntercepted() { val intercepted = intercepted if (intercepted != null && intercepted !== this) { context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted) } this.intercepted = CompletedContinuation // just in case } }
這里的ContinuationInterceptor指的就是在newCoroutineContext方法中傳入的Dispatchers.Default調(diào)度器。CoroutineDispatcher類的interceptContinuation方法的代碼如下:
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { ... // 將續(xù)體包裹成DispatchedContinuation,并傳入當前調(diào)度器 public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation) ... }
2.resumeCancellableWith方法
再次回到startCoroutineCancellable方法,當調(diào)用intercepted方法進行攔截后,會調(diào)用resumeCancellableWith方法,代碼如下:
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) { is DispatchedContinuation -> resumeCancellableWith(result) else -> resumeWith(result) }
由于當前的Continuation對象的類型為DispatchedContinuation,因此調(diào)用DispatchedContinuation類的resumeCancellableWith方法,代碼如下:
internal class DispatchedContinuation<in T>( @JvmField val dispatcher: CoroutineDispatcher, @JvmField val continuation: Continuation<T> ) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation { ... @Suppress("NOTHING_TO_INLINE") inline fun resumeCancellableWith(result: Result<T>) { val state = result.toState() // 是否進行調(diào)度 if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_CANCELLABLE // 進行調(diào)度 dispatcher.dispatch(context, this) } else {// Dispatcher.Unconfined調(diào)度器會走這里 executeUnconfined(state, MODE_CANCELLABLE) { // 協(xié)程未被取消 if (!resumeCancelled()) { // 恢復(fù)執(zhí)行 resumeUndispatchedWith(result) } } } } // 恢復(fù)執(zhí)行前判斷協(xié)程是否已經(jīng)取消執(zhí)行 @Suppress("NOTHING_TO_INLINE") inline fun resumeCancelled(): Boolean { // 獲取當前的協(xié)程任務(wù) val job = context[Job] // 如果不為空且不活躍 if (job != null && !job.isActive) { // 拋出異常 resumeWithException(job.getCancellationException()) return true } return false } @Suppress("NOTHING_TO_INLINE") inline fun resumeUndispatchedWith(result: Result<T>) { // 該方法在指定的上下文中執(zhí)行,在執(zhí)行后同步協(xié)程上下文變化 withCoroutineContext(context, countOrElement) { // 調(diào)用續(xù)體的resumeWith方法 continuation.resumeWith(result) } } ... } // Dispatchers.Unconfined模式下的調(diào)度 private inline fun DispatchedContinuation<*>.executeUnconfined( contState: Any?, mode: Int, doYield: Boolean = false, block: () -> Unit ): Boolean { // 從ThreadLocal中獲取EventLoop val eventLoop = ThreadLocalEventLoop.eventLoop // doYield表示是否正在讓出執(zhí)行 // 如果正在讓出執(zhí)行,并且執(zhí)行隊列還是空的,說明不需要執(zhí)行,返回false if (doYield && eventLoop.isUnconfinedQueueEmpty) return false // 如果EventLoop當前還在被Unconfined調(diào)度器使用 return if (eventLoop.isUnconfinedLoopActive) { _state = contState resumeMode = mode // 向隊列中添加當前的任務(wù) eventLoop.dispatchUnconfined(this) // 返回 true true } else { // 重新運行EventLoop runUnconfinedEventLoop(eventLoop, block = block) // 返回false false } }
runUnconfinedEventLoop方法是一個擴展方法,用于啟動EventLoop,代碼如下:
internal inline fun DispatchedTask<*>.runUnconfinedEventLoop( eventLoop: EventLoop, block: () -> Unit ) { // 引用計數(shù)+1 eventLoop.incrementUseCount(unconfined = true) try { // 先執(zhí)行當前的任務(wù) block() // 循環(huán)分發(fā)任務(wù) while (true) { // 全部執(zhí)行完畢,則退出分發(fā) if (!eventLoop.processUnconfinedEvent()) break } } catch (e: Throwable) { handleFatalException(e, null) } finally { // 引用計數(shù)+1 eventLoop.decrementUseCount(unconfined = true) } }
Dispatchers.Default調(diào)度器與Dispatchers.Unconfined調(diào)度器的調(diào)度邏輯基本都相同,最終都是調(diào)用Contination對象的resumeWith方法,同時傳入Result對象作為參數(shù)。
這里的Contination是createCoroutineUnintercepted方法創(chuàng)建的繼承ContinuationImpl的匿名內(nèi)部類對象。Result是resumeCancellableWith方法傳入的Result.success(Unit)對象,因為首次啟動,所以傳入類型為Unit。
調(diào)用匿名內(nèi)部類的resumeWith方法,實際調(diào)用的是父類BaseContinuationImpl的resumeWith方法。
3.BaseContinuationImpl類
BaseContinuationImpl類的resumeWith方法的代碼如下:
internal abstract class BaseContinuationImpl( public val completion: Continuation<Any?>? ) : Continuation<Any?>, CoroutineStackFrame, Serializable { public final override fun resumeWith(result: Result<Any?>) { var current = this var param = result // 循環(huán) while (true) { // 用于debug probeCoroutineResumed(current) // current環(huán)境下 with(current) { // completion用于續(xù)體執(zhí)行完的回調(diào),為空,則拋出異常 // 這里的completion就是一開始創(chuàng)建的StandaloneCoroutine對象 val completion = completion!! // 獲取執(zhí)行后的結(jié)果 val outcome: Result<Any?> = try { // 核心執(zhí)行 val outcome = invokeSuspend(param) // 如果返回值為COROUTINE_SUSPENDED,說明協(xié)程掛起,退出循環(huán) if (outcome === COROUTINE_SUSPENDED) return // 返回結(jié)果成功 Result.success(outcome) } catch (exception: Throwable) { // 返回結(jié)果失敗 Result.failure(exception) } // 釋放攔截的續(xù)體,狀態(tài)機終止 releaseIntercepted() // 這里沒有直接調(diào)用resume,而是通過循環(huán)代替遞歸 // 這也是resumeWith方法聲明為final的原因 if (completion is BaseContinuationImpl) { // 這種情況一般為多個suspend方法按順序執(zhí)行 // 等待下一次循環(huán) current = completion param = outcome } else { // 返回結(jié)果 completion.resumeWith(outcome) return } } } } protected abstract fun invokeSuspend(result: Result<Any?>): Any? protected open fun releaseIntercepted() { // does nothing here, overridden in ContinuationImpl } public open fun create(completion: Continuation<*>): Continuation<Unit> { throw UnsupportedOperationException("create(Continuation) has not been overridden") } public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> { throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden") } ... }
4.invokeSuspend方法
在上述代碼中,resumeWith方法內(nèi)部調(diào)用了invokeSuspend方法,這里的invokeSuspend方法實際就是createCoroutineFromSuspendFunction方法中創(chuàng)建的匿名內(nèi)部類的invokeSuspend方法。匿名內(nèi)部類的代碼如下:
object : ContinuationImpl(completion as Continuation<Any?>, context) { // 初始狀態(tài) private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 // 先去獲取一次結(jié)果,如果有異常,則直接拋出,避免執(zhí)行 // 比如在調(diào)度器中,如果發(fā)現(xiàn)協(xié)程已經(jīng)取消, // 則調(diào)用resumeWithException方法,在這里直接被拋出 result.getOrThrow() // 把當前續(xù)體傳入,執(zhí)行協(xié)程 // 可能發(fā)生掛起 block(this) } 1 -> { // 如果協(xié)程發(fā)生了掛起,那么恢復(fù)掛起后會走到這里 label = 2 // 獲取最終的執(zhí)行結(jié)果 result.getOrThrow() } else -> error("This coroutine had already completed") } }
三.協(xié)程的掛起與恢復(fù)
通過上述代碼的分析,協(xié)程的掛起實際就是在協(xié)程返回結(jié)果時返回一個COROUTINE_SUSPENDED對象,在收到COROUTINE_SUSPENDED結(jié)果后直接返回,等待被再次調(diào)用resumeWith恢復(fù)。
COROUTINE_SUSPENDED對象定義在枚舉類CoroutineSingletons中,代碼如下:
internal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED }
該枚舉類代表了協(xié)程的三個狀態(tài),協(xié)程在創(chuàng)建后狀態(tài)為UNDECIDED,如果執(zhí)行過程中發(fā)生掛起,則狀態(tài)變?yōu)镃OROUTINE_SUSPENDED,最后掛起恢復(fù)后狀態(tài)變?yōu)镽ESUMED。
而協(xié)程的恢復(fù)實際就是在掛起方法執(zhí)行完成后,通過調(diào)用協(xié)程執(zhí)行時傳入的續(xù)體的resumeWith方法,恢復(fù)后續(xù)代碼的執(zhí)行。
到此這篇關(guān)于Kotlin協(xié)程操作之創(chuàng)建啟動掛起恢復(fù)詳解的文章就介紹到這了,更多相關(guān)Kotlin協(xié)程操作內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
圣誕節(jié),寫個程序練練手————Android 全界面懸浮按鈕實現(xiàn)
這篇文章主要介紹了圣誕節(jié),寫個程序練練手————Android 全界面懸浮按鈕實現(xiàn)的相關(guān)資料,需要的朋友可以參考下2015-12-12Android?MaterialAlertDialogBuilder修改按鈕屬性
這篇文章主要介紹了Android?MaterialAlertDialogBuilder修改按鈕屬性實現(xiàn)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-11-11Android4.0平板開發(fā)之隱藏底部任務(wù)欄的方法
這篇文章主要介紹了Android4.0平板開發(fā)之隱藏底部任務(wù)欄的方法,結(jié)合實例形式較為詳細的分析了Android隱藏于顯示底部任務(wù)欄的相關(guān)技巧,具有一定參考借鑒價值,需要的朋友可以參考下2015-11-11android使用Ultra-PullToRefresh實現(xiàn)下拉刷新自定義代碼
本篇文章主要介紹了android使用Ultra-PullToRefresh實現(xiàn)下拉刷新新自定義,具有一定的參考價值,感興趣的小伙伴們可以參考一下。2017-02-02android使用service和activity獲取屏幕尺寸的方法
這篇文章主要介紹了android使用service和activity獲取屏幕尺寸的方法,實例分析了基于service和activity兩種方法獲取屏幕尺寸的相關(guān)技巧,非常簡單實用,需要的朋友可以參考下2015-08-08TabLayout實現(xiàn)ViewPager指示器的方法
這篇文章主要為大家詳細介紹了TabLayout實現(xiàn)ViewPager指示器,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-06-06android bitmap compress(圖片壓縮)代碼
android bitmap compress(圖片壓縮)代碼,需要的朋友可以參考一下2013-06-06Android控件gridview實現(xiàn)單行多列橫向滾動效果
這篇文章主要為大家詳細介紹了Android控件gridview實現(xiàn)單行多列橫向滾動效果,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-12-12Android開發(fā)筆記 今天學(xué)到的一些屬性
離開實驗室之前再貼上今天下午自己學(xué)到的一些基礎(chǔ)知識 上午干嘛了呢,忙著數(shù)據(jù)恢復(fù)呢2012-11-11AndroidStudio中重載方法@Override的使用詳解
這篇文章主要介紹了AndroidStudio中重載方法@Override的使用詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-04-04