欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Kotlin?Dispatchers協(xié)程調(diào)度器源碼深入分析

 更新時(shí)間:2022年11月21日 16:50:14   作者:Super-B  
Kotlin協(xié)程不是什么空中閣樓,Kotlin源代碼會(huì)被編譯成class字節(jié)碼文件,最終會(huì)運(yùn)行到虛擬機(jī)中。所以從本質(zhì)上講,Kotlin和Java是類似的,都是可以編譯產(chǎn)生class的語言,但最終還是會(huì)受到虛擬機(jī)的限制,它們的代碼最終會(huì)在虛擬機(jī)上的某個(gè)線程上被執(zhí)行

Dispatchers協(xié)程調(diào)度器

CoroutineDispatcher,具有用于調(diào)度任務(wù)的底層執(zhí)行器。ExecutorCoroutineDispatcher的實(shí)例應(yīng)由調(diào)度程序的所有者關(guān)閉。

此類通常用作基于協(xié)程的API和異步API之間的橋梁,異步API需要Executor的實(shí)例。

根據(jù)各種調(diào)度器的繼承關(guān)系,梳理如下繼承結(jié)構(gòu):

CoroutineDispatcher基類將由所有協(xié)程調(diào)度器實(shí)現(xiàn)擴(kuò)展,kotlin官方實(shí)現(xiàn)了以下四種調(diào)度器:

Dispatchers.Default -如果上下文中未指定調(diào)度器或任何其他ContinuationInterceptor,則所有標(biāo)準(zhǔn)構(gòu)建器都使用默認(rèn)值。它使用共享后臺(tái)線程的公共池。對(duì)于消耗CPU資源的計(jì)算密集型協(xié)程來說,這是一個(gè)合適的選擇。

Dispatchers.IO -使用按需創(chuàng)建線程的共享池,用于卸載IO密集型阻塞操作(如文件I/O和阻塞套接字I/O)。

Dispatchers.Unconfined -在當(dāng)前調(diào)用幀中啟動(dòng)協(xié)程執(zhí)行,直到第一次暫停,然后協(xié)程生成器函數(shù)返回。協(xié)程稍后將在相應(yīng)的掛起函數(shù)使用的任何線程中恢復(fù),而不將其限制在任何特定的線程或池中。無約束調(diào)度器通常不應(yīng)在代碼中使用。

HandlerContext -在主線程中調(diào)度任務(wù),android中主線程也就是ui線程,使用該調(diào)度器謹(jǐn)慎ANR異常,不應(yīng)該使用該調(diào)度器調(diào)度阻塞或者耗時(shí)任務(wù)。

可以使用newSingleThreadContext和newFixedThreadPoolContext創(chuàng)建專用線程池。

可以使用asCoroutineDispatcher擴(kuò)展函數(shù)將任意執(zhí)行器轉(zhuǎn)換為調(diào)度器。

Dispatchers.Default

這個(gè)調(diào)度器的類型是DefaultScheduler,一般是做cpu密集計(jì)算型任務(wù),內(nèi)部包含的成員變量IO,也就是對(duì)應(yīng)的Dispatchers.IO調(diào)度器。主要實(shí)現(xiàn)在ExecutorCoroutineDispatcher()中,代碼如下:

internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    val IO: CoroutineDispatcher = LimitingDispatcher(
        this,
        systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
        "Dispatchers.IO",
        TASK_PROBABLY_BLOCKING
    )
	//省略。。。
}
public open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int,
    private val maxPoolSize: Int,
    private val idleWorkerKeepAliveNs: Long,
    private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
    public constructor(//省略。。。)
    override val executor: Executor
        get() = coroutineScheduler
    // This is variable for test purposes, so that we can reinitialize from clean state
    private var coroutineScheduler = createScheduler()
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try {
            coroutineScheduler.dispatch(block)
        } catch (e: RejectedExecutionException) {
            DefaultExecutor.dispatch(context, block)
        }
    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        try {
            coroutineScheduler.dispatch(block, tailDispatch = true)
        } catch (e: RejectedExecutionException) {
            DefaultExecutor.dispatchYield(context, block)
        }
    }
    //省略。。。
}

Default調(diào)度器其實(shí)沒做什么特別的操作,只是用coroutineScheduler代理實(shí)現(xiàn)了協(xié)程的調(diào)度。

Dispatchers.IO

這個(gè)是LimitingDispatcher類型的,是DefaultScheduler類型的成員變量,而LimitingDispatcher類型又是繼承自ExecutorCoroutineDispatcher的,LimitingDispatcher在它基礎(chǔ)上做了有調(diào)度個(gè)數(shù)限制的排隊(duì)機(jī)制,IO這個(gè)名字代表的IO操作,IO操作又是阻塞線程的操作,線程不能及時(shí)釋放,所以加入了隊(duì)列機(jī)制,防止IO線程爆炸式增長。如下:

internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    val IO: CoroutineDispatcher = LimitingDispatcher(
        this,
        systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
        "Dispatchers.IO",
        TASK_PROBABLY_BLOCKING
    )
    //省略。。。
}
private class LimitingDispatcher(
    private val dispatcher: ExperimentalCoroutineDispatcher,
    private val parallelism: Int,
    private val name: String?,
    override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {
    private val queue = ConcurrentLinkedQueue<Runnable>()
    private val inFlightTasks = atomic(0)
    private fun dispatch(block: Runnable, tailDispatch: Boolean) {
        var taskToSchedule = block
        while (true) {
            // Commit in-flight tasks slot
            val inFlight = inFlightTasks.incrementAndGet()
            // Fast path, if parallelism limit is not reached, dispatch task and return
            if (inFlight <= parallelism) {
                dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
                return
            }
            queue.add(taskToSchedule)
            if (inFlightTasks.decrementAndGet() >= parallelism) {
                return
            }
            taskToSchedule = queue.poll() ?: return
        }
    }
    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        dispatch(block, tailDispatch = true)
    }
}

構(gòu)造函數(shù) 傳入了parallelism參數(shù) ,這個(gè)是并發(fā)數(shù)。

dispatchYield方法 實(shí)現(xiàn)是直接調(diào)用的dispatch方法。

dispatch方法:一個(gè)while循環(huán),循環(huán)內(nèi),

  • 給inFlightTasks變量加一(這個(gè)變量代表正在調(diào)度中的個(gè)數(shù)),如果inFlightTasks <= parallelism,代表當(dāng)前調(diào)度任務(wù)數(shù)小于最大并發(fā)數(shù),說明可以繼續(xù)向調(diào)度器中調(diào)度任務(wù)
  • 否則將任務(wù)加入到隊(duì)列中,接著嘗試將inFlightTasks減一,如果大于并發(fā)數(shù),那么直接結(jié)束;
  • 如果小于并發(fā)數(shù),說明剛剛已經(jīng)有任務(wù)結(jié)束了,讓出了并發(fā)數(shù),這個(gè)時(shí)候可以再次嘗試從隊(duì)列中取出任務(wù),從1開始。
    override fun afterTask() {
        var next = queue.poll()
        // If we have pending tasks in current blocking context, dispatch first
        if (next != null) {
            dispatcher.dispatchWithContext(next, this, true)
            return
        }
        inFlightTasks.decrementAndGet()
        next = queue.poll() ?: return
        dispatch(next, true)
    }

afterTask方法

這個(gè)方法是任務(wù)調(diào)度結(jié)束后的回調(diào),這里面首先從隊(duì)列中取出一個(gè)任務(wù),

任務(wù)不為空,讓調(diào)度器調(diào)度這個(gè)任務(wù),結(jié)束;

為空,給調(diào)度任務(wù)數(shù)加一,然后嘗試取出任務(wù),為空返回,不為空,繼續(xù)調(diào)用dispatch方法,整個(gè)流程就串起來了。

整個(gè)流程如下圖所示:

綜上:IO調(diào)度器側(cè)重于調(diào)度任務(wù)數(shù)量的限制,防止IO操作阻塞線程,讓線程數(shù)量爆炸式增長。

Dispatchers.Main

具體的實(shí)現(xiàn)類是HandlerContext,代碼如下:

HandlerContext(Looper.getMainLooper().asHandler(async = true))
internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
	//省略。。。
}

主線程中調(diào)度任務(wù),android中主線程也就是ui線程。實(shí)現(xiàn)原理是內(nèi)部持有一個(gè)val handler : Handler = Looper.getMainLooper().asHandler(async = true),這個(gè)handler正是主線程的handler。

在調(diào)用dispatch調(diào)度方法的時(shí)候,是使用handler發(fā)送一個(gè)Runnable任務(wù),

override fun dispatch(context: CoroutineContext, block: Runnable) {
    handler.post(block)
}

在delay的時(shí)候,如果當(dāng)前的dispatcher正是HandlerContext,那么實(shí)現(xiàn)是handler發(fā)送一個(gè)延遲了timeMillis毫秒時(shí)長的Runnable。invokeOnCancellation的擴(kuò)展方法是在協(xié)程被取消的時(shí)候,移除掉該runnable消息。

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
    val block = Runnable {
        with(continuation) { resumeUndispatched(Unit) }
    }
    handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
    continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}

下面這個(gè)方法也比較常看到,就是協(xié)程在調(diào)度continuation的時(shí)候,會(huì)去判斷是不是需要去調(diào)度,不需要的話,直接在當(dāng)前線程執(zhí)行,需要調(diào)度的,需要由dispatcher來重新調(diào)度任務(wù),這樣可能執(zhí)行的線程會(huì)被切換,如果不是主線程的話,、就需要調(diào)度了, 如果是主線程的話立刻執(zhí)行。

override fun isDispatchNeeded(context: CoroutineContext): Boolean {
    return !invokeImmediately || Looper.myLooper() != handler.looper
}

Dispatchers.Unconfined

具體的實(shí)現(xiàn)如下:

internal object Unconfined : CoroutineDispatcher() {
	//省略。。。
}

isDispatchNeeded直接返回false,代表不需要重新調(diào)度。

override fun isDispatchNeeded(context: CoroutineContext): Boolean = false

dispatchYield沒有被覆寫,直接調(diào)用dispatch方法,用的還是CoroutineDispatcher的實(shí)現(xiàn)。

dispatch的報(bào)錯(cuò)信息顯示,Unconfined調(diào)度器只能在存在YieldContext的時(shí)候調(diào)度,否則就會(huì)報(bào)異常。

//CoroutineDispatcher
public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)
//Unconfined
override fun dispatch(context: CoroutineContext, block: Runnable) {
    // It can only be called by the "yield" function. See also code of "yield" function.
    val yieldContext = context[YieldContext]
    if (yieldContext != null) {
        // report to "yield" that it is an unconfined dispatcher and don't call "block.run()"
        yieldContext.dispatcherWasUnconfined = true
        return
    }
    throw UnsupportedOperationException("Dispatchers.Unconfined.dispatch function can only be used by the yield function. " +
        "If you wrap Unconfined dispatcher in your code, make sure you properly delegate " +
        "isDispatchNeeded and dispatch calls.")
}

yied方法:是暫時(shí)讓出工作線程,等待下一次線程調(diào)取恢復(fù)協(xié)程。

yield代碼如下:

public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
    val context = uCont.context
    context.checkCompletion()
    val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
    if (cont.dispatcher.isDispatchNeeded(context)) {
        cont.dispatchYield(context, Unit)
    } else {
        val yieldContext = YieldContext()
        cont.dispatchYield(context + yieldContext, Unit)
        if (yieldContext.dispatcherWasUnconfined) {
            return@sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit
        }
    }
    COROUTINE_SUSPENDED
}

如果isDispatchNeeded == true,那么就需要重新將協(xié)程被調(diào)度器調(diào)度一次,線程有可能切換掉;

如果isDispatchNeeded == false,上下文集合需要添加val yieldContext = YieldContext()這個(gè)元素(在上面的Dispatchers.Unconfined

的dispatche方法中,如果有YieldContext元素,將dispatcherWasUnconfined設(shè)置為true,代表yield操作什么都沒有做,需要協(xié)程調(diào)度器用其他方法調(diào)度一次)。

判斷dispatcherWasUnconfined,true:說明Dispatchers.Unconfined什么都沒有做,需要在調(diào)度一次,調(diào)用了yieldUndispatched方法,這個(gè)方法大概就是讓協(xié)程直接恢復(fù)一次,或者線程調(diào)度一次恢復(fù);

false:說明正在被調(diào)度器調(diào)度,是個(gè)掛起點(diǎn),返回COROUTINE_SUSPENDED值。

不太清楚Dispatchers.Unconfined這個(gè)調(diào)度器有啥用,有知道的留言下,學(xué)習(xí)學(xué)習(xí)。

協(xié)程調(diào)度器的實(shí)現(xiàn)CoroutineScheduler

調(diào)度過程正真的實(shí)現(xiàn)是CoroutineScheduler這個(gè)類,上面說的四種調(diào)度器是包裝類,調(diào)度邏輯在CoroutineScheduler中,代碼如下:

internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
	//省略。。。
}

構(gòu)造函數(shù)入?yún)?corePoolSize: Int定義核心線程數(shù),maxPoolSize: Int定義最大線程數(shù)量

	fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        val task = createTask(block, taskContext)
        // try to submit the task to the local queue and act depending on the result
        val currentWorker = currentWorker()
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        if (notAdded != null) {
            if (!addToGlobalQueue(notAdded)) {
                // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        val skipUnpark = tailDispatch && currentWorker != null
        // Checking 'task' instead of 'notAdded' is completely okay
        if (task.mode == TASK_NON_BLOCKING) {
            if (skipUnpark) return
            signalCpuWork()
        } else {
            // Increment blocking tasks anyway
            signalBlockingWork(skipUnpark = skipUnpark)
        }
	}

dispatch函數(shù)的實(shí)現(xiàn):

創(chuàng)建task,block如果是Task類型的話,設(shè)置submissionTime變量,submissionTime變量用于延遲執(zhí)行的時(shí)間判斷,以及隊(duì)列排序的時(shí)間順序;設(shè)置taskContext,該變量是task執(zhí)行的協(xié)程上下文。不是Task類型的話,會(huì)創(chuàng)建TaskImp類型的任務(wù)返回,關(guān)鍵是finally中的taskContext.afterTask(),就是task執(zhí)行完成后需要回調(diào)afterTask通知協(xié)程上下文執(zhí)行完畢了,上面的Dispatchers.IO里面的LimitingDispatcher調(diào)度器就是需要afterTask回調(diào)通知,才能將隊(duì)列中下一個(gè)任務(wù)拋給CoroutineScheduler去執(zhí)行。

   internal fun createTask(block: Runnable, taskContext: TaskContext): Task {
        val nanoTime = schedulerTimeSource.nanoTime()
        if (block is Task) {
            block.submissionTime = nanoTime
            block.taskContext = taskContext
            return block
        }
        return TaskImpl(block, nanoTime, taskContext)
    }
internal class TaskImpl(
    @JvmField val block: Runnable,
    submissionTime: Long,
    taskContext: TaskContext
) : Task(submissionTime, taskContext) {
    override fun run() {
        try {
            block.run()
        } finally {
            taskContext.afterTask()
        }
    }
}

獲取當(dāng)前的工作線程,如果當(dāng)前是工作線程直接返回,不是的話返回空

private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf {<!--{C}%3C!%2D%2D%20%2D%2D%3E--> it.scheduler == this }

將任務(wù)提交到工作線程的本地隊(duì)列中

    private fun Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
        if (this == null) return task
        if (state === WorkerState.TERMINATED) return task
        if (task.mode == TASK_NON_BLOCKING && state === WorkerState.BLOCKING) {
            return task
        }
        mayHaveLocalTasks = true
        return localQueue.add(task, fair = tailDispatch)
    }

返回是空的,說明添加成功了,返回task說明沒有添加成功。

如果線程是中斷狀態(tài),那么直接返回task。 如果任務(wù)是非阻塞的也就是cpu密集型任務(wù),而線程是阻塞的(正在執(zhí)行任務(wù)中),那么不添加任務(wù),直接返回task。 其他情況,添加任務(wù)到隊(duì)列中,mayHaveLocalTasks標(biāo)志位true,代表當(dāng)前線程中有任務(wù)。

沒有添加的話,需要添加到全局隊(duì)列中,globalCpuQueue全局cpu密集型隊(duì)列,globalBlockingQueue全局IO隊(duì)列,根據(jù)任務(wù)類型添加到對(duì)應(yīng)的隊(duì)列中。如果全局隊(duì)列都添加失敗的話,直接拋出異常。

	 if (notAdded != null) {
	     if (!addToGlobalQueue(notAdded)) {
	         // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
	         throw RejectedExecutionException("$schedulerName was terminated")
	     }
	 }
    val globalCpuQueue = GlobalQueue()
    val globalBlockingQueue = GlobalQueue()
    private fun addToGlobalQueue(task: Task): Boolean {
        return if (task.isBlocking) {
            globalBlockingQueue.addLast(task)
        } else {
            globalCpuQueue.addLast(task)
        }
    }

根據(jù)是否是尾部添加和當(dāng)前線程是否是空,決定是否跳過喚醒工作線程的步驟。

val skipUnpark = tailDispatch && currentWorker != null

非阻塞任務(wù):skipUnpark為true,跳過喚醒步驟,否則喚醒cpu密集型線程;阻塞任務(wù):skipUnpark為true,跳過喚醒步驟,喚醒IO線程。

       // Checking 'task' instead of 'notAdded' is completely okay
       if (task.mode == TASK_NON_BLOCKING) {
           if (skipUnpark) return
           signalCpuWork()
       } else {
           // Increment blocking tasks anyway
           signalBlockingWork(skipUnpark = skipUnpark)
       }

看下喚醒步驟的具體實(shí)現(xiàn),大概都是先tryUnpark,喚醒線程,如果沒有喚醒成功,創(chuàng)建一個(gè)新的線程,再次嘗試喚醒。

    private fun signalBlockingWork(skipUnpark: Boolean) {
        // Use state snapshot to avoid thread overprovision
        val stateSnapshot = incrementBlockingTasks()
        if (skipUnpark) return
        if (tryUnpark()) return
        if (tryCreateWorker(stateSnapshot)) return
        tryUnpark() // Try unpark again in case there was race between permit release and parking
    }
    internal fun signalCpuWork() {
        if (tryUnpark()) return
        if (tryCreateWorker()) return
        tryUnpark()
    }

看下工作線程的具體實(shí)現(xiàn)吧:

worker繼承自Thread,實(shí)現(xiàn)了run方法,具體是由runWorker()方法實(shí)現(xiàn)的,每個(gè)工作線程都有一個(gè)本地隊(duì)列用于存儲(chǔ)任務(wù),這樣本地有任務(wù)就不用去全局隊(duì)列中去搶資源了,減少鎖競爭。

	internal inner class Worker private constructor() : Thread() {
		//省略。。。
		@JvmField
        val localQueue: WorkQueue = WorkQueue()
        @JvmField
        var mayHaveLocalTasks = false
		override fun run() = runWorker()
		//省略。。。
   }

runWorker() 的實(shí)現(xiàn):

        private fun runWorker() {
            var rescanned = false
            while (!isTerminated && state != WorkerState.TERMINATED) {
                val task = findTask(mayHaveLocalTasks)
                // Task found. Execute and repeat
                if (task != null) {
                    rescanned = false
                    minDelayUntilStealableTaskNs = 0L
                    executeTask(task)
                    continue
                } else {
                    mayHaveLocalTasks = false
                }
                if (minDelayUntilStealableTaskNs != 0L) {
                    if (!rescanned) {
                        rescanned = true
                    } else {
                        rescanned = false
                        tryReleaseCpu(WorkerState.PARKING)
                        interrupted()
                        LockSupport.parkNanos(minDelayUntilStealableTaskNs)
                        minDelayUntilStealableTaskNs = 0L
                    }
                    continue
                }
                tryPark()
            }
            tryReleaseCpu(WorkerState.TERMINATED)
        }

工作線程是用while循環(huán)一直運(yùn)行的,循環(huán)內(nèi):

val task = findTask(mayHaveLocalTasks),前面這個(gè)變量mayHaveLocalTasks出現(xiàn)過,在添加task到本地隊(duì)列的時(shí)候,會(huì)置為true,本地隊(duì)列有任務(wù),從本地獲取,沒有就從全局隊(duì)列中獲取,如果還是沒有,從其他線程隊(duì)列中偷取任務(wù)到自己隊(duì)列中:

    fun findTask(scanLocalQueue: Boolean): Task? {
        if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
        // If we can't acquire a CPU permit -- attempt to find blocking task
        val task = if (scanLocalQueue) {
            localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
        } else {
            globalBlockingQueue.removeFirstOrNull()
        }
        return task ?: trySteal(blockingOnly = true)
    }

trySteal方法,循環(huán)workers隊(duì)列,遍歷線程本地隊(duì)列,去偷取任務(wù),偷到的話返回任務(wù),沒偷到的話,返回null:

private fun trySteal(blockingOnly: Boolean): Task? {
			//省略。。。
            var currentIndex = nextInt(created)
            var minDelay = Long.MAX_VALUE
            repeat(created) {
            	//省略。。。
                val worker = workers[currentIndex]
                if (worker !== null && worker !== this) {
                    val stealResult = if (blockingOnly) {
                        localQueue.tryStealBlockingFrom(victim = worker.localQueue)
                    } else {
                        localQueue.tryStealFrom(victim = worker.localQueue)
                    }
                    if (stealResult == TASK_STOLEN) {
                        return localQueue.poll()
                    } else if (stealResult > 0) {
                        minDelay = min(minDelay, stealResult)
                    }
                }
            }
            minDelayUntilStealableTaskNs = if (minDelay != Long.MAX_VALUE) minDelay else 0
            return null
        }

在偷不到任務(wù)的時(shí)候會(huì)設(shè)置一個(gè)變量,stealResult等于-2,最后minDelayUntilStealableTaskNs 等于0;

internal const val TASK_STOLEN = -1L
internal const val NOTHING_TO_STEAL = -2L

在偷取任務(wù)的時(shí)候,如果上個(gè)任務(wù)時(shí)間和這次時(shí)間間隔太短的話,返回下次執(zhí)行的間隔時(shí)間差,minDelayUntilStealableTaskNs設(shè)置為這個(gè)時(shí)間值,大于0。

找到task了,直接執(zhí)行任務(wù)executeTask(task) ,執(zhí)行完成,continue循環(huán),從1開始;

沒找到任務(wù),設(shè)置mayHaveLocalTasks = false

如果minDelayUntilStealableTaskNs不等于0,就是上面的間隔時(shí)間太短的條件觸發(fā),那么讓線程釋放鎖(防止線程執(zhí)行任務(wù)太過密集,等待下次循環(huán)再去調(diào)度任務(wù)),continue循環(huán),從1開始;

上面條件不成立,調(diào)用tryPark(),這個(gè)是和unPark相反的操作,讓線程閑置,放入到線程隊(duì)列中:

    private fun tryPark() {
        if (!inStack()) {
            parkedWorkersStackPush(this)
            return
        }
        assert { localQueue.size == 0 }
        workerCtl.value = PARKED // Update value once
        while (inStack()) { // Prevent spurious wakeups
            if (isTerminated || state == WorkerState.TERMINATED) break
            tryReleaseCpu(WorkerState.PARKING)
            interrupted() // Cleanup interruptions
            park()
        }
    }

首先判斷是否在隊(duì)列中,不在的話,放入線程隊(duì)列中;在隊(duì)列中,將狀態(tài)設(shè)置為PARKED,不斷循環(huán)將釋放線程的cpu占用鎖,嘗試放到隊(duì)列中,park函數(shù)中有可能銷毀工作線程,看線程是否到達(dá)死亡時(shí)間點(diǎn)。

worker工作流程如下圖所示:

總結(jié)

1. Dispatchers的四種調(diào)度器是餓漢式單例對(duì)象,所以一個(gè)進(jìn)程只存在一個(gè)實(shí)例對(duì)象。

2. Dispatchers的四種調(diào)度器中,IO和default是共用的一個(gè)線程池,它的實(shí)現(xiàn)是CoroutineScheduler。

3. CoroutineScheduler線程池,有一個(gè)保存線程的隊(duì)列,有兩種全局任務(wù)隊(duì)列:一個(gè)是IO阻塞型隊(duì)列,一個(gè)是cpu密集型任務(wù)隊(duì)列;Worker線程擁有一個(gè)本地任務(wù)隊(duì)列。

4. Worker線程會(huì)根據(jù)任務(wù)類型,去對(duì)應(yīng)的全局隊(duì)列或者從本地隊(duì)列找任務(wù),找不到會(huì)從其他worker隊(duì)列中偷任務(wù),然后執(zhí)行;worker會(huì)根據(jù)自己的狀態(tài)回到線程隊(duì)列或者銷毀自己。

到此這篇關(guān)于Kotlin Dispatchers協(xié)程調(diào)度器原阿門深入分析的文章就介紹到這了,更多相關(guān)Kotlin Dispatchers內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Android基于ImageView繪制的開關(guān)按鈕效果示例

    Android基于ImageView繪制的開關(guān)按鈕效果示例

    這篇文章主要介紹了Android基于ImageView繪制的開關(guān)按鈕效果,結(jié)合實(shí)例形式分析了Android使用ImageView進(jìn)行按鈕繪制的界面布局、功能實(shí)現(xiàn)及相關(guān)注意事項(xiàng),需要的朋友可以參考下
    2017-03-03
  • android解析JSON數(shù)據(jù)

    android解析JSON數(shù)據(jù)

    本文給大家介紹的是在Android中解析json數(shù)據(jù)的方法的幾種方法,非常的簡單實(shí)用,有需要的小伙伴可以參考下
    2016-03-03
  • 安卓版微信跳一跳輔助 跳一跳輔助Java代碼

    安卓版微信跳一跳輔助 跳一跳輔助Java代碼

    這篇文章主要為大家詳細(xì)介紹了安卓版微信跳一跳輔助,簡易版ADB命令式實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-01-01
  • Android控件Spinner的使用方法(1)

    Android控件Spinner的使用方法(1)

    這篇文章主要為大家詳細(xì)介紹了Android控件Spinner的使用方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-08-08
  • 詳解Android 7.0 Settings 加載選項(xiàng)

    詳解Android 7.0 Settings 加載選項(xiàng)

    本篇文章主要介紹了Android 7.0 Settings 加載選項(xiàng),Android 7.0 Settings頂部多了一個(gè)建議選項(xiàng),多了個(gè)側(cè)邊欄,操作更加便捷了,有興趣的可以了解一下。
    2017-02-02
  • Android實(shí)現(xiàn)數(shù)字跳動(dòng)效果的TextView方法示例

    Android實(shí)現(xiàn)數(shù)字跳動(dòng)效果的TextView方法示例

    數(shù)字跳動(dòng)效果相信大家應(yīng)該都見過,在開發(fā)加上這種效果后會(huì)讓ui交互看起來非常不錯(cuò),所以下面這篇文章主要給大家介紹了Android實(shí)現(xiàn)數(shù)字跳動(dòng)的TextView的相關(guān)資料,文中給出了詳細(xì)的示例代碼,需要的朋友可以參考學(xué)習(xí),下面來一起看看吧。
    2017-04-04
  • Flutter實(shí)現(xiàn)圖文并茂的列表

    Flutter實(shí)現(xiàn)圖文并茂的列表

    列表在 App 中是最常見的形式了,在 Flutter 中提供了 ListView 這個(gè)組件來實(shí)現(xiàn)列表,本篇將通過 ListView 實(shí)現(xiàn)一個(gè)圖文并茂的列表。
    2021-05-05
  • rxjava+retrofit實(shí)現(xiàn)多圖上傳實(shí)例代碼

    rxjava+retrofit實(shí)現(xiàn)多圖上傳實(shí)例代碼

    本篇文章主要介紹了rxjava+retrofit實(shí)現(xiàn)多圖上傳實(shí)例代碼,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-06-06
  • 解決Android studio3.6安裝后gradle Download失敗(構(gòu)建不成功)

    解決Android studio3.6安裝后gradle Download失敗(構(gòu)建不成功)

    這篇文章主要介紹了解決Android studio3.6安裝后gradle Download失敗(構(gòu)建不成功),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-03-03
  • Android使用WebView實(shí)現(xiàn)截圖分享功能

    Android使用WebView實(shí)現(xiàn)截圖分享功能

    這篇文章主要為大家詳細(xì)介紹了Android使用WebView實(shí)現(xiàn)截圖分享功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-05-05

最新評(píng)論