欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RxJava2 Scheduler使用實例深入解析

 更新時間:2022年10月25日 09:58:50   作者:蝶翼的罪  
這篇文章主要為大家介紹了RxJava2 Scheduler使用實例深入解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

前言

歡迎來到大家深入理解 RxJava2 系列第二篇,這里先插上一句,本系列文章用的源碼都是基于 RxJava 2.2.0 正式版。本篇文章將先與大家一起理解 Scheduler 與 Worker ,順著 RxJava2 的源碼捋一下它們的實現(xiàn)原理。

Scheduler 與 Worker

Scheduler 與 Worker 在 RxJava2 中是一個非常重要的概念,他們是 RxJava 線程調(diào)度的核心與基石。用過的人肯定都會了解一些,但是想必了解 Worker 的讀者們就不多了。很多人會疑惑,既然有了 Scheduler 可以直接調(diào)度 Runnable,為何又強加一個 Worker 的概念,諸位稍安勿躁,跟著筆者的思路一起走下去。

定義

筆者這里展示一下 Scheduler 最核心的定義部分:

public abstract class Scheduler {
    @NonNull
    public abstract Worker createWorker();
    public Disposable scheduleDirect(@NonNull Runnable run) {
        ...
    }
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        ...
    }
    @NonNull
    public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
        ...
    }
    public abstract static class Worker implements Disposable {
        @NonNull
        public Disposable schedule(@NonNull Runnable run) {
            ...
        }
        @NonNull
        public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
        @NonNull
        public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
            ...
        }
    }
}

從上面的定義可以看出,Scheduler 本質(zhì)上就是用來調(diào)度 Runnable 的,支持立即、延時和周期形式的調(diào)用,而 Worker 是任務的最小單元的載體。在 RxJava2 內(nèi)部的實現(xiàn)中,通常一個或者多個 Worker 對應一個ScheduledThreadPoolExecutor對象,這些暫且不表。

scheduleDirect / schedulePeriodicallyDirect

在 RxJava 1.x 時代, Scheduler 是沒有scheduleDirect/schedulePeriodicallyDirect的,只能夠先createWorker,再通過 Worker 來調(diào)度任務。這些方法是對 Worker 調(diào)用的簡化,可以認為是創(chuàng)建了一個只能調(diào)度一次任務的 Worker 并立馬調(diào)度了該任務。在Scheduler基類的源碼中,也可以看出默認的實現(xiàn)是直接 createWorker 并創(chuàng)建對應的 Task 的(雖然在部分 Scheduler 覆蓋的實現(xiàn)上并沒有創(chuàng)建 Worker,但是可以認為存在虛擬的 Worker)。

createWorker

一個 Scheduler 可以創(chuàng)建多個 Worker,這兩者是一對多的關系,而 Worker 與 Task 也是一對多的關系。

如下圖所示:

Worke 的存在為了確保兩件事:

  • 同一個 Worker 創(chuàng)建的 Task 都會確保串行,且立即執(zhí)行的任務符合先進先出原則。
  • Worker 綁定了調(diào)用了他的方法的 Runnable,當該 Worker 取消時,基于他的 Task 均被取消

因此當有操作符需要使用 Scheduler 時,可以通過 Worker 來將一系列的 Runnable 統(tǒng)一的調(diào)度和取消,最典型的例子就是observeOn,下面會詳細分析。

Schedulers

RxJava2 默認內(nèi)置了幾種 Scheduler 的實現(xiàn),適用于不同的場景,這些 Scheduler 均在 Schedulers 類中可以直接獲得

方法說明
Schedulers.computation()適用于計算密集型任務
Schedulers.io()適用于 IO 密集型任務
Schedulers.trampoline()在某個調(diào)用 schedule 的線程執(zhí)行
Schedulers.newThread()每個 Worker 對應一個新線程
Schedulers.single()所有 Worker 使用同一個線程執(zhí)行任務
Schedulers.from(Executor)使用 Executor 作為任務執(zhí)行的線程

這里我們挑選兩個最常用的 computation / io 源碼稍作分析。

NewThreadWorker

NewThreadWorker 在 computation / io / newThread 均有涉及,我們先了解一下這個類。

上面筆者有提到過 Worker 與ScheduledThreadPoolExecutor 的關系,而這里的NewThreadWorkerScheduledThreadPoolExecutor便是一對一的關系。在NewThreadWorker構造函數(shù)中會通過工廠方法創(chuàng)建一個corePoolSize 為 1 的ScheduledThreadPoolExecutor對象并持有之。

ScheduledThreadPoolExecutor 從 JDK1.5 開始存在,這個類繼承于 ThreadPoolExecutor,可以支持即使、延時和周期的任務。但是注意在ScheduledThreadPoolExecutor中 maximumPoolSize 參數(shù)是無效的,corePoolSize 表示其最大線程數(shù),且它的隊列是無界的。這里不再細說該類,否則涉及的就太多了。

有了這個類,RxJava2 實現(xiàn) Worker 時便是站在了巨人的肩膀上,線程調(diào)度可以直接使用該類解決,略微麻煩之處就是封一層Disposable的邏輯。

具體細節(jié)讀者可以從源碼一探究竟。

ComputationScheduler

作為計算密集型的 Scheduler,ComputationScheduler的線程數(shù)是與 CPU 核心密切相關的,原因是當線程數(shù)遠遠超過 CPU 核心數(shù)目時,CPU 的時間更多的損耗在了線程的上下文切換,因此比較通用的方式是保持最大線程數(shù)和 CPU 核心數(shù)一致。

最大線程數(shù)目

MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));
static int cap(int cpuCount, int paramThreads) {
    return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads;
}

從上面代碼可見MAX_THREADS 大于 0,但是不超過 CPU 核心數(shù),實際數(shù)值也受用戶設置的 System Properties 的影響。

FixedSchedulerPool

顧名思義,FixedSchedulerPool 可以認為是固定數(shù)目的真正的 Worker 的緩存池。

確定了MAX_THREADS后,在ComputationScheduler的構造函數(shù),會創(chuàng)建FixedSchedulerPool對象,FixedSchedulerPool 內(nèi)部會直接創(chuàng)建一個長度為MAX_THREADSPoolWorker數(shù)組。PoolWorker繼承自NewThreadWorker,但是沒有任何額外的代碼。

static final class PoolWorker extends NewThreadWorker {
    PoolWorker(ThreadFactory threadFactory) {
        super(threadFactory);
    }
}

也就是說當FixedSchedulerPool創(chuàng)建時,已經(jīng)有MAX_THREADS個 corePoolSize 為 1 的 ScheduledThreadPoolExecutor隨之創(chuàng)建。

PoolWorker

從使用角度來說,有了FixedSchedulerPool 好像就夠了,我們只需要每次createWorker時從池子里取一個PoolWorker并返回即可。

但是這里忽略了一個要點,每個 Worker 是獨立的,每個 Worker 內(nèi)部的任務是綁定在這個 Worker 中的。如果按照上述的做法,暴露出去PoolWorker,會出現(xiàn) 2 個問題:

  • createWorker 會可能會返回相同的 Worker,導致這個 Worker 被 dispose 后,其內(nèi)部所有的任務會被一并取消,而違背了不同 Worker 之間的任務的獨立性
  • PoolWorker也就是NewThreadWorker 被 dispose 后,其關聯(lián)的ScheduledThreadPoolExecutor被 shutdown,后續(xù)再次獲取該 Worker 也會導致無法創(chuàng)建任務

EventLoopWorker

為了解決上述的問題,我們需要在PoolWorker外再包一層,createWorker每次都會創(chuàng)建一個EventLoopWorker對象。

EventLoopWorker 其實是個代理對象,他會將 Runnable 代理給FixedSchedulerPool中取到的PoolWorker來調(diào)度,并且他會負責管理經(jīng)由他創(chuàng)建的任務,當自身被取消時,會將創(chuàng)建的任務統(tǒng)統(tǒng)取消。

示意圖

IoScheduler

與 ComputationScheduler 恰恰相反,IO 密集型的 Scheduler 線程數(shù)是無上限的。這是因為 IO 設備的速度是遠遠低于 CPU 速度的,在等待 IO 操作時, CPU 往往是閑置的,因此應該創(chuàng)建更多的線程讓 CPU 盡可能的利用。當然并不是說線程越多越好,線程數(shù)目膨脹到一定程度既會影響 CPU 的效率,也會消耗大量的內(nèi)存。在IoScheduler中,每個 Worker 在空置一段時間后就會被清除以控制線程的數(shù)目。

CachedWorkerPool

CachedWorkerPool是一個變長并定期清理的ThreadWorker的緩存池,內(nèi)部通過一個ConcurrentLinkedQueue維護。和PoolWorker類似,ThreadWorker也是繼承自NewThreadWorker

static final class ThreadWorker extends NewThreadWorker {
    private long expirationTime;
    ThreadWorker(ThreadFactory threadFactory) {
        super(threadFactory);
        this.expirationTime = 0L;
    }
    public long getExpirationTime() {
        return expirationTime;
    }
    public void setExpirationTime(long expirationTime) {
        this.expirationTime = expirationTime;
    }
}

僅僅是增加了一個expirationTime字段,用來標識這個ThreadWorker的超時時間。

于此同時,在CachedWorkerPool初始化時會傳入 Worker 的超時時間,目前是寫死的 60 秒。這個超時時間表示ThreadWorker閑置后最大存活時間(實際中不保證 60 秒時被回收)。

EventLoopWorker

IoScheduler中也存在一個EventLoopWorker類,它和ComputationScheduler中的作用也是類似的:

  • 管理自身調(diào)度過的任務
  • 管理ThreadWorker,使其可被回收再次使用

Worker 的管理

  • 創(chuàng)建:在閑置隊列中查找ThreadWorker,如果存在則取出,否則new``一個新的ThreadWorker,最后在外面包一層EventLoopWorker```并返回。
  • 回收:當EventLoopWorker dispose 后,會更新內(nèi)部的ThreadWorker超時時間,并促使CachedWorkerPoolThreadWorker加入閑置隊列
  • 清理:CachedWorkerPool在初始化時啟動定時任務,每隔 60 秒清理隊列中超時的ThreadWorker

這里說個細節(jié),因為CachedWorkerPool是每隔 60 秒清理一次隊列的,因此ThreadWorker的存活時間取決于入隊的時機,如果一直沒有被再次取出,其被實際清理的延遲在 60 - 120 秒之間,有興趣的讀者可以想一想為什么。

示意圖

對比

熟悉線程的讀者朋友們會發(fā)現(xiàn),ComputationSchedulerIoScheduler很像某些參數(shù)下的ThreadPoolExecutor。

ThreadPoolExecutor 參數(shù)ComputationScheduler(n)IoScheduler
corePoolSizen0
maximumPoolSizenInteger.MAX_VALUE
keepAliveTime060
unit-TimeUnit.SECONDS
workQueueLinkedBlockingQueueSynchronousQueue

他們對線程的控制外在的表現(xiàn)很相似。 但是實際的線程執(zhí)行對象不一樣:

  • ThreadPoolExecutor:Thread
  • Scheduler:支持立即、延遲、定時調(diào)度任務的對象,通常為 ScheduledThreadPoolExecutor(coreSize = 1)

這兩者的對比有助于我們更加深刻地理解 Scheduler 設計的內(nèi)在邏輯。

結語

Scheduler 是 RxJava 線程的核心概念,RxJava 基于此屏蔽了 Thread 相關的概念,只與 Scheduler / Worker / Runnable 打交道。

以上就是RxJava2 Scheduler使用實例深入解析的詳細內(nèi)容,更多關于RxJava2 Scheduler使用的資料請關注腳本之家其它相關文章!

相關文章

  • SpringBoot找不到映射文件的處理方式

    SpringBoot找不到映射文件的處理方式

    這篇文章主要介紹了SpringBoot找不到映射文件的處理方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-10-10
  • linux環(huán)境下java程序打包成簡單的hello world輸出jar包示例

    linux環(huán)境下java程序打包成簡單的hello world輸出jar包示例

    這篇文章主要介紹了linux環(huán)境下java程序打包成簡單的hello world輸出jar包,結合簡單hello world輸出程序示例分析了Linux環(huán)境下的java可執(zhí)行jar包文件的生成相關操作技巧,需要的朋友可以參考下
    2019-11-11
  • 詳解 Java中日期數(shù)據(jù)類型的處理之格式轉換的實例

    詳解 Java中日期數(shù)據(jù)類型的處理之格式轉換的實例

    這篇文章主要介紹了詳解 Java中日期數(shù)據(jù)類型的處理之格式轉換的實例的相關資料,日期以及時間格式處理,在Java中時間格式一般會涉及到的數(shù)據(jù)類型包括Calendar類和Date類,需要的朋友可以參考下
    2017-08-08
  • Java通過關閉Socket終止線程

    Java通過關閉Socket終止線程

    這篇文章主要為大家詳細介紹了Java通過關閉Socket終止線程的相關代碼
    2017-04-04
  • Java中進程、協(xié)程與線程的區(qū)別詳解

    Java中進程、協(xié)程與線程的區(qū)別詳解

    這篇文章主要介紹了Java中進程,線程,協(xié)程的概念、區(qū)別以及使用場景的選擇,早期的操作系統(tǒng)每個程序就是一個進程,知道一個程序運行完,才能進行下一個進程,就是"單進程時代",一切的程序只能串行發(fā)生,需要的朋友可以參考下
    2023-08-08
  • java實現(xiàn)簡單五子棋小游戲(1)

    java實現(xiàn)簡單五子棋小游戲(1)

    這篇文章主要為大家詳細介紹了java實現(xiàn)簡單五子棋小游戲的第一部分,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-01-01
  • Java設計模式之淺談外觀模式

    Java設計模式之淺談外觀模式

    這篇文章主要介紹了Java設計模式之外觀模式的相關資料,需要的朋友可以參考下
    2022-09-09
  • MyBatis持久層框架的用法知識小結

    MyBatis持久層框架的用法知識小結

    MyBatis 本是apache的一個開源項目iBatis,接下來通過本文給大家介紹MyBatis持久層框架的用法知識小結,非常不錯,具有參考借鑒價值,感興趣的朋友一起學習吧
    2016-07-07
  • 淺談SpringCloud的微服務架構組件

    淺談SpringCloud的微服務架構組件

    這篇文章主要介紹了淺談SpringCloud的微服務架構組件,Spring Cloud根據(jù)分布式服務協(xié)調(diào)治理的需求成立了許多子項目,每個項目通過特定的組件去實現(xiàn),需要的朋友可以參考下
    2023-04-04
  • C++內(nèi)存管理看這一篇就夠了

    C++內(nèi)存管理看這一篇就夠了

    這篇文章主要介紹了C/C++中的內(nèi)存管理小結,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2021-08-08

最新評論