Android?Dispatchers.IO線程池深入刨析
一. Dispatchers.IO
1.Dispatchers.IO
在協(xié)程中,當(dāng)需要執(zhí)行IO任務(wù)時(shí),會(huì)在上下文中指定Dispatchers.IO來(lái)進(jìn)行線程的切換調(diào)度。 而IO實(shí)際上是CoroutineDispatcher類型的對(duì)象,實(shí)際的值為DefaultScheduler類的常量對(duì)象IO,代碼如下:
public actual object Dispatchers {
...
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO
}
2.DefaultScheduler類
DefaultScheduler類繼承自ExperimentalCoroutineDispatcher類,內(nèi)部提供了類型為L(zhǎng)imitingDispatcher的IO對(duì)象,代碼如下:
// 系統(tǒng)配置變量
public const val IO_PARALLELISM_PROPERTY_NAME: String = "kotlinx.coroutines.io.parallelism"
...
// 表示不會(huì)阻塞的任務(wù),純CPU任務(wù)
internal const val TASK_NON_BLOCKING = 0
// 表示執(zhí)行過(guò)程中可能會(huì)阻塞的任務(wù),非純CPU任務(wù)
internal const val TASK_PROBABLY_BLOCKING = 1
...
// 默認(rèn)線程池名稱
internal const val DEFAULT_DISPATCHER_NAME = "Dispatchers.Default"
...
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
// 創(chuàng)建名為Dispatchers.IO的線程池
// 最大并發(fā)數(shù)量為kotlinx.coroutines.io.parallelism指定的值,默認(rèn)為64與CPU數(shù)量中的較大者
// 默認(rèn)的執(zhí)行的任務(wù)類型為TASK_PROBABLY_BLOCKING
val IO: CoroutineDispatcher = LimitingDispatcher(
this,
systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
"Dispatchers.IO",
TASK_PROBABLY_BLOCKING
)
override fun close() {
throw UnsupportedOperationException("$DEFAULT_DISPATCHER_NAME cannot be closed")
}
// 可以看出IO和Default共用一個(gè)線程池
override fun toString(): String = DEFAULT_DISPATCHER_NAME
@InternalCoroutinesApi
@Suppress("UNUSED")
public fun toDebugString(): String = super.toString()
}3.LimitingDispatcher類
LimitingDispatcher類繼承自ExecutorCoroutineDispatcher類,實(shí)現(xiàn)了TaskContext接口和Executor接口。
LimitingDispatcher類的核心是構(gòu)造方法中類型為ExperimentalCoroutineDispatcher的dispatcher對(duì)象。
LimitingDispatcher類看起來(lái)是一個(gè)標(biāo)準(zhǔn)的線程池,但實(shí)際上LimitingDispatcher類只對(duì)類參數(shù)中傳入的dispatcher進(jìn)行包裝和功能擴(kuò)展。如同名字中的litmit一樣,LimitingDispatcher類主要用于對(duì)任務(wù)執(zhí)行數(shù)量進(jìn)行限制,代碼如下:
// dispatcher參數(shù)傳入了DefaultScheduler對(duì)象
// parallelism表示并發(fā)執(zhí)行的任務(wù)數(shù)量
// name表示線程池的名字
// taskMode表示任務(wù)模式,TaskContext接口中的常量
private class LimitingDispatcher(
private val dispatcher: ExperimentalCoroutineDispatcher,
private val parallelism: Int,
private val name: String?,
override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {
// 用于保存任務(wù)的隊(duì)列
private val queue = ConcurrentLinkedQueue<Runnable>()
// 用于記錄當(dāng)前正在執(zhí)行的任務(wù)的數(shù)量
private val inFlightTasks = atomic(0)
// 獲取當(dāng)前線程池
override val executor: Executor
get() = this
// Executor接口的實(shí)現(xiàn),線程池的核心方法,通過(guò)dispatch實(shí)現(xiàn)
override fun execute(command: Runnable) = dispatch(command, false)
override fun close(): Unit = error("Close cannot be invoked on LimitingBlockingDispatcher")
// CoroutineDispatcher接口的實(shí)現(xiàn)
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
// 任務(wù)分發(fā)的核心方法
private fun dispatch(block: Runnable, tailDispatch: Boolean) {
// 獲取當(dāng)前要執(zhí)行的任務(wù)
var taskToSchedule = block
// 死循環(huán)
while (true) {
// 當(dāng)前執(zhí)行的任務(wù)數(shù)加一,也可理解生成生成當(dāng)前要執(zhí)行的任務(wù)的編號(hào)
val inFlight = inFlightTasks.incrementAndGet()
// 如果當(dāng)前需要執(zhí)行的任務(wù)數(shù)小于允許的并發(fā)執(zhí)行任務(wù)數(shù)量,說(shuō)明可以執(zhí)行,
if (inFlight <= parallelism) {
// 調(diào)用參數(shù)中的dispatcher對(duì)象,執(zhí)行任務(wù)
dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
// 返回,退出循環(huán)
return
}
// 如果達(dá)到的最大并發(fā)數(shù)的限制,則將任務(wù)加入到隊(duì)列中
queue.add(taskToSchedule)
// 下面的代碼防止線程競(jìng)爭(zhēng)導(dǎo)致任務(wù)卡在隊(duì)列里不被執(zhí)行,case如下:
// 線程1:inFlightTasks = 1 ,執(zhí)行任務(wù)
// 線程2:inFlightTasks = 2,當(dāng)前達(dá)到了parallelism限制,
// 線程1:執(zhí)行結(jié)束,inFlightTasks = 1
// 線程2:將任務(wù)添加到隊(duì)列里,執(zhí)行結(jié)束,inFlightTasks = 0
// 由于未執(zhí)行,因此這里當(dāng)前執(zhí)行的任務(wù)數(shù)先減一
// 減一后如果仍然大于等于在大并發(fā)數(shù),則直接返回,退出循環(huán)
if (inFlightTasks.decrementAndGet() >= parallelism) {
return
}
// 如果減一后,發(fā)現(xiàn)可以執(zhí)行任務(wù),則從隊(duì)首獲取任務(wù),進(jìn)行下一次循環(huán)
// 如果隊(duì)列為空,說(shuō)明沒(méi)有任務(wù),則返回,退出循環(huán)
taskToSchedule = queue.poll() ?: return
}
}
// CoroutineDispatcher接口的實(shí)現(xiàn),用于yield掛起協(xié)程時(shí)的調(diào)度處理
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
// 也是通過(guò)dispatch方法實(shí)現(xiàn),注意這里tailDispatch參數(shù)為true
dispatch(block, tailDispatch = true)
}
override fun toString(): String {
return name ?: "${super.toString()}[dispatcher = $dispatcher]"
}
// TaskContext接口的實(shí)現(xiàn),用于在一個(gè)任務(wù)執(zhí)行完進(jìn)行回調(diào)
override fun afterTask() {
// 從隊(duì)首獲取一個(gè)任務(wù)
var next = queue.poll()
// 若可以獲取到
if (next != null) {
// 則執(zhí)行任務(wù),注意這里tailDispatch參數(shù)為true
dispatcher.dispatchWithContext(next, this, true)
// 返回
return
}
// 任務(wù)執(zhí)行完畢,當(dāng)前執(zhí)行的任務(wù)數(shù)量減一
inFlightTasks.decrementAndGet()
// 下面的代碼防止線程競(jìng)爭(zhēng)導(dǎo)致任務(wù)卡在隊(duì)列里不被執(zhí)行,case如下:
// 線程1:inFlightTasks = 1 ,執(zhí)行任務(wù)
// 線程2:inFlightTasks = 2
// 線程1:執(zhí)行結(jié)束,執(zhí)行afterTask方法,發(fā)現(xiàn)隊(duì)列為空,此時(shí)inFlightTasks = 2
// 線程2:inFlightTasks當(dāng)前達(dá)到了parallelism限制,
// 將任務(wù)加入到隊(duì)列中,執(zhí)行結(jié)束,inFlightTasks = 1
// 線程1:inFlightTasks=1,執(zhí)行結(jié)束
// 從隊(duì)列中取出任務(wù),隊(duì)列為空則返回
next = queue.poll() ?: return
// 執(zhí)行任務(wù),注意這里tailDispatch參數(shù)為true
dispatch(next, true)
}
}dispatcher的dispatch方法定義在ExperimentalCoroutineDispatcher類中。
4.ExperimentalCoroutineDispatcher類
ExperimentalCoroutineDispatcher類繼承自ExecutorCoroutineDispatcher類,代碼如下:
// corePoolSize線程池核心線程數(shù)
// maxPoolSize表示線程池最大線程數(shù)
// schedulerName表示內(nèi)部協(xié)程調(diào)度器的名字
// idleWorkerKeepAliveNs表示空閑的線程存活時(shí)間
@InternalCoroutinesApi
public open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
private val idleWorkerKeepAliveNs: Long,
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
// 我們?cè)贒efaultScheduler類中就是通過(guò)默認(rèn)的構(gòu)造方法,
// 創(chuàng)建的父類ExperimentalCoroutineDispatcher對(duì)象
public constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE,
schedulerName: String = DEFAULT_SCHEDULER_NAME
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
...
// 創(chuàng)建coroutineScheduler對(duì)象
private var coroutineScheduler = createScheduler()
// 核心的分發(fā)方法
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
// 調(diào)用coroutineScheduler對(duì)象的dispatch方法
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
// 只有當(dāng)coroutineScheduler正在關(guān)閉時(shí),才會(huì)拒絕執(zhí)行,拋出異常
DefaultExecutor.dispatch(context, block)
}
...
private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
...
}
// 核心線程數(shù)
@JvmField
internal val CORE_POOL_SIZE = systemProp(
"kotlinx.coroutines.scheduler.core.pool.size",
AVAILABLE_PROCESSORS.coerceAtLeast(2), // !!! at least two here
minValue = CoroutineScheduler.MIN_SUPPORTED_POOL_SIZE
)
// 最大線程數(shù)
@JvmField
internal val MAX_POOL_SIZE = systemProp(
"kotlinx.coroutines.scheduler.max.pool.size",
(AVAILABLE_PROCESSORS * 128).coerceIn(
CORE_POOL_SIZE,
CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE
),
maxValue = CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE
)
// 空閑線程的存活時(shí)間
@JvmField
internal val IDLE_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos(
systemProp("kotlinx.coroutines.scheduler.keep.alive.sec", 60L)
)在ExperimentalCoroutineDispatcher類的dispatch方法內(nèi)部,通過(guò)調(diào)用類型為CoroutineScheduler的對(duì)象的dispatch方法實(shí)現(xiàn)。
二.CoroutineScheduler類
1.CoroutineScheduler類的繼承關(guān)系
在對(duì)CoroutineScheduler類的dispatch方法分析之前,首先分析一下CoroutineScheduler類的繼承關(guān)系,代碼如下:
// 實(shí)現(xiàn)了Executor和Closeable接口
// corePoolSize線程池核心線程數(shù)
// maxPoolSize表示線程池最大線程數(shù)
// schedulerName表示內(nèi)部協(xié)程調(diào)度器的名字
// idleWorkerKeepAliveNs表示空閑的線程存活時(shí)間
internal class CoroutineScheduler(
@JvmField val corePoolSize: Int,
@JvmField val maxPoolSize: Int,
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
init {
// 核心線程數(shù)量必須大于等于MIN_SUPPORTED_POOL_SIZE
require(corePoolSize >= MIN_SUPPORTED_POOL_SIZE) {
"Core pool size $corePoolSize should be at least $MIN_SUPPORTED_POOL_SIZE"
}
// 最大線程數(shù)量必須大于等于核心線程數(shù)量
require(maxPoolSize >= corePoolSize) {
"Max pool size $maxPoolSize should be greater than or equals to core pool size $corePoolSize"
}
// 最大線程數(shù)量必須小于等于MAX_SUPPORTED_POOL_SIZE
require(maxPoolSize <= MAX_SUPPORTED_POOL_SIZE) {
"Max pool size $maxPoolSize should not exceed maximal supported number of threads $MAX_SUPPORTED_POOL_SIZE"
}
// 空閑的線程存活時(shí)間必須大于0
require(idleWorkerKeepAliveNs > 0) {
"Idle worker keep alive time $idleWorkerKeepAliveNs must be positive"
}
}
...
// Executor接口中的實(shí)現(xiàn),通過(guò)dispatch方法實(shí)現(xiàn)
override fun execute(command: Runnable) = dispatch(command)
// Closeable接口中的實(shí)現(xiàn),通過(guò)shutdown方法實(shí)現(xiàn)
override fun close() = shutdown(10_000L)
...
}2.CoroutineScheduler類的全局變量
接下來(lái)對(duì)CoroutineScheduler類中重要的全局變量進(jìn)行分析,代碼如下:
// 用于存儲(chǔ)全局的純CPU(不阻塞)任務(wù)
@JvmField
val globalCpuQueue = GlobalQueue()
// 用于存儲(chǔ)全局的執(zhí)行非純CPU(可能阻塞)任務(wù)
@JvmField
val globalBlockingQueue = GlobalQueue()
...
// 用于記錄當(dāng)前處于Parked狀態(tài)(一段時(shí)間后自動(dòng)終止)的線程的數(shù)量
private val parkedWorkersStack = atomic(0L)
...
// 用于保存當(dāng)前線程池中的線程
// workers[0]永遠(yuǎn)為null,作為哨兵位
// index從1到maxPoolSize為有效線程
@JvmField
val workers = AtomicReferenceArray<Worker?>(maxPoolSize + 1)
...
// 控制狀態(tài)
private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)
// 表示已經(jīng)創(chuàng)建的線程的數(shù)量
private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt()
// 表示可以獲取的CPU令牌數(shù)量,初始值為線程池核心線程數(shù)量
private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value)
// 獲取指定的狀態(tài)的已經(jīng)創(chuàng)建的線程的數(shù)量
private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
// 獲取指定的狀態(tài)的執(zhí)行阻塞任務(wù)的數(shù)量
private inline fun blockingTasks(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt()
// 獲取指定的狀態(tài)的CPU令牌數(shù)量
public inline fun availableCpuPermits(state: Long): Int = (state and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt()
// 當(dāng)前已經(jīng)創(chuàng)建的線程數(shù)量加1
private inline fun incrementCreatedWorkers(): Int = createdWorkers(controlState.incrementAndGet())
// 當(dāng)前已經(jīng)創(chuàng)建的線程數(shù)量減1
private inline fun decrementCreatedWorkers(): Int = createdWorkers(controlState.getAndDecrement())
// 當(dāng)前執(zhí)行阻塞任務(wù)的線程數(shù)量加1
private inline fun incrementBlockingTasks() = controlState.addAndGet(1L shl BLOCKING_SHIFT)
// 當(dāng)前執(zhí)行阻塞任務(wù)的線程數(shù)量減1
private inline fun decrementBlockingTasks() {
controlState.addAndGet(-(1L shl BLOCKING_SHIFT))
}
// 嘗試獲取CPU令牌
private inline fun tryAcquireCpuPermit(): Boolean = controlState.loop { state ->
val available = availableCpuPermits(state)
if (available == 0) return false
val update = state - (1L shl CPU_PERMITS_SHIFT)
if (controlState.compareAndSet(state, update)) return true
}
// 釋放CPU令牌
private inline fun releaseCpuPermit() = controlState.addAndGet(1L shl CPU_PERMITS_SHIFT)
// 表示當(dāng)前線程池是否關(guān)閉
private val _isTerminated = atomic(false)
val isTerminated: Boolean get() = _isTerminated.value
companion object {
// 用于標(biāo)記一個(gè)線程是否在parkedWorkersStack中(處于Parked狀態(tài))
@JvmField
val NOT_IN_STACK = Symbol("NOT_IN_STACK")
// 線程的三個(gè)狀態(tài)
// CLAIMED表示線程可以執(zhí)行任務(wù)
// PARKED表示線程暫停執(zhí)行任務(wù),一段時(shí)間后會(huì)自動(dòng)進(jìn)入終止?fàn)顟B(tài)
// TERMINATED表示線程處于終止?fàn)顟B(tài)
private const val PARKED = -1
private const val CLAIMED = 0
private const val TERMINATED = 1
// 以下五個(gè)常量為掩碼
private const val BLOCKING_SHIFT = 21 // 2x1024x1024
// 1-21位
private const val CREATED_MASK: Long = (1L shl BLOCKING_SHIFT) - 1
// 22-42位
private const val BLOCKING_MASK: Long = CREATED_MASK shl BLOCKING_SHIFT
// 42
private const val CPU_PERMITS_SHIFT = BLOCKING_SHIFT * 2
// 43-63位
private const val CPU_PERMITS_MASK = CREATED_MASK shl CPU_PERMITS_SHIFT
// 以下兩個(gè)常量用于require中參數(shù)判斷
internal const val MIN_SUPPORTED_POOL_SIZE = 1
// 2x1024x1024-2
internal const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2
// parkedWorkersStack的掩碼
private const val PARKED_INDEX_MASK = CREATED_MASK
// inv表示01反轉(zhuǎn)
private const val PARKED_VERSION_MASK = CREATED_MASK.inv()
private const val PARKED_VERSION_INC = 1L shl BLOCKING_SHIFT
}CoroutineScheduler類中對(duì)線程的狀態(tài)與權(quán)限控制:

availableCpuPermits的初始值為參數(shù)中核心線程數(shù)corePoolSize的值,表示CoroutineScheduler類中最多只有corePoolSize個(gè)核心線程。執(zhí)行純CPU任務(wù)的線程每次執(zhí)行任務(wù)之前需要在availableCpuPermits中進(jìn)行記錄與申請(qǐng)。blockingTasks表示執(zhí)行非純CPU任務(wù)的數(shù)量。這部分線程在執(zhí)行時(shí)不需要CPU令牌。createdWorkers表示當(dāng)前線程池中所有線程的數(shù)量,每個(gè)線程在創(chuàng)建或終止時(shí)都需要通過(guò)在這里進(jìn)行記錄。這些變量的具體關(guān)系如下:
createdWorkers = blockingTasks + corePoolSize - availableCpuPermits
CPU令牌是線程池自定義的概念,不代表時(shí)間片,只是為了保證核心線程的數(shù)量。
三.Worker類與WorkerState類
在分析CoroutineScheduler類的dispatch方法之前,還需要分析一下CoroutineScheduler類中的兩個(gè)重要的內(nèi)部類Worker類以及其對(duì)應(yīng)的狀態(tài)類WorkerState類。
Worker是一個(gè)線程池中任務(wù)的核心執(zhí)行者,幾乎在所有的線程池中都存在Worker的概念。
1.WorkerState類
首先分析一下WorkerState類,代碼如下:
// 一個(gè)枚舉類,表示W(wǎng)orker的狀態(tài)
enum class WorkerState {
// 擁有了CPU令牌,可以執(zhí)行純CPU任務(wù),也可以執(zhí)行非純CPU任務(wù)
CPU_ACQUIRED,
// 可以執(zhí)行非純CPU任務(wù)
BLOCKING,
// 當(dāng)前已經(jīng)暫停,一段時(shí)間后將終止,也有可能被再次使用
PARKING,
// 休眠狀態(tài),用于初始狀態(tài),只能執(zhí)行自己本地任務(wù)
DORMANT,
// 終止?fàn)顟B(tài),將不再被使用
TERMINATED
}2.Worker類的繼承關(guān)系與全局變量
接下來(lái)對(duì)Worker類的繼承關(guān)系以及其中重要的全局變量進(jìn)行分析,代碼如下:
// 繼承自Thread類
// 私有化無(wú)參的構(gòu)造方法
internal inner class Worker private constructor() : Thread() {
init {
// 標(biāo)記為守護(hù)線程
isDaemon = true
}
// 當(dāng)前線程在存儲(chǔ)線程池線程的數(shù)組workers中的索引位置
@Volatile
var indexInArray = 0
set(index) {
// 設(shè)置線程名
name = "$schedulerName-worker-${if (index == 0) "TERMINATED" else index.toString()}"
field = index
}
// 構(gòu)造方法
constructor(index: Int) : this() {
indexInArray = index
}
// 獲取當(dāng)前線程的調(diào)度器
inline val scheduler get() = this@CoroutineScheduler
// 線程存儲(chǔ)任務(wù)的本地隊(duì)列
@JvmField
val localQueue: WorkQueue = WorkQueue()
// 線程的狀態(tài) (內(nèi)部轉(zhuǎn)換)
@JvmField
var state = WorkerState.DORMANT
// 線程的控制狀態(tài)(外部賦予)
val workerCtl = atomic(CLAIMED)
// 終止截止時(shí)間,表示處于PARKING狀態(tài)的線程,在terminationDeadline毫秒后終止
private var terminationDeadline = 0L
// 表示當(dāng)線程處于PARKING狀態(tài),進(jìn)入parkedWorkersStack后,
// 下一個(gè)處于PARKING狀態(tài)并進(jìn)入parkedWorkersStack的線程的引用
@Volatile
var nextParkedWorker: Any? = NOT_IN_STACK
// 偷取其他線程的本地隊(duì)列的任務(wù)的冷卻時(shí)間,后面會(huì)解釋
private var minDelayUntilStealableTaskNs = 0L
// 生成隨機(jī)數(shù),配合算法,用于任務(wù)尋找
private var rngState = Random.nextInt()
...
// 表示當(dāng)前線程的本地隊(duì)列是否有任務(wù)
@JvmField
var mayHaveLocalTasks = false
...
}3.Worker類的run方法
接下來(lái)分析Worker類的核心方法——run方法的實(shí)現(xiàn),代碼入下:
override fun run() = runWorker()
private fun runWorker() {
// 用于配合minDelayUntilStealableTaskNs自旋
var rescanned = false
// 線程池未關(guān)閉,線程沒(méi)有終止,則循環(huán)
while (!isTerminated && state != WorkerState.TERMINATED) {
// 尋找并獲取任務(wù)
val task = findTask(mayHaveLocalTasks)
// 如果找到了任務(wù)
if (task != null) {
// 重制兩個(gè)變量
rescanned = false
minDelayUntilStealableTaskNs = 0L
// 執(zhí)行任務(wù)
executeTask(task)
// 繼續(xù)循環(huán)
continue
} else { // 如果沒(méi)有找到任務(wù),說(shuō)明本地隊(duì)列肯定沒(méi)有任務(wù),因?yàn)楸镜仃?duì)列優(yōu)先查找
// 設(shè)置標(biāo)志位
mayHaveLocalTasks = false
}
// 走到這里,說(shuō)明沒(méi)有找到任務(wù)
// 如果偷取任務(wù)的冷卻時(shí)間不為0,說(shuō)明之前偷到過(guò)任務(wù)
if (minDelayUntilStealableTaskNs != 0L) {
// 這里通過(guò)rescanned,首次minDelayUntilStealableTaskNs不為0,
// 不會(huì)立刻進(jìn)入PARKING狀態(tài),而是再次去尋找任務(wù)
// 因?yàn)楫?dāng)過(guò)多的線程進(jìn)入PARKING狀態(tài),再次喚起大量的線程很難控制
if (!rescanned) {
rescanned = true
} else {// 再次掃描,仍然沒(méi)有找到任務(wù)
// 置位
rescanned = false
// 嘗試釋放CPU令牌,并進(jìn)入WorkerState.PARKING狀態(tài)
tryReleaseCpu(WorkerState.PARKING)
// 清除中斷標(biāo)志位
interrupted()
// 阻塞minDelayUntilStealableTaskNs毫秒
LockSupport.parkNanos(minDelayUntilStealableTaskNs)
// 清零
minDelayUntilStealableTaskNs = 0L
}
// 阻塞完成后繼續(xù)執(zhí)行
continue
}
// 走到這里,說(shuō)明線程可能很長(zhǎng)時(shí)間都沒(méi)有執(zhí)行任務(wù)了,則對(duì)其進(jìn)行暫停處理
// tryPark比tryReleaseCpu要嚴(yán)格的多,會(huì)被線程會(huì)被計(jì)入到parkedWorkersStack,
// 同時(shí)會(huì)修改workerCtl狀態(tài)
tryPark()
}
// 退出循環(huán)
// 嘗試釋放CPU令牌,并進(jìn)入終止?fàn)顟B(tài)
tryReleaseCpu(WorkerState.TERMINATED)
}4.Worker類的任務(wù)尋找機(jī)制
接下來(lái)分析Worker線程如何尋找任務(wù),代碼如下:
// 尋找任務(wù)
fun findTask(scanLocalQueue: Boolean): Task? {
// 嘗試獲取CPU令牌,如果獲取到了,則調(diào)用findAnyTask方法,尋找任務(wù)
if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
// 如果沒(méi)有獲取到CPU令牌,只能去找非純CPU任務(wù)了
// 如果允許掃描本地的任務(wù)隊(duì)列,則優(yōu)先在本地隊(duì)列中尋找,
// 找不到則在全局隊(duì)列中尋找,從隊(duì)首中獲取
val task = if (scanLocalQueue) {
localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
} else {
globalBlockingQueue.removeFirstOrNull()
}
// 如果在本地隊(duì)列和全局隊(duì)列中都找不到,則嘗試去其他線程的隊(duì)列里偷一個(gè)任務(wù)
return task ?: trySteal(blockingOnly = true)
}
// 尋找CPU任務(wù)
private fun findAnyTask(scanLocalQueue: Boolean): Task? {
// 如果允許掃描本地的任務(wù)隊(duì)列,則在本地隊(duì)列和全局隊(duì)列中隨機(jī)二選一,
// 找不到則在全局隊(duì)列中尋找,從隊(duì)首中獲取
if (scanLocalQueue) {
// 隨機(jī)確定本地隊(duì)列和全局隊(duì)列的優(yōu)先順序
val globalFirst = nextInt(2 * corePoolSize) == 0
// 獲取任務(wù)
if (globalFirst) pollGlobalQueues()?.let { return it }
localQueue.poll()?.let { return it }
if (!globalFirst) pollGlobalQueues()?.let { return it }
} else {
// 只能從全局獲取
pollGlobalQueues()?.let { return it }
}
// 走到這里,說(shuō)明本地隊(duì)列和全局隊(duì)列中都找不到
// 那么就嘗試去其他線程的隊(duì)列里偷一個(gè)任務(wù)
return trySteal(blockingOnly = false)
}
// 從全局隊(duì)列獲取任務(wù)
private fun pollGlobalQueues(): Task? {
// 隨機(jī)獲取CPU任務(wù)或者非CPU任務(wù)
if (nextInt(2) == 0) {
// 優(yōu)先獲取CPU任務(wù)
globalCpuQueue.removeFirstOrNull()?.let { return it }
return globalBlockingQueue.removeFirstOrNull()
} else {
// 優(yōu)先獲取非CPU任務(wù)
globalBlockingQueue.removeFirstOrNull()?.let { return it }
return globalCpuQueue.removeFirstOrNull()
}
}
// 偷取其他線程的本地隊(duì)列的任務(wù)
// blockingOnly表示是否只偷取阻塞任務(wù)
private fun trySteal(blockingOnly: Boolean): Task? {
// 只有當(dāng)前線程的本地隊(duì)列為空的時(shí)候,才能偷其他線程的本地隊(duì)列
assert { localQueue.size == 0 }
// 獲取已經(jīng)存在的線程的數(shù)量
val created = createdWorkers
// 如果線程總數(shù)為0或1,則不偷取,直接返回
// 0:需要等待初始化
// 1:避免在單線程機(jī)器上過(guò)度偷取
if (created < 2) {
return null
}
// 隨機(jī)生成一個(gè)存在的線程索引
var currentIndex = nextInt(created)
// 默認(rèn)的偷取冷卻時(shí)間
var minDelay = Long.MAX_VALUE
// 循環(huán)遍歷
repeat(created) {
// 每次循環(huán)索引自增,帶著下一行代碼表示,從位置currentIndex開始偷
++currentIndex
// 如果超出了,則從頭繼續(xù)
if (currentIndex > created) currentIndex = 1
// 從數(shù)組中獲取線程
val worker = workers[currentIndex]
// 如果線程不為空,并且不是自己
if (worker !== null && worker !== this) {
assert { localQueue.size == 0 }
// 根據(jù)偷取的類型進(jìn)行偷取
val stealResult = if (blockingOnly) {
// 偷取非CPU任務(wù)到本地隊(duì)列中
localQueue.tryStealBlockingFrom(victim = worker.localQueue)
} else {
// 偷取任務(wù)到本地隊(duì)列中
localQueue.tryStealFrom(victim = worker.localQueue)
}
// 如果返回值為TASK_STOLEN,說(shuō)明偷到了
// 如果返回值為NOTHING_TO_STEAL,說(shuō)明要偷的線程的本地隊(duì)列是空的
if (stealResult == TASK_STOLEN) {
// 從隊(duì)列的隊(duì)首拿出來(lái)返回
return localQueue.poll()
// 如果返回值大于零,表示偷取的冷卻時(shí)間,說(shuō)明沒(méi)有偷到
} else if (stealResult > 0) { // 說(shuō)明至少還要等待stealResult時(shí)間才能偷取這個(gè)任務(wù)
// 計(jì)算偷取冷卻時(shí)間
minDelay = min(minDelay, stealResult)
}
}
}
// 設(shè)置偷取等待時(shí)間
minDelayUntilStealableTaskNs = if (minDelay != Long.MAX_VALUE) minDelay else 0
// 返回空
return null
}
// 基于Marsaglia xorshift RNG算法
// 用于在2^32-1范圍內(nèi)計(jì)算偷取目標(biāo)
internal fun nextInt(upperBound: Int): Int {
var r = rngState
r = r xor (r shl 13)
r = r xor (r shr 17)
r = r xor (r shl 5)
rngState = r
val mask = upperBound - 1
// Fast path for power of two bound
if (mask and upperBound == 0) {
return r and mask
}
return (r and Int.MAX_VALUE) % upperBound
}通過(guò)對(duì)這部分代碼的分析,可以知道線程在尋找任務(wù)時(shí),首先會(huì)嘗試獲取CPU令牌,成為核心線程。如果線程成為了核心線程,則隨機(jī)從本地或全局的兩個(gè)隊(duì)列中獲取一個(gè)任務(wù),獲取不到則去隨機(jī)偷取一個(gè)任務(wù)。如果沒(méi)有獲取到CPU令牌,則優(yōu)先在本地獲取任務(wù),獲取不到則在全局非CPU任務(wù)隊(duì)列中獲取任務(wù),獲取不到則去偷取一個(gè)非CPU任務(wù)。
如果偷取的任務(wù)沒(méi)有達(dá)到最小的可偷取時(shí)間,則返回需要等待的時(shí)間。如果偷取任務(wù)成功,則直接加入到本地隊(duì)列中。偷取的核心過(guò)程,會(huì)在后面進(jìn)行分析。
5.Worker類的任務(wù)執(zhí)行機(jī)制
接下來(lái)分析任務(wù)被獲取到后如何被執(zhí)行,代碼如下:
// 執(zhí)行任務(wù)
private fun executeTask(task: Task) {
// 獲取任務(wù)類型,類型為純CPU或可能阻塞
val taskMode = task.mode
// 重置線程閑置狀態(tài)
idleReset(taskMode)
// 任務(wù)執(zhí)行前
beforeTask(taskMode)
// 執(zhí)行任務(wù)
runSafely(task)
// 任務(wù)執(zhí)行后
afterTask(taskMode)
}
// 重置線程閑置狀態(tài)
private fun idleReset(mode: Int) {
// 重置從PARKING狀態(tài)到TERMINATED狀態(tài)的時(shí)間
terminationDeadline = 0L
// 如果當(dāng)前狀態(tài)為PARKING,說(shuō)明尋找任務(wù)時(shí)沒(méi)有獲取到CPU令牌
if (state == WorkerState.PARKING) {
assert { mode == TASK_PROBABLY_BLOCKING }
// 設(shè)置狀態(tài)為BLOCKING
state = WorkerState.BLOCKING
}
}
// 任務(wù)執(zhí)行前
private fun beforeTask(taskMode: Int) {
// 如果執(zhí)行的任務(wù)為純CPU任務(wù),說(shuō)明當(dāng)前線程獲取到了CPU令牌,是核心線程,直接返回
if (taskMode == TASK_NON_BLOCKING) return
// 走到這里,說(shuō)明線程執(zhí)行的是非純CPU任務(wù),
// 沒(méi)有CPU令牌也可以執(zhí)行,因此嘗試釋放CPU令牌,進(jìn)入WorkerState.BLOCKING
if (tryReleaseCpu(WorkerState.BLOCKING)) {
// 如果釋放CPU令牌成功,則喚起一個(gè)線程去申請(qǐng)CPU令牌
signalCpuWork()
}
}
// 執(zhí)行任務(wù)
fun runSafely(task: Task) {
try {
task.run()
} catch (e: Throwable) {
// 異常發(fā)生時(shí),通知當(dāng)前線程的異常處理Handler
val thread = Thread.currentThread()
thread.uncaughtExceptionHandler.uncaughtException(thread, e)
} finally {
unTrackTask()
}
}
// 任務(wù)執(zhí)行后
private fun afterTask(taskMode: Int) {
// 如果執(zhí)行的任務(wù)為純CPU任務(wù),說(shuō)明當(dāng)前線程獲取到了CPU令牌,是核心線程,直接返回
if (taskMode == TASK_NON_BLOCKING) return
// 如果執(zhí)行的是非CPU任務(wù)
// 當(dāng)前執(zhí)行的非CPU任務(wù)數(shù)量減一
decrementBlockingTasks()
// 獲取當(dāng)前線程狀態(tài)
val currentState = state
// 如果線程當(dāng)前不是終止?fàn)顟B(tài)
if (currentState !== WorkerState.TERMINATED) {
assert { currentState == WorkerState.BLOCKING }
// 設(shè)置為休眠狀態(tài)
state = WorkerState.DORMANT
}
}四.CoroutineScheduler類的dispatch方法
了解Worker類的工作機(jī)制后,接下來(lái)分析CoroutineScheduler類的dispatch方法,代碼如下:
// block表示要執(zhí)行的任務(wù)
// taskContext表示任務(wù)執(zhí)行的上下文,里面包含任務(wù)的類型,和執(zhí)行完成后的回調(diào)
// tailDispatch表示當(dāng)前任務(wù)是否進(jìn)行隊(duì)列尾部調(diào)度,
// 當(dāng)tailDispatch為true時(shí),當(dāng)前block會(huì)在當(dāng)前線程的本地隊(duì)列里的任務(wù)全部執(zhí)行完后再執(zhí)行
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
// 上報(bào)時(shí)間,TimeSource相關(guān),無(wú)需關(guān)注
trackTask()
// 創(chuàng)建任務(wù)
val task = createTask(block, taskContext)
// 獲取當(dāng)前的Worker,可能獲取不到
val currentWorker = currentWorker()
// 將當(dāng)前的任務(wù)添加到當(dāng)前線程的本地隊(duì)列中
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
// 不為空,說(shuō)明沒(méi)有添加進(jìn)去,說(shuō)明當(dāng)前的線程不是Worker
if (notAdded != null) {
// 將任務(wù)添加到全局隊(duì)列中,如果添加失敗了
if (!addToGlobalQueue(notAdded)) {
// 說(shuō)明線程池正在關(guān)閉,拋出異常
throw RejectedExecutionException("$schedulerName was terminated")
}
}
// skipUnpark表示是否跳過(guò)喚起狀態(tài),取決于這下面兩個(gè)參數(shù)
val skipUnpark = tailDispatch && currentWorker != null
// 如果當(dāng)前類型為純CPU任務(wù)
if (task.mode == TASK_NON_BLOCKING) {
// 如果跳過(guò)喚醒,則直接返回
if (skipUnpark) return
// 喚醒一個(gè)執(zhí)行純CPU任務(wù)的線程
signalCpuWork()
} else {
// 喚醒一個(gè)執(zhí)行非CPU任務(wù)的線程
signalBlockingWork(skipUnpark = skipUnpark)
}
}
// 創(chuàng)建任務(wù)
internal fun createTask(block: Runnable, taskContext: TaskContext): Task {
// 獲取當(dāng)前時(shí)間
val nanoTime = schedulerTimeSource.nanoTime()
// 如果當(dāng)前的block是Task類型的
if (block is Task) {
// 重新設(shè)置提交時(shí)間和任務(wù)上下文
block.submissionTime = nanoTime
block.taskContext = taskContext
// 返回
return block
}
// 封裝成TaskImpl,返回
return TaskImpl(block, nanoTime, taskContext)
}
// 任務(wù)模型
// block表示執(zhí)行的任務(wù)
// submissionTime表示任務(wù)提交時(shí)間
// taskContext表示任務(wù)執(zhí)行的上下文
internal class TaskImpl(
@JvmField val block: Runnable,
submissionTime: Long,
taskContext: TaskContext
) : Task(submissionTime, taskContext) {
override fun run() {
try {
block.run()
} finally {
// 任務(wù)執(zhí)行完畢后,會(huì)在同一個(gè)Worker線程中回調(diào)afterTask方法
taskContext.afterTask()
}
}
override fun toString(): String =
"Task[${block.classSimpleName}@${block.hexAddress}, $submissionTime, $taskContext]"
}
// 將任務(wù)添加到本地隊(duì)列
private fun Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
// 如果當(dāng)前線程為空,則返回任務(wù)
if (this == null) return task
// 如果線程處于終止?fàn)顟B(tài),則返回任務(wù)
if (state === WorkerState.TERMINATED) return task
// 如果任務(wù)為純CPU任務(wù),但是線程沒(méi)有CPU令牌
if (task.mode == TASK_NON_BLOCKING && state === WorkerState.BLOCKING) {
// 則返回任務(wù)
return task
}
// 標(biāo)記本地隊(duì)列有任務(wù)
mayHaveLocalTasks = true
// 添加到隊(duì)列
return localQueue.add(task, fair = tailDispatch)
}
// 添加到全局隊(duì)列
private fun addToGlobalQueue(task: Task): Boolean {
// 根據(jù)任務(wù)的類型,添加到全局隊(duì)列的隊(duì)尾
return if (task.isBlocking) {
globalBlockingQueue.addLast(task)
} else {
globalCpuQueue.addLast(task)
}
}
// 對(duì)當(dāng)前線程進(jìn)行強(qiáng)制轉(zhuǎn)換,如果調(diào)度器也是當(dāng)前的調(diào)度器則返回Worker對(duì)象
private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }
// 喚起一個(gè)執(zhí)行非純CPU任務(wù)的線程
private fun signalBlockingWork(skipUnpark: Boolean) {
// 當(dāng)前執(zhí)行阻塞任務(wù)的線程數(shù)量加1,并獲取當(dāng)前的控制狀態(tài)
val stateSnapshot = incrementBlockingTasks()
// 如果跳過(guò)喚起,則返回
if (skipUnpark) return
// 嘗試喚起,喚起成功,則返回
if (tryUnpark()) return
// 喚起失敗,則根據(jù)當(dāng)前的控制狀態(tài),嘗試創(chuàng)建新線程,成功則返回
if (tryCreateWorker(stateSnapshot)) return
// 再次嘗試喚起,防止多線程競(jìng)爭(zhēng)情況下,上面的tryUnpark方法正好卡在線程釋放CPU令牌與進(jìn)入PARKING狀態(tài)之間
// 因?yàn)榫€程先釋放CPU令牌,后進(jìn)入PARKING狀態(tài)
tryUnpark()
}
// 喚起一個(gè)執(zhí)行純CPU任務(wù)的線程
internal fun signalCpuWork() {
// 嘗試喚起,喚起成功,則返回
if (tryUnpark()) return
// 喚起失敗,則嘗試創(chuàng)建新線程,成功則返回
if (tryCreateWorker()) return
// 再次嘗試喚起,防止多線程競(jìng)爭(zhēng)情況下,上面的tryUnpark方法正好卡在線程釋放CPU令牌與進(jìn)入PARKING狀態(tài)之間
// 因?yàn)榫€程先釋放CPU令牌,后進(jìn)入PARKING狀態(tài)
tryUnpark()
}通過(guò)對(duì)上面的代碼進(jìn)行分析,可以知道CoroutineScheduler類的dispatch方法,首先會(huì)對(duì)任務(wù)進(jìn)行封裝。正常情況下,任務(wù)都會(huì)根據(jù)類型添加到全局隊(duì)列中,接著根據(jù)任務(wù)類型,隨機(jī)喚起一個(gè)執(zhí)行對(duì)應(yīng)類型任務(wù)的線程去執(zhí)行任務(wù)。
當(dāng)任務(wù)執(zhí)行完畢后,會(huì)回調(diào)任務(wù)中自帶的afterTask方法。根據(jù)之前對(duì)LimitingDispatcher的分析,可以知道,此時(shí)tailDispatch參數(shù)為true,同時(shí)當(dāng)前的線程也是Worker線程,因此會(huì)被直接添加到線程的本地隊(duì)列中,由于任務(wù)有對(duì)應(yīng)的線程執(zhí)行,因此跳過(guò)了喚起其他線程執(zhí)行任務(wù)的階段。這里我們可以稱這個(gè)機(jī)制為尾調(diào)機(jī)制。
為什么CoroutineScheduler類中要設(shè)計(jì)一個(gè)尾調(diào)機(jī)制呢?
在傳統(tǒng)的線程池的線程充足情況下,一個(gè)任務(wù)到來(lái)時(shí),會(huì)被分配一個(gè)線程。假設(shè)前后兩個(gè)任務(wù)A與B有依賴關(guān)系,需要在執(zhí)行A再執(zhí)行B,這時(shí)如果兩個(gè)任務(wù)同時(shí)到來(lái),執(zhí)行A任務(wù)的線程會(huì)直接執(zhí)行,而執(zhí)行B線程的任務(wù)可能需要被阻塞。而一旦線程阻塞會(huì)造成線程資源的浪費(fèi)。而協(xié)程本質(zhì)上就是多個(gè)小段程序的相互協(xié)作,因此這種場(chǎng)景會(huì)非常多,通過(guò)這種機(jī)制可以保證任務(wù)的執(zhí)行順序,同時(shí)減少資源浪費(fèi),而且可以最大限度的保證一個(gè)連續(xù)的任務(wù)執(zhí)行在同一個(gè)線程中。
至此,Dispatchers.IO線程池的工作原理全部分析完畢。
五.淺談WorkQueue類
1.add方法
接下來(lái)分析一些更加細(xì)節(jié)的過(guò)程。首先分析一下Worker線程本地隊(duì)列調(diào)用的add方法是如何添加任務(wù)的,代碼如下:
// 本地隊(duì)列中存儲(chǔ)最后一次尾調(diào)的任務(wù)
private val lastScheduledTask = atomic<Task?>(null)
// fair表示是否公平的執(zhí)行任務(wù),F(xiàn)IFO,默認(rèn)為false
fun add(task: Task, fair: Boolean = false): Task? {
// fair為true,則添加到隊(duì)尾
if (fair) return addLast(task)
// 如果fair為false,則從lastScheduledTask中取出上一個(gè)尾調(diào)的任務(wù),
// 并把這次的新尾調(diào)任務(wù)保存到lastScheduledTask
val previous = lastScheduledTask.getAndSet(task) ?: return null
// 如果獲取上一次的尾調(diào)任務(wù)不為空,則添加到隊(duì)尾
return addLast(previous)
}2.任務(wù)偷取機(jī)制
根據(jù)之前對(duì)Worker類的分析,任務(wù)偷取的核心代碼鎖定在了WorkQueue類的兩個(gè)方法上:一個(gè)是偷取非純CPU任務(wù)的tryStealBlockingFrom方法,另一個(gè)可以偷所有類型任務(wù)的tryStealFrom方法,代碼如下:
internal const val BUFFER_CAPACITY_BASE = 7
internal const val BUFFER_CAPACITY = 1 shl BUFFER_CAPACITY_BASE // 1000 0000
internal const val MASK = BUFFER_CAPACITY - 1 // 0111 1111
// 存儲(chǔ)任務(wù)的數(shù)組,最多存儲(chǔ)128
private val buffer: AtomicReferenceArray<Task?> = AtomicReferenceArray(BUFFER_CAPACITY)
// producerIndex表示上一次向任務(wù)數(shù)組中添加任務(wù)的索引
// consumerIndex表示上一次消費(fèi)的任務(wù)索引
// producerIndex永遠(yuǎn)大于等于consumerIndex
// 二者差值就是當(dāng)前任務(wù)數(shù)組中任務(wù)的數(shù)量
private val producerIndex = atomic(0)
private val consumerIndex = atomic(0)
// buffer中非純CPU任務(wù)的數(shù)量(避免遍歷掃描)
private val blockingTasksInBuffer = atomic(0)
// 偷所有類型任務(wù)
fun tryStealFrom(victim: WorkQueue): Long {
assert { bufferSize == 0 }
// 從要偷取線程的本地隊(duì)列中輪訓(xùn)獲取一個(gè)任務(wù)
val task = victim.pollBuffer()
// 如果獲取到了任務(wù)
if (task != null) {
// 將它添加到自己的本地隊(duì)列中
val notAdded = add(task)
assert { notAdded == null }
// 返回偷取成功的標(biāo)識(shí)
return TASK_STOLEN
}
// 如果偷取失敗,嘗試偷取指定線程的尾調(diào)任務(wù)
return tryStealLastScheduled(victim, blockingOnly = false)
}
// 輪訓(xùn)獲取任務(wù)
private fun pollBuffer(): Task? {
// 死循環(huán)
while (true) {
// 獲取上一次消費(fèi)的任務(wù)索引
val tailLocal = consumerIndex.value
// 如果當(dāng)前任務(wù)數(shù)組中沒(méi)有多處的任務(wù),則返回空
if (tailLocal - producerIndex.value == 0) return null
// 計(jì)算偷取位置,防止數(shù)組過(guò)界
val index = tailLocal and MASK
// 通過(guò)CAS方式,將consumerIndex加一,表示下一次要從tailLocal + 1處開始偷取
if (consumerIndex.compareAndSet(tailLocal, tailLocal + 1)) {
// 從偷取位置初取出任務(wù),如果偷取的任務(wù)為空,則繼續(xù)循環(huán)
val value = buffer.getAndSet(index, null) ?: continue
// 偷取成功
// 若任務(wù)為阻塞任務(wù),blockingTasksInBuffer的值減一
value.decrementIfBlocking()
// 返回任務(wù)
return value
}
}
}
// 偷取非純CPU任務(wù)
fun tryStealBlockingFrom(victim: WorkQueue): Long {
assert { bufferSize == 0 }
// 從consumerIndex位置開始偷
var start = victim.consumerIndex.value
// 偷到producerIndex處截止
val end = victim.producerIndex.value
// 獲取任務(wù)數(shù)組
val buffer = victim.buffer
// 循環(huán)偷取
while (start != end) {
// 計(jì)算偷取位置,防止數(shù)組過(guò)界
val index = start and MASK
// 如果非純CPU任務(wù)數(shù)為0,則直接退出循環(huán)
if (victim.blockingTasksInBuffer.value == 0) break
// 獲取index處的任務(wù)
val value = buffer[index]
// 如果任務(wù)存在,而且是非純CPU任務(wù),同時(shí)成功的通過(guò)CAS設(shè)置為空
if (value != null && value.isBlocking && buffer.compareAndSet(index, value, null)) {
// blockingTasksInBuffer的值減一
victim.blockingTasksInBuffer.decrementAndGet()
// 將偷取的任務(wù)添加到當(dāng)前線程的本地隊(duì)列中
add(value)
// 返回偷取成功標(biāo)識(shí)
return TASK_STOLEN
} else {
// 如果偷取失敗,自增再次循環(huán),從下一個(gè)位置開始偷
++start
}
}
// 如果從任務(wù)數(shù)組中偷取失敗,嘗試偷取指定線程的尾調(diào)任務(wù)
return tryStealLastScheduled(victim, blockingOnly = true)
}
// 偷取指定線程的尾調(diào)任務(wù)
private fun tryStealLastScheduled(victim: WorkQueue, blockingOnly: Boolean): Long {
// 死循環(huán)
while (true) {
// 獲取指定線程的尾調(diào)任務(wù),如果任務(wù)不存在,則返回偷取失敗標(biāo)識(shí)符
val lastScheduled = victim.lastScheduledTask.value ?: return NOTHING_TO_STEAL
// 如果要偷取的是非純CPU任務(wù),但是任務(wù)類型為純CPU任務(wù),說(shuō)明只有核心線程才能偷
// 返回偷取失敗標(biāo)識(shí)符
if (blockingOnly && !lastScheduled.isBlocking) return NOTHING_TO_STEAL
// 獲取當(dāng)前時(shí)間
val time = schedulerTimeSource.nanoTime()
//計(jì)算任務(wù)從添加開始到現(xiàn)在經(jīng)過(guò)的時(shí)長(zhǎng)
val staleness = time - lastScheduled.submissionTime
// 如果時(shí)長(zhǎng)小于偷取冷卻時(shí)間
if (staleness < WORK_STEALING_TIME_RESOLUTION_NS) {
// 返回當(dāng)前線程需要等待的時(shí)間
return WORK_STEALING_TIME_RESOLUTION_NS - staleness
}
// 通過(guò)CAS,將lastScheduledTask設(shè)置為空,防止被其他線程執(zhí)行
if (victim.lastScheduledTask.compareAndSet(lastScheduled, null)) {
// 偷取成功,加入到當(dāng)前線程的隊(duì)列中
add(lastScheduled)
// 返回偷取成功表示
return TASK_STOLEN
}
// 繼續(xù)循環(huán)
continue
}
}
// 偷取冷卻時(shí)間,尾調(diào)任務(wù)從添加開始,
// 最少經(jīng)過(guò)WORK_STEALING_TIME_RESOLUTION_NS時(shí)間才可以被偷
@JvmField
internal val WORK_STEALING_TIME_RESOLUTION_NS = systemProp(
"kotlinx.coroutines.scheduler.resolution.ns", 100000L
)六.總結(jié)
1.兩個(gè)線程池
CoroutineScheduler類是核心的線程池,用于任務(wù)的執(zhí)行。LimitingDispatcher類對(duì)CoroutineScheduler類進(jìn)行代理,是CoroutineScheduler類尾調(diào)機(jī)制的使用者,對(duì)任務(wù)進(jìn)行初步排隊(duì)。
2.四種隊(duì)列
LimitingDispatcher類中的任務(wù)隊(duì)列。CoroutineScheduler類中的兩個(gè)全局隊(duì)列。Worker類中的本地隊(duì)列。
3.尾調(diào)機(jī)制
一個(gè)任務(wù)執(zhí)行完,可以通過(guò)回調(diào),在同一個(gè)Worker線程中再存儲(chǔ)一個(gè)待執(zhí)行任務(wù),該任務(wù)將在Worker線程本地隊(duì)列目前已存在的任務(wù),執(zhí)行完畢后再執(zhí)行。
4.任務(wù)分類與權(quán)限控制
所有任務(wù)分成純CPU任務(wù)和非純CPU任務(wù)兩種,對(duì)應(yīng)著核心線程和非核心線程。
所有線程在執(zhí)行前都先嘗試成為核心線程,核心線程可以從兩種任務(wù)中任意選擇執(zhí)行,非核心線程只能執(zhí)行非純CPU任務(wù)。核心線程如果選擇執(zhí)行非純CPU任務(wù)會(huì)變成非核心線程
5.任務(wù)偷取機(jī)制
WorkQueue類根據(jù)隨機(jī)算法提供任務(wù)偷取機(jī)制,一個(gè)Worker線程可以從其他Worker線程的本地隊(duì)列中偷取任務(wù)。
6.執(zhí)行梳理圖

到此這篇關(guān)于Android Dispatchers.IO線程池深入刨析的文章就介紹到這了,更多相關(guān)Android Dispatchers.IO內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Android開發(fā)筆記之Intent初級(jí)學(xué)習(xí)教程
這篇文章主要介紹了Android開發(fā)筆記之Intent初級(jí)學(xué)習(xí),較為詳細(xì)的分析了Android Intent項(xiàng)目的建立,功能實(shí)現(xiàn)及Intent使用技巧,需要的朋友可以參考下2016-02-02
一個(gè)Activity中多個(gè)Fragment實(shí)現(xiàn)沉浸式狀態(tài)欄的解決方法
這篇文章主要介紹了一個(gè)Activity中多個(gè)Fragment實(shí)現(xiàn)沉浸式狀態(tài)欄解決方法,對(duì)于解決這個(gè)問(wèn)題要分為兩部分,具體內(nèi)容詳情,大家參考下本文吧2017-01-01
5種Android數(shù)據(jù)存儲(chǔ)方式匯總
這篇文章主要為大家整理了5種Android數(shù)據(jù)存儲(chǔ)方式,列出了各存儲(chǔ)方式的優(yōu)缺點(diǎn),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2016-12-12
Android開發(fā)跳轉(zhuǎn)應(yīng)用市場(chǎng)進(jìn)行版本更新功能實(shí)現(xiàn)
這篇文章主要為大家介紹了Android實(shí)現(xiàn)跳轉(zhuǎn)到應(yīng)用市場(chǎng)進(jìn)行版本更新功能,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-04-04
Android 判斷ip地址合法實(shí)現(xiàn)代碼
這篇文章主要介紹了Android 判斷ip地址合法實(shí)現(xiàn)代碼的相關(guān)資料,需要的朋友可以參考下2017-06-06
Android webview與js的數(shù)據(jù)交互
有了WebView這個(gè)組件,Android應(yīng)用開發(fā)技術(shù)也就轉(zhuǎn)嫁到html與java數(shù)據(jù)交互上來(lái)。說(shuō)白了就是js與WebView的數(shù)據(jù)交互,這就是本文所要討論的2017-04-04
Android加載對(duì)話框同時(shí)異步執(zhí)行實(shí)現(xiàn)方法
Android中通過(guò)子線程連接網(wǎng)絡(luò)獲取資料,同時(shí)顯示加載進(jìn)度對(duì)話框給用戶的操作2012-11-11

