欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RxJava2 線程調(diào)度的方法

 更新時(shí)間:2019年03月15日 09:38:25   作者:https://segmentfault.com/a/1190000018505463  
這篇文章主要介紹了RxJava2 線程調(diào)度的方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧

subscribeOn和observeOn負(fù)責(zé)線程切換,同時(shí)某些操作符也默認(rèn)指定了線程.

我們這里不分析在線程中怎么執(zhí)行的.只看如何切換到某個(gè)指定線程.

subscribeOn

Observable.subscribeOn()在方法內(nèi)部生成了一個(gè)ObservableSubscribeOn對象.

主要看一下ObservableSubscribeOn的subscribeActual方法.

 @Override
  public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
    //調(diào)用下游的Observer的onSubscribe方法
    observer.onSubscribe(parent);
    //通過SubscribeTask執(zhí)行了上游Observable的subscribeActual方法
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
  }

scheduler.scheduleDirect(Runnable)用于執(zhí)行SubscribeTask這個(gè)任務(wù).SubscribeTask本身是Runnable的實(shí)現(xiàn)類.看一下其run方法.

    @Override
    public void run() {
      //上游的Observable.subscribe方法被切換到了新的線程
      source.subscribe(parent);
    }

首先可以得出結(jié)論:subscribeOn將上游的Observable的subscribe方法切換到了新的線程.

如果多次調(diào)用subscribeOn切換線程,會(huì)有什么效果?

由下往上,每次調(diào)用subscribeOn,都會(huì)導(dǎo)致上游的Observable的subscribeActual切換到指定的線程.那么最后一次調(diào)用的切換最上游的創(chuàng)建型操作符的subscribeActual的執(zhí)行線程.如果操作符有默認(rèn)執(zhí)行線程怎么辦?

操作符默認(rèn)線程

如果是創(chuàng)建型操作符,處于最上游,那么subscribeOn的線程切換對它不起作用.天高皇帝遠(yuǎn),縣官不如現(xiàn)管.就是這個(gè)道理.
如果是其它操作符,會(huì)是怎樣的?

以操作符timeout為例:它對應(yīng)ObservableTimeoutTimed和TimeoutObserver

 @Override
    public void onNext(T t) {
      downstream.onNext(t);
      //超時(shí)計(jì)時(shí)
      startTimeout(idx + 1);
    }

    void startTimeout(long nextIndex) {
      //交給操作符默認(rèn)的線程執(zhí)行
      task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit));
    }

    @Override
    public void onError(Throwable t) {
        downstream.onError(t); 
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
      }
    }

    @Override
    public void onTimeout(long idx) {
        downstream.onError(new TimeoutException(timeoutMessage(timeout, unit)));
    }

//TimeoutTask.java
static final class TimeoutTask implements Runnable {

    @Override
    public void run() {
      parent.onTimeout(idx);
    }
  }

可以看到操作符默認(rèn)的執(zhí)行線程只用來做超時(shí)計(jì)時(shí)任務(wù),如果超時(shí)了,會(huì)在操作符的默認(rèn)線程執(zhí)行onError方法..操作符默認(rèn)線程對下游的observer造成什么影響要做具體對待.

observeOn

observeOn對應(yīng)ObservableObserveOnObserveOnObserver.

 //ObservableObserveOn.java
 @Override
  protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
      source.subscribe(observer);
    } else {
      Scheduler.Worker w = scheduler.createWorker();
      source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
  }
 //ObserveOnObserver.java 
  @Override
    public void onSubscribe(Disposable d) {
      if (DisposableHelper.validate(this.upstream, d)) {
        if (d instanceof QueueDisposable) {
          if (m == QueueDisposable.SYNC) {
          //執(zhí)行下游Observer的onSubscribe方法
            downstream.onSubscribe(this);
            schedule();
            return;
          }
          if (m == QueueDisposable.ASYNC) {
           //執(zhí)行下游Observer的onSubscribe方法
            downstream.onSubscribe(this);
            return;
          }
        }
         //執(zhí)行下游Observer的onSubscribe方法
        downstream.onSubscribe(this);
      }
    }
    @Override
    public void onNext(T t) {
     //省略
      schedule();
    }
    @Override
    public void onError(Throwable t) {
     //省略
      schedule();
    }
     void schedule() {
      if (getAndIncrement() == 0) {
      /*
      ObserveOnObserver是Runnable的實(shí)現(xiàn)類.交給線程池執(zhí)行
      */
        worker.schedule(this);
      }
    }
    
    
    void drainNormal() {
      final Observer<? super T> a = downstream;
      for (;;) {
        for (;;) {
          T v;
          try {
            v = q.poll();
          } catch (Throwable ex) {
            a.onError(ex);
            return;
          }
          //執(zhí)行下游Observer的onNext方法
          a.onNext(v);
        }
      }
    }

    void drainFused() {
      for (;;) {
        if (!delayError && d && ex != null) {
          //執(zhí)行下游Observer的onError方法
          downstream.onError(error);
          return;
        }
        downstream.onNext(null);
        if (d) {
          ex = error;
          if (ex != null) {
            //執(zhí)行下游Observer的onError方法
            downstream.onError(ex);
          } else {
            //執(zhí)行下游Observer的onComplete方法
            downstream.onComplete();
          }
          return;
        }
      }
    }
    //執(zhí)行線程任務(wù)
    @Override
    public void run() {
      if (outputFused) {
        drainFused();
      } else {
        drainNormal();
      }
    }

從上面可以看出ObservableObserveOn在其subscribeActual方法中并沒有切換上游Observable的subscribe方法的執(zhí)行線程.但是ObserveOnObserver在其onNext,onError和onComplete中通過schedule()方法將下游Observer的各個(gè)方法切換到了新的線程.

得出結(jié)論: observeOn負(fù)責(zé)切換的是下游Observer的各個(gè)方法的執(zhí)行線程

如果下游多次通過observeOn切換線程,會(huì)有什么效果?

每次切換都會(huì)對其下游造成影響,直到遇到下一個(gè)observeOn為止.

Observer(onSubscribe,onNext,onError,onComplete)

onNext,onError,onComplete與上游最近的observeOn所切換的線程保持一致.onSubscribe則不同.
遇到線程切換的時(shí)候,會(huì)首先在對應(yīng)的Observable的subscribeActual方法內(nèi),先調(diào)用observer.onSubscribe方法.而observer.onSubscribe會(huì)逐級(jí)向上傳遞直到最上游,而最上游的observer.onSubscribe是在subscribeActual方法內(nèi)調(diào)用,這是在主線程執(zhí)行的.所以onSubscribe方法無論如何都是在主線程執(zhí)行.

doOnSubscribe

.doOnSubscribe(new Consumer<Disposable>() {
          @Override
          public void accept(Disposable disposable) throws Exception {
           
          }
        })

我們要看的是方法accept的執(zhí)行線程.

通過源碼找到對應(yīng)的DisposableLambdaObserver.

 @Override
  public void onSubscribe(Disposable d) {
  //在這里調(diào)用了accept方法.
      onSubscribe.accept(d);
  }

這就要看上游在哪個(gè)線程執(zhí)行了Observer.onSubscribe(disposable)方法.

在創(chuàng)建型操作符的subscribeActual方法和subscribeOn對應(yīng)的Observable的subscribeActual方法內(nèi)調(diào)用了Observer.onSubscribe(disposable)方法.那么這兩處的執(zhí)行線程就決定了onSubscribe.accept(d);的執(zhí)行線程.

doFinally

對應(yīng)ObservableDoFinally和DoFinallyObserver

 //DoFinallyObserver.java
 @Override
    public void onError(Throwable t) {
      runFinally();
    }

    @Override
    public void onComplete() {
      runFinally();
    }

    @Override
    public void dispose() {
      runFinally();
    }
    
     void runFinally() {
       onFinally.run();
    }

可以看到與它所對應(yīng)的DoFinallyObserver的onError,onComplete,dispose方法的執(zhí)行線程有關(guān),這三個(gè)方法的執(zhí)行線程又受到上游的observeOn的影響.如果沒有observeOn,則會(huì)受到最上游的observable.subscribeActual方法影響.

doOnError

對應(yīng)ObservableDoOnEach和DoOnEachObserver

//DoOnEachObserver.java
 @Override
    public void onError(Throwable t) {
        onError.accept(t);
    }

和自身對應(yīng)的observer.onError所在線程保持一致.

doOnNext

對應(yīng)ObservableDoOnEach和DoOnEachObserver

//DoOnEachObserver.java
 @Override
    public void onNext(T t) {
        onNext.accept(t);
    }

和自身對應(yīng)的observer.onNext所在線程保持一致.

操作符對應(yīng)方法參數(shù)的執(zhí)行線程

包io.reactivex.functions下的接口類一般用于處理上游數(shù)據(jù)然后往下傳遞.這些接口類的方法一般在對應(yīng)的observer.onNext中調(diào)用.所以他們的線程保持一致.

總結(jié):

subscribeOn由下往上逐級(jí)切換Observable.subscribe的執(zhí)行線程,不受observeOn影響,也不受具有默認(rèn)指定線程的非創(chuàng)建型操作符影響,但是會(huì)被更上游的subscribeOn奪取線程切換的權(quán)利,直到最上游.如果最上游的創(chuàng)建型操作符也有默認(rèn)執(zhí)行線程,那么任何一個(gè)subscribeOn的線程切換不起作用.subscribeOn由下向上到達(dá)最上游后,然后由上往下影響下游的observer的執(zhí)行線程.遇到observeOn會(huì)被奪取線程切換的權(quán)利.observeOn影響的是下游的observer的執(zhí)行線程,由上往下,遇到另一個(gè)observeOn會(huì)移交線程控制權(quán)力,遇到指定默認(rèn)線程非創(chuàng)建型的操作符,要視具體情況對待.

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • Android實(shí)現(xiàn)一鍵鎖屏功能

    Android實(shí)現(xiàn)一鍵鎖屏功能

    這篇文章主要介紹了Android實(shí)現(xiàn)一鍵鎖屏,在xml中創(chuàng)建device_admin.xml,在manifest中加入詳細(xì)文件,本文結(jié)合示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2022-10-10
  • Android開發(fā)之獲取SD卡及手機(jī)ROM容量的方法

    Android開發(fā)之獲取SD卡及手機(jī)ROM容量的方法

    這篇文章主要介紹了Android開發(fā)之獲取SD卡及手機(jī)ROM容量的方法,結(jié)合實(shí)例形式分析了Android針對SD卡的讀取及屬性操作相關(guān)技巧,需要的朋友可以參考下
    2016-04-04
  • Android開源組件小結(jié)

    Android開源組件小結(jié)

    Android自帶的組件比較丑陋(個(gè)人感覺),自己寫組件比較復(fù)雜,而且必須熟悉android應(yīng)用層開發(fā)的一些機(jī)制,如繪制、回調(diào),所以非迫不得已的情況下還是不要自己寫組件,因?yàn)榕驴紤]不周全導(dǎo)致譬如性能或異常方面的問題,你自己寫也會(huì)耗費(fèi)不少時(shí)間
    2013-02-02
  • Android Volley圖片加載功能詳解

    Android Volley圖片加載功能詳解

    這篇文章主要為大家詳細(xì)介紹了Android Volley圖片加載功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2016-09-09
  • 分享Android中ExpandableListView控件使用教程

    分享Android中ExpandableListView控件使用教程

    這篇文章主要介紹了Android中ExpandableListView控件使用教程,可以實(shí)現(xiàn)二級(jí)列表展示效果,需要的朋友可以參考下
    2015-12-12
  • Android實(shí)現(xiàn)計(jì)步傳感器功能

    Android實(shí)現(xiàn)計(jì)步傳感器功能

    這篇文章主要為大家詳細(xì)介紹了Android實(shí)現(xiàn)計(jì)步傳感器功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-01-01
  • Android字體設(shè)置及Roboto字體使用方法

    Android字體設(shè)置及Roboto字體使用方法

    這篇文章主要介紹了Android字體設(shè)置及Roboto字體使用方法,較為詳細(xì)的分析了自定義字體及RoBoto的具體用法,非常具有實(shí)用價(jià)值,需要的朋友可以參考下
    2015-01-01
  • Android實(shí)現(xiàn)可輸入數(shù)據(jù)的彈出框

    Android實(shí)現(xiàn)可輸入數(shù)據(jù)的彈出框

    這篇文章主要為大家詳細(xì)介紹了Android實(shí)現(xiàn)可輸入數(shù)據(jù)的彈出框,文章提供了兩種方式,示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2016-01-01
  • Android編程之軟鍵盤的隱藏顯示實(shí)例詳解

    Android編程之軟鍵盤的隱藏顯示實(shí)例詳解

    這篇文章主要介紹了Android編程之軟鍵盤的隱藏顯示,結(jié)合實(shí)例形式詳細(xì)分析了Android編程中軟鍵盤的使用技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下
    2015-12-12
  • android指定DatePickerDialog樣式并不顯示年的實(shí)現(xiàn)代碼

    android指定DatePickerDialog樣式并不顯示年的實(shí)現(xiàn)代碼

    下面小編就為大家?guī)硪黄猘ndroid指定DatePickerDialog樣式并不顯示年的實(shí)現(xiàn)代碼。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧,祝大家游戲愉快哦
    2016-08-08

最新評論