Kotlin Job啟動(dòng)流程源碼層深入分析
Job啟動(dòng)流程
job啟動(dòng)流程,我們先從一段最簡單使用協(xié)程的代碼開始,進(jìn)行代碼跟跟蹤,順便引出幾個(gè)關(guān)鍵的概念,在后面章節(jié)里面去單獨(dú)分析。代碼如下:
private fun testParentChildJob() { val coroutineContext = Job() + CoroutineName("name1") + Dispatchers.IO + CoroutineExceptionHandler{ c,e -> println(e.message) } val myScope = CoroutineScope(coroutineContext) val job = myScope.launch { println("myScope.launch :") } }
首先創(chuàng)建一個(gè)有四種元素的上下文域myScope,由Job() + CoroutineName("name1") + Dispatchers.IO + CoroutineExceptionHandler{ c,e -> println(e.message) }
組成,上一章coroutineContext篇已經(jīng)講過plus操作的過程了,不贅述。
接著用這個(gè)作用域myScope開啟一個(gè)協(xié)程,協(xié)程內(nèi)打印println("myScope.launch :")
。
我自己從launch函數(shù)一步一步跟蹤后,得到了如下圖所示的流程:
launch流程分析
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }
分為三步:
首先使用入?yún)?code>context: CoroutineContext = EmptyCoroutineContext,創(chuàng)建一個(gè)新的上下文集合newCoroutineContext(context)
,newCoroutineContext函數(shù)操作:就是根據(jù)所在的scope域的上下文集合和入?yún)⑦M(jìn)行組合操作,得到一個(gè)新的上下文集合,代碼如下:
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext { val combined = coroutineContext + context val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) debug + Dispatchers.Default else debug }
可以看到各種+
操作,就是coroutineContext的各種plus操作,可以得到一個(gè)繼承自所在scope域的上下文集合(這個(gè)域由coroutineContext變量決定,這個(gè)變量屬于CoroutineScope成員),并且包含了入?yún)⒌腸ontext元素,這樣上下文集合就具有繼承性,并且自己還可以對(duì)已有元素進(jìn)行覆蓋。上一篇coroutineContext篇已經(jīng)講過,就不贅述了。
由于我們使用的默認(rèn)方式launch的,使用上面創(chuàng)建的newContext元素集合,就會(huì)創(chuàng)建一個(gè)StandaloneCoroutine(newContext, active = true)
協(xié)程對(duì)象。這個(gè)對(duì)象繼承關(guān)系比較復(fù)雜,繼承關(guān)系如下:
這個(gè)類里面包含了很多成員變量,源碼如下:
private open class StandaloneCoroutine( parentContext: CoroutineContext, active: Boolean ) : AbstractCoroutine<Unit>(parentContext, active) { //省略。。。 } public abstract class AbstractCoroutine<in T>( /** * The context of the parent coroutine. */ @JvmField protected val parentContext: CoroutineContext, active: Boolean = true ) : JobSupport(active), Job, Continuation<T>, CoroutineScope { /** * The context of this coroutine that includes this coroutine as a [Job]. */ @Suppress("LeakingThis") public final override val context: CoroutineContext = parentContext + this /** * The context of this scope which is the same as the [context] of this coroutine. */ public override val coroutineContext: CoroutineContext get() = context override val isActive: Boolean get() = super.isActive }
context成員變量是外部傳進(jìn)來的newContext上下文集合 + this
得到的,那么newContext的Job元素會(huì)被this替換掉;
coroutineContext成員變量是CoroutineScope接口的成員,覆寫為context對(duì)象; isActive標(biāo)志這個(gè)Job是否是存活狀態(tài); 調(diào)用剛剛創(chuàng)建的coroutine協(xié)程的start方法,coroutine.start(start, coroutine, block)
,跟進(jìn)去看看
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) { initParentJob() start(block, receiver, this) }
initParentJob()
方法主要是用于關(guān)聯(lián)父子Job的,這里先不講,對(duì)啟動(dòng)流程沒啥影響。
start(block, receiver, this)
是正真啟動(dòng)協(xié)程的地方,CoroutineStart
的值是DEFAULT
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit = when (this) { DEFAULT -> block.startCoroutineCancellable(completion) ATOMIC -> block.startCoroutine(completion) UNDISPATCHED -> block.startCoroutineUndispatched(completion) LAZY -> Unit // will start lazily }
那么調(diào)用的就是DEFAULT -> block.startCoroutineCancellable(completion)
這個(gè)分支,
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) { createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit)) } public actual fun <T> (suspend () -> T).createCoroutineUnintercepted( completion: Continuation<T> ): Continuation<Unit> { //省略。。。 createCoroutineFromSuspendFunction(probeCompletion) { (this as Function1<Continuation<T>, Any?>).invoke(it) } } private inline fun <T> createCoroutineFromSuspendFunction( completion: Continuation<T>, crossinline block: (Continuation<T>) -> Any? ): Continuation<Unit> { val context = completion.context // label == 0 when coroutine is not started yet (initially) or label == 1 when it was return if (context === EmptyCoroutineContext) //省略。。。 else object : ContinuationImpl(completion as Continuation<Any?>, context) { private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith block(this) // run the block, may return or suspend } //省略。。。 } } }
第一步:createCoroutineUnintercepted(completion)
就是以completion作為參數(shù)創(chuàng)建一個(gè)ContinuationImpl對(duì)象,這個(gè)completion就是上面創(chuàng)建的StandaloneCoroutine對(duì)象。這個(gè)新的ContinuationImpl對(duì)象是繼承自Continuation,那么他就有fun resumeWith(result: Result<T>)
方法,該方法是用于恢復(fù)掛起點(diǎn),val context: CoroutineContext
參數(shù),這個(gè)參數(shù)就是Continuation的所關(guān)聯(lián)的上下文集合。
我們?cè)僮约嚎纯催@個(gè)createCoroutineFromSuspendFunction
這個(gè)方法,發(fā)現(xiàn)將我們launch{}的lambda參數(shù)進(jìn)行包裝后(this as Function1<Continuation<T>, Any?>).invoke(it)
然后作為入?yún)lock,這個(gè)block作為ContinuationImpl對(duì)象覆寫的invokeSuspend函數(shù)的回調(diào)函數(shù)。那么可以從這個(gè)看出一個(gè)關(guān)系:
ContinuationImpl.invokeSuspend -> launch入?yún)⒌膌ambda函數(shù)體
第二步:就是調(diào)用ContinuationImpl .intercepted()
,內(nèi)部處理是獲取ContinuationImpl的上下文集合中的ContinuationInterceptor元素,然后將ContinuationImpl作為參數(shù),包裝成DispatchedContinuation(this, continuation)
,其中this代表ContinuationInterceptor也就是dispatcher,continuation代表剛剛傳遞進(jìn)來的ContinuationImpl。
第三步:resumeCancellableWith(Result.success(Unit))
,調(diào)用DispatchedContinuation的resumeCancellableWith函數(shù),代碼如下:
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) { is DispatchedContinuation -> resumeCancellableWith(result) else -> resumeWith(result) } //DispatchedContinuation extends DIspatchedTask inline fun resumeCancellableWith(result: Result<T>) { val state = result.toState() if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_CANCELLABLE dispatcher.dispatch(context, this) } else { 省略。。。 } } //DIspatchedTask public final override fun run() { val taskContext = this.taskContext var fatalException: Throwable? = null try { val delegate = delegate as DispatchedContinuation<T> val continuation = delegate.continuation withCoroutineContext(context, delegate.countOrElement) { //省略。。。 if (exception == null && job != null && !job.isActive) { //省略。。。 } else { if (exception != null) continuation.resumeWithException(exception) else continuation.resume(getSuccessfulResult(state)) } } } catch (e: Throwable) { //省略。。。 } }
由于DispatchedContinuation是繼承自DIspatchedTask的,所以DispatchedContinuation的run方法是DIspatchedTask已經(jīng)實(shí)現(xiàn)的了,所以dispatcher.dispatch(context, this)
,dispatcher調(diào)用的是DIspatchedTask.run方法,(dispatcher是一個(gè)線程池和java線程池類似,但是有一點(diǎn)區(qū)別,后面章節(jié)再講),run方法中,首先獲取delegate,然后取出continuation變量,這個(gè)delegate其實(shí)是被DispatchedContinuation覆寫的,而且實(shí)現(xiàn)的Continuation接口被構(gòu)造函數(shù)的continuation代理,這個(gè)入?yún)ontinuation其實(shí)就是ContinuationImpl,上一步分析過了。
internal class DispatchedContinuation<in T>( @JvmField val dispatcher: CoroutineDispatcher, @JvmField val continuation: Continuation<T> ) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation { //省略。。。 override val delegate: Continuation<T> get() = this //省略。。。 } //Continuation public inline fun <T> Continuation<T>.resume(value: T): Unit = resumeWith(Result.success(value))
那么其實(shí)就是調(diào)用的ContinuationImpl.resumeWith(Result.success(value))
方法,ContinuationImpl繼承自BaseContinuationImpl,繼續(xù)進(jìn)去看看
public final override fun resumeWith(result: Result<Any?>) { // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume var param = result while (true) { //省略。。。 with(current) { try { val outcome = invokeSuspend(param) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } //省略。。。 releaseIntercepted() // this state machine instance is terminating if (completion is BaseContinuationImpl) { //省略。。。 } else { // top-level completion reached -- invoke and return completion.resumeWith(outcome) return } } } }
第一步,調(diào)用val outcome = invokeSuspend(param)
,上面已經(jīng)分析了,invokeSuspend被ContinuationImpl覆寫了,內(nèi)部回調(diào)了launch的lambda表達(dá)式;
第二步,調(diào)用completion.resumeWith(outcome)
,這個(gè)completion上面分析了,是StandAloneCoroutine協(xié)程,調(diào)用了StandAloneCoroutine對(duì)象的resumeWith方法,這個(gè)方法里面用于更新協(xié)程狀態(tài),比如協(xié)程成功,失敗之類的。
綜上,通過上面的invokeSuspend函數(shù)調(diào)用,最終調(diào)用到了launch的lambda表達(dá)式,也就是我們業(yè)務(wù)代碼,我們的業(yè)務(wù)代碼是被封裝到了ContinuationImpl類中。
通過上面的分析,一共發(fā)現(xiàn)了三種不同類型的continuation,它們分別是:
DispatchedContinuation用于分發(fā)continuation到指定的線程池中; ContinuationImpl用于包裝launch的lambda代碼塊作為業(yè)務(wù)代碼代理類; StandAloneCoroutine協(xié)程管理類管理Job生命周期以及協(xié)程的狀態(tài)父子Job關(guān)系維護(hù)等等。
它們的調(diào)用鏈如下:
父子Job關(guān)聯(lián)分析
父子Job關(guān)聯(lián)操作是在上面launch流程中的,在調(diào)用start方法的時(shí)候進(jìn)行關(guān)聯(lián)的:
initParentJob方法里面,先調(diào)用parent.start方法,確保parent的Job已經(jīng)啟動(dòng)了,接著調(diào)用parent.attachChild(this)方法,用于關(guān)聯(lián)父子Job。 代碼如下:
//AbstractCoroutine internal fun initParentJob() { //取出上下文集合中的Job元素,調(diào)用initParentJobInternal方法 initParentJobInternal(parentContext[Job]) } //AbstractCoroutine internal fun initParentJobInternal(parent: Job?) { //省略。。。 parent.start() // make sure the parent is started //省略。。。 val handle = parent.attachChild(this) parentHandle = handle //省略。。。 if (isCompleted) { handle.dispose() } }
首先取出parentContext[Job]的Job元素,這個(gè)parentContext就是launch的時(shí)候根據(jù)scope的上下文集合創(chuàng)建出來的上下文集合,取出的Job元素就是父Job,作為initParentJobInternal的參數(shù),接著調(diào)用parent.attachChild(this):
//JobSupport public final override fun attachChild(child: ChildJob): ChildHandle { /* * Note: This function attaches a special ChildHandleNode node object. This node object * is handled in a special way on completion on the coroutine (we wait for all of them) and * is handled specially by invokeOnCompletion itself -- it adds this node to the list even * if the job is already cancelling. For cancelling state child is attached under state lock. * It's required to properly wait all children before completion and provide linearizable hierarchy view: * If child is attached when the job is already being cancelled, such child will receive immediate notification on * cancellation, but parent *will* wait for that child before completion and will handle its exception. */ return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(this, child).asHandler) as ChildHandle } //JobSupport internal class ChildHandleNode( parent: JobSupport, @JvmField val childJob: ChildJob ) : JobCancellingNode<JobSupport>(parent), ChildHandle { override fun invoke(cause: Throwable?) = childJob.parentCancelled(job) override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause) override fun toString(): String = "ChildHandle[$childJob]" }
首先創(chuàng)建了一個(gè)handler = ChildHandleNode(this, child).asHandler對(duì)象,這個(gè)對(duì)象ChildHandleNode作為參數(shù)傳遞給invokeOnCompletion,然后返回一個(gè)ChildHandle類型的對(duì)象,賦值給子Job的parentHandle val handle = parent.attachChild(this); parentHandle = handle
,parentHandle 這個(gè)是子Job持有的變量,ChildHandle接口擁有childCancelled方法,用于子Job通知父Job,子Job已經(jīng)取消了,父Job需要根據(jù)子Job狀態(tài)繼續(xù)進(jìn)行處理。
//JobSupport public final override fun invokeOnCompletion( onCancelling: Boolean, invokeImmediately: Boolean, handler: CompletionHandler ): DisposableHandle { var nodeCache: JobNode<*>? = null loopOnState { state -> when (state) { is Empty -> { // EMPTY_X state -- no completion handlers if (state.isActive) { // try move to SINGLE state val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it } if (_state.compareAndSet(state, node)) return node } //省略。。。 } is Incomplete -> { val list = state.list if (list == null) { // SINGLE/SINGLE+ promoteSingleToNodeList(state as JobNode<*>) } else { var rootCause: Throwable? = null var handle: DisposableHandle = NonDisposableHandle val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it } if (!addLastAtomic(state, list, node)) return@loopOnState // retry if (rootCause == null) return node //省略。。。 if (rootCause != null) { //省略。。。 } else { val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it } if (addLastAtomic(state, list, node)) return node } } } else -> { // is complete //省略。。。 } } } }
invokeOnCompletion方法就是,將傳遞進(jìn)來的handler: CompletionHandler,分情況存儲(chǔ)起來,
當(dāng)state狀態(tài)是Empty狀態(tài),創(chuàng)建一個(gè)代理節(jié)點(diǎn)node ,之后存入到state中; 當(dāng)state是Incomplete狀態(tài),如果state.list結(jié)構(gòu)是空的,那么創(chuàng)建一個(gè)鏈表,將node 節(jié)點(diǎn)作為第一個(gè)節(jié)點(diǎn)存進(jìn)去,當(dāng)前state.list不為空,那么將node節(jié)點(diǎn)插入到鏈表的末尾。 這樣經(jīng)過上面這兩步: 子Job持有的parentHandle對(duì)象可以通知父Job自己已經(jīng)取消了:
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
父Job持有的state對(duì)象保存著包裝著子Job的ChildHandleNode對(duì)象,父Job通過遍歷調(diào)用列表中的node元素的invoke方法,即可取消所有的子Job:
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
會(huì)發(fā)現(xiàn), 調(diào)用launch生成一個(gè)Job,這個(gè)Job就會(huì)initParentJob() ,進(jìn)而子Job會(huì)持有父Job,父Job也會(huì)將子Job加入到state的數(shù)據(jù)結(jié)構(gòu)中,進(jìn)而形成了樹的結(jié)構(gòu),類似于下圖:
父子Job都可以互相通知對(duì)方自己已經(jīng)取消,需要做出對(duì)應(yīng)的處理。
結(jié)論
launch啟動(dòng)一個(gè)協(xié)程,會(huì)生成三個(gè)continuation,分別是
DispatchedContinuation用于分發(fā)continuation到指定的線程池中; ContinuationImpl用于包裝launch的lambda代碼塊作為業(yè)務(wù)代碼代理類; StandAloneCoroutine協(xié)程管理類管理Job生命周期以及協(xié)程的狀態(tài)父子Job關(guān)系維護(hù)等等。 調(diào)用鏈:DispatchedContinuation -> ContinuationImpl(在這里調(diào)用launch的lambda業(yè)務(wù)代碼塊) -> StandAloneCoroutine
launch啟動(dòng)一個(gè)協(xié)程Job,這個(gè)Job所在域如果存在parentJob ,那么parentJob和Job會(huì)形成樹結(jié)構(gòu)上的父子節(jié)點(diǎn),并且子Job繼承了父Job的CoroutineScope的上下文集合(根據(jù)參數(shù)會(huì)覆蓋一些重復(fù)Key的元素)。
到此這篇關(guān)于Kotlin Job啟動(dòng)流程源碼層深入分析的文章就介紹到這了,更多相關(guān)Kotlin Job啟動(dòng)流程內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
MVVM和MVVMLight框架介紹及在項(xiàng)目中的使用詳解
這篇文章主要為大家介紹了MVVM和MVVMLight的介紹及在項(xiàng)目中的使用詳解有需要的朋友可以借鑒參考下,祝大家除夕快樂多多進(jìn)步2022-01-01Android開發(fā)仿掃一掃實(shí)現(xiàn)拍攝框內(nèi)的照片功能
無論是微信還是支付寶掃一掃功能很常用,那么它基于代碼是如何實(shí)現(xiàn)的呢?今天小編給大家分享android開發(fā)之仿掃一掃實(shí)現(xiàn)拍攝框內(nèi)的照片功能,感興趣的朋友一起學(xué)習(xí)吧2016-09-09Android開發(fā)兩個(gè)activity之間傳值示例詳解
這篇文章主要為大家介紹了Android開發(fā)兩個(gè)activity之間傳值示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-07-07Android編程之內(nèi)存溢出解決方案(OOM)實(shí)例總結(jié)
這篇文章主要介紹了Android編程之內(nèi)存溢出解決方案(OOM),結(jié)合實(shí)例實(shí)例總結(jié)分析了Android編程過程中常見的內(nèi)存溢出情況與對(duì)應(yīng)的解決方法,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-11-11android計(jì)算pad或手機(jī)的分辨率/像素/密度/屏幕尺寸/DPI值的方法
本文將介紹手機(jī)布局/界面設(shè)計(jì)/分辨率/密度相關(guān),接下來介紹android計(jì)算pad或手機(jī)的分辨率像素等等的方法,感興趣的朋友可以了解下,希望本文可以幫助你2013-01-01Android Activity與Fragment實(shí)現(xiàn)底部導(dǎo)航器
這篇文章主要介紹了Android Activity與Fragment實(shí)現(xiàn)底部導(dǎo)航器的相關(guān)資料,并附實(shí)例代碼,需要的朋友可以參考下2016-11-11