Kotlin Job啟動流程源碼層深入分析
Job啟動流程
job啟動流程,我們先從一段最簡單使用協(xié)程的代碼開始,進(jìn)行代碼跟跟蹤,順便引出幾個關(guān)鍵的概念,在后面章節(jié)里面去單獨分析。代碼如下:
private fun testParentChildJob() {
val coroutineContext = Job() + CoroutineName("name1") + Dispatchers.IO + CoroutineExceptionHandler{ c,e -> println(e.message) }
val myScope = CoroutineScope(coroutineContext)
val job = myScope.launch {
println("myScope.launch :")
}
}首先創(chuàng)建一個有四種元素的上下文域myScope,由Job() + CoroutineName("name1") + Dispatchers.IO + CoroutineExceptionHandler{ c,e -> println(e.message) }組成,上一章coroutineContext篇已經(jīng)講過plus操作的過程了,不贅述。
接著用這個作用域myScope開啟一個協(xié)程,協(xié)程內(nèi)打印println("myScope.launch :")。
我自己從launch函數(shù)一步一步跟蹤后,得到了如下圖所示的流程:

launch流程分析
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
分為三步:
首先使用入?yún)?code>context: CoroutineContext = EmptyCoroutineContext,創(chuàng)建一個新的上下文集合newCoroutineContext(context),newCoroutineContext函數(shù)操作:就是根據(jù)所在的scope域的上下文集合和入?yún)⑦M(jìn)行組合操作,得到一個新的上下文集合,代碼如下:
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
可以看到各種+操作,就是coroutineContext的各種plus操作,可以得到一個繼承自所在scope域的上下文集合(這個域由coroutineContext變量決定,這個變量屬于CoroutineScope成員),并且包含了入?yún)⒌腸ontext元素,這樣上下文集合就具有繼承性,并且自己還可以對已有元素進(jìn)行覆蓋。上一篇coroutineContext篇已經(jīng)講過,就不贅述了。
由于我們使用的默認(rèn)方式launch的,使用上面創(chuàng)建的newContext元素集合,就會創(chuàng)建一個StandaloneCoroutine(newContext, active = true)協(xié)程對象。這個對象繼承關(guān)系比較復(fù)雜,繼承關(guān)系如下:

這個類里面包含了很多成員變量,源碼如下:
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
//省略。。。
}
public abstract class AbstractCoroutine<in T>(
/**
* The context of the parent coroutine.
*/
@JvmField
protected val parentContext: CoroutineContext,
active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
/**
* The context of this coroutine that includes this coroutine as a [Job].
*/
@Suppress("LeakingThis")
public final override val context: CoroutineContext = parentContext + this
/**
* The context of this scope which is the same as the [context] of this coroutine.
*/
public override val coroutineContext: CoroutineContext get() = context
override val isActive: Boolean get() = super.isActive
}
context成員變量是外部傳進(jìn)來的newContext上下文集合 + this得到的,那么newContext的Job元素會被this替換掉;
coroutineContext成員變量是CoroutineScope接口的成員,覆寫為context對象; isActive標(biāo)志這個Job是否是存活狀態(tài); 調(diào)用剛剛創(chuàng)建的coroutine協(xié)程的start方法,coroutine.start(start, coroutine, block),跟進(jìn)去看看
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}
initParentJob()方法主要是用于關(guān)聯(lián)父子Job的,這里先不講,對啟動流程沒啥影響。
start(block, receiver, this)是正真啟動協(xié)程的地方,CoroutineStart的值是DEFAULT
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(completion)
ATOMIC -> block.startCoroutine(completion)
UNDISPATCHED -> block.startCoroutineUndispatched(completion)
LAZY -> Unit // will start lazily
}
那么調(diào)用的就是DEFAULT -> block.startCoroutineCancellable(completion)這個分支,
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit> {
//省略。。。
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function1<Continuation<T>, Any?>).invoke(it)
}
}
private inline fun <T> createCoroutineFromSuspendFunction(
completion: Continuation<T>,
crossinline block: (Continuation<T>) -> Any?
): Continuation<Unit> {
val context = completion.context
// label == 0 when coroutine is not started yet (initially) or label == 1 when it was
return if (context === EmptyCoroutineContext)
//省略。。。
else
object : ContinuationImpl(completion as Continuation<Any?>, context) {
private var label = 0
override fun invokeSuspend(result: Result<Any?>): Any? =
when (label) {
0 -> {
label = 1
result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith
block(this) // run the block, may return or suspend
}
//省略。。。
}
}
}第一步:createCoroutineUnintercepted(completion)就是以completion作為參數(shù)創(chuàng)建一個ContinuationImpl對象,這個completion就是上面創(chuàng)建的StandaloneCoroutine對象。這個新的ContinuationImpl對象是繼承自Continuation,那么他就有fun resumeWith(result: Result<T>)方法,該方法是用于恢復(fù)掛起點,val context: CoroutineContext參數(shù),這個參數(shù)就是Continuation的所關(guān)聯(lián)的上下文集合。
我們再自己看看這個createCoroutineFromSuspendFunction這個方法,發(fā)現(xiàn)將我們launch{}的lambda參數(shù)進(jìn)行包裝后(this as Function1<Continuation<T>, Any?>).invoke(it)然后作為入?yún)lock,這個block作為ContinuationImpl對象覆寫的invokeSuspend函數(shù)的回調(diào)函數(shù)。那么可以從這個看出一個關(guān)系:
ContinuationImpl.invokeSuspend -> launch入?yún)⒌膌ambda函數(shù)體
第二步:就是調(diào)用ContinuationImpl .intercepted(),內(nèi)部處理是獲取ContinuationImpl的上下文集合中的ContinuationInterceptor元素,然后將ContinuationImpl作為參數(shù),包裝成DispatchedContinuation(this, continuation),其中this代表ContinuationInterceptor也就是dispatcher,continuation代表剛剛傳遞進(jìn)來的ContinuationImpl。
第三步:resumeCancellableWith(Result.success(Unit)),調(diào)用DispatchedContinuation的resumeCancellableWith函數(shù),代碼如下:
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result)
else -> resumeWith(result)
}
//DispatchedContinuation extends DIspatchedTask
inline fun resumeCancellableWith(result: Result<T>) {
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
省略。。。
}
}
//DIspatchedTask
public final override fun run() {
val taskContext = this.taskContext
var fatalException: Throwable? = null
try {
val delegate = delegate as DispatchedContinuation<T>
val continuation = delegate.continuation
withCoroutineContext(context, delegate.countOrElement) {
//省略。。。
if (exception == null && job != null && !job.isActive) {
//省略。。。
} else {
if (exception != null) continuation.resumeWithException(exception)
else continuation.resume(getSuccessfulResult(state))
}
}
} catch (e: Throwable) {
//省略。。。
}
}由于DispatchedContinuation是繼承自DIspatchedTask的,所以DispatchedContinuation的run方法是DIspatchedTask已經(jīng)實現(xiàn)的了,所以dispatcher.dispatch(context, this),dispatcher調(diào)用的是DIspatchedTask.run方法,(dispatcher是一個線程池和java線程池類似,但是有一點區(qū)別,后面章節(jié)再講),run方法中,首先獲取delegate,然后取出continuation變量,這個delegate其實是被DispatchedContinuation覆寫的,而且實現(xiàn)的Continuation接口被構(gòu)造函數(shù)的continuation代理,這個入?yún)ontinuation其實就是ContinuationImpl,上一步分析過了。
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
//省略。。。
override val delegate: Continuation<T>
get() = this
//省略。。。
}
//Continuation
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))那么其實就是調(diào)用的ContinuationImpl.resumeWith(Result.success(value))方法,ContinuationImpl繼承自BaseContinuationImpl,繼續(xù)進(jìn)去看看
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var param = result
while (true) {
//省略。。。
with(current) {
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
//省略。。。
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
//省略。。。
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}第一步,調(diào)用val outcome = invokeSuspend(param),上面已經(jīng)分析了,invokeSuspend被ContinuationImpl覆寫了,內(nèi)部回調(diào)了launch的lambda表達(dá)式;
第二步,調(diào)用completion.resumeWith(outcome),這個completion上面分析了,是StandAloneCoroutine協(xié)程,調(diào)用了StandAloneCoroutine對象的resumeWith方法,這個方法里面用于更新協(xié)程狀態(tài),比如協(xié)程成功,失敗之類的。
綜上,通過上面的invokeSuspend函數(shù)調(diào)用,最終調(diào)用到了launch的lambda表達(dá)式,也就是我們業(yè)務(wù)代碼,我們的業(yè)務(wù)代碼是被封裝到了ContinuationImpl類中。
通過上面的分析,一共發(fā)現(xiàn)了三種不同類型的continuation,它們分別是:
DispatchedContinuation用于分發(fā)continuation到指定的線程池中; ContinuationImpl用于包裝launch的lambda代碼塊作為業(yè)務(wù)代碼代理類; StandAloneCoroutine協(xié)程管理類管理Job生命周期以及協(xié)程的狀態(tài)父子Job關(guān)系維護(hù)等等。
它們的調(diào)用鏈如下:

父子Job關(guān)聯(lián)分析
父子Job關(guān)聯(lián)操作是在上面launch流程中的,在調(diào)用start方法的時候進(jìn)行關(guān)聯(lián)的:

initParentJob方法里面,先調(diào)用parent.start方法,確保parent的Job已經(jīng)啟動了,接著調(diào)用parent.attachChild(this)方法,用于關(guān)聯(lián)父子Job。 代碼如下:
//AbstractCoroutine
internal fun initParentJob() {
//取出上下文集合中的Job元素,調(diào)用initParentJobInternal方法
initParentJobInternal(parentContext[Job])
}
//AbstractCoroutine
internal fun initParentJobInternal(parent: Job?) {
//省略。。。
parent.start() // make sure the parent is started
//省略。。。
val handle = parent.attachChild(this)
parentHandle = handle
//省略。。。
if (isCompleted) {
handle.dispose()
}
}首先取出parentContext[Job]的Job元素,這個parentContext就是launch的時候根據(jù)scope的上下文集合創(chuàng)建出來的上下文集合,取出的Job元素就是父Job,作為initParentJobInternal的參數(shù),接著調(diào)用parent.attachChild(this):
//JobSupport
public final override fun attachChild(child: ChildJob): ChildHandle {
/*
* Note: This function attaches a special ChildHandleNode node object. This node object
* is handled in a special way on completion on the coroutine (we wait for all of them) and
* is handled specially by invokeOnCompletion itself -- it adds this node to the list even
* if the job is already cancelling. For cancelling state child is attached under state lock.
* It's required to properly wait all children before completion and provide linearizable hierarchy view:
* If child is attached when the job is already being cancelled, such child will receive immediate notification on
* cancellation, but parent *will* wait for that child before completion and will handle its exception.
*/
return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(this, child).asHandler) as ChildHandle
}
//JobSupport
internal class ChildHandleNode(
parent: JobSupport,
@JvmField val childJob: ChildJob
) : JobCancellingNode<JobSupport>(parent), ChildHandle {
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
override fun toString(): String = "ChildHandle[$childJob]"
}首先創(chuàng)建了一個handler = ChildHandleNode(this, child).asHandler對象,這個對象ChildHandleNode作為參數(shù)傳遞給invokeOnCompletion,然后返回一個ChildHandle類型的對象,賦值給子Job的parentHandle val handle = parent.attachChild(this); parentHandle = handle,parentHandle 這個是子Job持有的變量,ChildHandle接口擁有childCancelled方法,用于子Job通知父Job,子Job已經(jīng)取消了,父Job需要根據(jù)子Job狀態(tài)繼續(xù)進(jìn)行處理。
//JobSupport
public final override fun invokeOnCompletion(
onCancelling: Boolean,
invokeImmediately: Boolean,
handler: CompletionHandler
): DisposableHandle {
var nodeCache: JobNode<*>? = null
loopOnState { state ->
when (state) {
is Empty -> { // EMPTY_X state -- no completion handlers
if (state.isActive) {
// try move to SINGLE state
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
if (_state.compareAndSet(state, node)) return node
}
//省略。。。
}
is Incomplete -> {
val list = state.list
if (list == null) { // SINGLE/SINGLE+
promoteSingleToNodeList(state as JobNode<*>)
} else {
var rootCause: Throwable? = null
var handle: DisposableHandle = NonDisposableHandle
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
if (!addLastAtomic(state, list, node)) return@loopOnState // retry
if (rootCause == null) return node
//省略。。。
if (rootCause != null) {
//省略。。。
} else {
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
if (addLastAtomic(state, list, node)) return node
}
}
}
else -> { // is complete
//省略。。。
}
}
}
}
invokeOnCompletion方法就是,將傳遞進(jìn)來的handler: CompletionHandler,分情況存儲起來,
當(dāng)state狀態(tài)是Empty狀態(tài),創(chuàng)建一個代理節(jié)點node ,之后存入到state中; 當(dāng)state是Incomplete狀態(tài),如果state.list結(jié)構(gòu)是空的,那么創(chuàng)建一個鏈表,將node 節(jié)點作為第一個節(jié)點存進(jìn)去,當(dāng)前state.list不為空,那么將node節(jié)點插入到鏈表的末尾。 這樣經(jīng)過上面這兩步: 子Job持有的parentHandle對象可以通知父Job自己已經(jīng)取消了:
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
父Job持有的state對象保存著包裝著子Job的ChildHandleNode對象,父Job通過遍歷調(diào)用列表中的node元素的invoke方法,即可取消所有的子Job:
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
會發(fā)現(xiàn), 調(diào)用launch生成一個Job,這個Job就會initParentJob() ,進(jìn)而子Job會持有父Job,父Job也會將子Job加入到state的數(shù)據(jù)結(jié)構(gòu)中,進(jìn)而形成了樹的結(jié)構(gòu),類似于下圖:

父子Job都可以互相通知對方自己已經(jīng)取消,需要做出對應(yīng)的處理。
結(jié)論
launch啟動一個協(xié)程,會生成三個continuation,分別是
DispatchedContinuation用于分發(fā)continuation到指定的線程池中; ContinuationImpl用于包裝launch的lambda代碼塊作為業(yè)務(wù)代碼代理類; StandAloneCoroutine協(xié)程管理類管理Job生命周期以及協(xié)程的狀態(tài)父子Job關(guān)系維護(hù)等等。 調(diào)用鏈:DispatchedContinuation -> ContinuationImpl(在這里調(diào)用launch的lambda業(yè)務(wù)代碼塊) -> StandAloneCoroutine
launch啟動一個協(xié)程Job,這個Job所在域如果存在parentJob ,那么parentJob和Job會形成樹結(jié)構(gòu)上的父子節(jié)點,并且子Job繼承了父Job的CoroutineScope的上下文集合(根據(jù)參數(shù)會覆蓋一些重復(fù)Key的元素)。
到此這篇關(guān)于Kotlin Job啟動流程源碼層深入分析的文章就介紹到這了,更多相關(guān)Kotlin Job啟動流程內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Android開發(fā)仿掃一掃實現(xiàn)拍攝框內(nèi)的照片功能
無論是微信還是支付寶掃一掃功能很常用,那么它基于代碼是如何實現(xiàn)的呢?今天小編給大家分享android開發(fā)之仿掃一掃實現(xiàn)拍攝框內(nèi)的照片功能,感興趣的朋友一起學(xué)習(xí)吧2016-09-09
Android開發(fā)兩個activity之間傳值示例詳解
這篇文章主要為大家介紹了Android開發(fā)兩個activity之間傳值示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-07-07
Android編程之內(nèi)存溢出解決方案(OOM)實例總結(jié)
這篇文章主要介紹了Android編程之內(nèi)存溢出解決方案(OOM),結(jié)合實例實例總結(jié)分析了Android編程過程中常見的內(nèi)存溢出情況與對應(yīng)的解決方法,具有一定參考借鑒價值,需要的朋友可以參考下2015-11-11
android計算pad或手機(jī)的分辨率/像素/密度/屏幕尺寸/DPI值的方法
本文將介紹手機(jī)布局/界面設(shè)計/分辨率/密度相關(guān),接下來介紹android計算pad或手機(jī)的分辨率像素等等的方法,感興趣的朋友可以了解下,希望本文可以幫助你2013-01-01
Android Activity與Fragment實現(xiàn)底部導(dǎo)航器
這篇文章主要介紹了Android Activity與Fragment實現(xiàn)底部導(dǎo)航器的相關(guān)資料,并附實例代碼,需要的朋友可以參考下2016-11-11

