欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RxJava實戰(zhàn)之訂閱流基本原理示例解析

 更新時間:2022年12月30日 16:21:38   作者:itbird01  
這篇文章主要為大家介紹了RxJava實戰(zhàn)之訂閱流基本原理示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

正文

本節(jié),我們從Rxjava使用代碼入手,去結合自己已有的知識體系,加查閱部分源碼驗證的方式,來一起探索一下Rxjava實現(xiàn)的基本原理。

為了本文原理分析環(huán)節(jié),可以被更多的人理解、學習,所以小編從初學者的角度,從使用入手,一點點的分析了其中的源碼細節(jié)、思想,建議大家隨著本文的章節(jié)步驟,一步一步的來閱讀,才能更快、更好的理解Rxjava的真正的思想精髓,也為我們之后的實踐課程留一個好的底子。

訂閱流

有人會問,小編,你到現(xiàn)在為止,只是講了流程,而沒有講到具體每個中間操作符,在轉換的對象里面的方法調用,這個問題,問的特別好!?。?/p>

還記得小編開篇說的那句話嗎?我們從Rxjava的使用代碼入手

private void test() {
	//第一步:just調用
    Observable.just("https://img-blog.csdn.net/20160903083319668")
    //第二步:map調用
            .map(new Function<String, Bitmap>() {
                @Override
                public Bitmap apply(String s) throws Exception {
                    //Bitmap bitmap = downloadImage(s);
                    return null;
                }
            })
            //第三步:subscribeOn、observeOn調用
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            //第四步:subscribe調用
            .subscribe(new Observer<Bitmap>() {
                @Override
                public void onSubscribe() {
                    Log.d(TAG, "onSubscribe");
                }
                @Override
                public void onNext(Bitmap s) {
                    Log.d(TAG, "onNext s = " + s);
                }
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError ", e);
                }
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}

從上面的樣例代碼分析、分解,我們明面上看到四個步驟,暫且列下來:

  • 第一步:just調用
  • 第二步:map調用
  • 第三步:subscribeOn、observeOn調用
  • 第四步:subscribe調用

之所以我們沒有講到ObservableObserveOn、ObservableMap、ObservableJust等對象里面的具體方法調用,是因為到目前為止,從使用例子入手,根本就沒有調用到,所以我們也就無從分析到。,本節(jié),接下來我們分析subscribe調用,大家就發(fā)現(xiàn),里面的某些方法開始調用上了。

subscribe的解讀收下

我們知道,上面的just、map、subscribeOn、observeOn一系列調用下來,依然是一個Observable對象、

Observable是被觀察者的意思,subscribe是訂閱的意思,Observer是觀察者的意思。

大家發(fā)現(xiàn)了沒有?這里有個問題,這家伙和我們標準的觀察者有很大的不同,標準觀察者模式,是一種一對多的行為型設計模式,其實就是若干個觀察者,將自身的接口引用注冊到被觀察者內部,被觀察者狀態(tài)發(fā)生變更時,遍歷內部的list列表,一一通知觀察者,如下圖

Observable.just("https://img-blog.csdn.net/20160903083319668")
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.d(TAG, "onSubscribe");
            }
            @Override
            public void onNext(@NonNull String s) {
                Log.d(TAG, "onNext s = " + s);
            }
            @Override
            public void onError(@NonNull Throwable e) {
                Log.d(TAG, "onError");
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

可是我們看上面Rxjava的代碼,與標準觀察者模式有兩點不同

  • 一是被觀察者訂閱了觀察者
  • 二是從使用上看觀察者和被觀察者的訂閱關系是一對一的

上面提出的兩點不同,我們一邊看源碼,一邊試著去理解一下。

  • 一對一的通知:因為響應式編程思想的重點在于,一個變化,另外一個要能感知到,那么通過這樣變形的觀察者模式,去實現(xiàn)一對一的通知,我覺得也沒啥問題。
  • 被觀察者訂閱觀察者:這個從理論上講,就沒辦法去理解了,對吧,因為你再怎么變形標準觀察者模式,那也肯定是觀察者訂閱被觀察者,所以這里我們有必要簡單通過源碼去了解一下

我們從上面看到just是將傳入的T,再次封裝為了一個ObservableJust對象

@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(T item) {
    ObjectHelper.requireNonNull(item, "item is null");
    return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

我們看一下ObservableJust類代碼

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }
    @Override
    public T call() {
        return value;
    }
}

從上面看到,ObservableJust僅僅是將傳入的T封裝了一層而已,它繼承與Observable抽象類,而Observable抽象類實現(xiàn)了ObservableSource接口

public abstract class Observable<T> implements ObservableSource<T> {

而ObservableSource接口,就是我們外界調用的subscribe訂閱方法的源頭

public interface ObservableSource<T> {
    /**
     * Subscribes the given Observer to this ObservableSource instance.
     * @param observer the Observer, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(@NonNull Observer<? super T> observer);
}

所以Observable肯定實現(xiàn)了subscribe方法,我們看一下Observable的subscribe方法干什么了

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        //對象封裝,暫時不是重點,我們跳過
        observer = RxJavaPlugins.onSubscribe(this, observer);
        //判空
        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

大家看到這里,其實關鍵在于,最終調用了一個subscribeActual方法,而這個方法是個啥?在哪里實現(xiàn)的?一看,這玩意原來是Observable類中的一個抽象方法

protected abstract void subscribeActual(Observer&lt;? super T&gt; observer);

所以這里繞回到開頭,我們知道just,實際上是將傳入的參數(shù)T,轉換封裝為了ObservableJust對象,而ObservableJust繼承與 Observable,所以subscribeActual方法它肯定去了

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
        //最終這里還是調用了觀察者的相應方法
        observer.onSubscribe(sd);
        sd.run();
    }
    @Override
    public T call() {
        return value;
    }
}

小結

大家,發(fā)現(xiàn)了沒有,這里繞了一圈,最終調用通過Observable的抽象方法subscribeActual的巧妙實現(xiàn),最終還是觀察者訂閱了被觀察者,被觀察者內部最終調用了觀察者的具體方法。

這里和標準觀察者模式不同的是,被觀察者立馬去通知了觀察者,說直接點,在調用被觀察者的訂閱方法時,其實就是直接調用了觀察者相應的方法,只不過這里通過模板方法模式,巧妙的封裝了,好了,Rxjava的觀察者模式源碼,我們簡單理解到這里,我們試著自己去編寫實現(xiàn)一下。

也就是訂閱流的過程中,是以執(zhí)行subscribe方法為開始,從右往左執(zhí)行,這個執(zhí)行過程中,每個節(jié)點,做兩件事情

  • 對后面的observer節(jié)點,做一層包裝代理,變?yōu)榇淼膐bserverProxy

由于構建流的執(zhí)行,每個節(jié)點實際上擁有上一個節(jié)點observable對象的引用,所以執(zhí)行 source.subscribe(observerProxy)

訂閱流講到現(xiàn)在,大家是否理解了?當然這里沒有詳細講解其中ObservableSubscribeOn、ObservableObserveOn中的訂閱,如何進行的線程切換,這個并非是不去講,還是那句老話,飯要一點一點的吃,我們congoing使用方法入手,想要去了解的是Rxjava的整體框架原理。至于線程切換如何實現(xiàn)的?這個留個念想,大家可以認真想一下,不建議大家直接去看源碼。我們在Rxjava實踐環(huán)節(jié),也會帶大家一點一點的去實現(xiàn)這個核心功能。

以上就是RxJava實戰(zhàn)之訂閱流基本原理示例解析的詳細內容,更多關于RxJava訂閱流基本原理的資料請關注腳本之家其它相關文章!

相關文章

最新評論