Java擴展庫RxJava的基本結(jié)構(gòu)與適用場景小結(jié)
基本結(jié)構(gòu)
我們先來看一段最基本的代碼,分析這段代碼在RxJava中是如何實現(xiàn)的。
Observable.OnSubscribe<String> onSubscriber1 = new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("1"); subscriber.onCompleted(); } }; Subscriber<String> subscriber1 = new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { } }; Observable.create(onSubscriber1) .subscribe(subscriber1);
首先我們來看一下Observable.create的代碼
public final static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(hook.onCreate(f)); } protected Observable(OnSubscribe<T> f) { this.onSubscribe = f; }
直接就是調(diào)用了Observable的構(gòu)造函數(shù)來創(chuàng)建一個新的Observable對象,這個對象我們暫時標記為observable1,以便后面追溯。
同時,會將我們傳入的OnSubscribe對象onSubscribe1保存在observable1的onSubscribe屬性中,這個屬性在后面的上下文中很重要,大家留心一下。
接下來我們來看看subscribe方法。
public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); } private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { ... subscriber.onStart(); hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); }
可以看到,subscribe之后,就直接調(diào)用了observable1.onSubscribe.call方法,也就是我們代碼中的onSubscribe1對象的call方法
,傳入的參數(shù)就是我們代碼中定義的subscriber1對象。call方法中所做的事情就是調(diào)用傳入的subscriber1對象的onNext和onComplete方法。
這樣就實現(xiàn)了觀察者和被觀察者之間的通訊,是不是很簡單?
public void call(Subscriber<? super String> subscriber) { subscriber.onNext("1"); subscriber.onCompleted(); }
RxJava使用場景小結(jié)
1.取數(shù)據(jù)先檢查緩存的場景
取數(shù)據(jù),首先檢查內(nèi)存是否有緩存
然后檢查文件緩存中是否有
最后才從網(wǎng)絡(luò)中取
前面任何一個條件滿足,就不會執(zhí)行后面的
final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { if (memoryCache != null) { subscriber.onNext(memoryCache); } else { subscriber.onCompleted(); } } }); Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { String cachePref = rxPreferences.getString("cache").get(); if (!TextUtils.isEmpty(cachePref)) { subscriber.onNext(cachePref); } else { subscriber.onCompleted(); } } }); Observable<String> network = Observable.just("network"); //主要就是靠concat operator來實現(xiàn) Observable.concat(memory, disk, network) .first() .subscribeOn(Schedulers.newThread()) .subscribe(s -> { memoryCache = "memory"; System.out.println("--------------subscribe: " + s); });
2.界面需要等到多個接口并發(fā)取完數(shù)據(jù),再更新
//拼接兩個Observable的輸出,不保證順序,按照事件產(chǎn)生的順序發(fā)送給訂閱者 private void testMerge() { Observable<String> observable1 = DemoUtils.createObservable1().subscribeOn(Schedulers.newThread()); Observable<String> observable2 = DemoUtils.createObservable2().subscribeOn(Schedulers.newThread()); Observable.merge(observable1, observable2) .subscribeOn(Schedulers.newThread()) .subscribe(System.out::println); }
3.一個接口的請求依賴另一個API請求返回的數(shù)據(jù)
舉個例子,我們經(jīng)常在需要登陸之后,根據(jù)拿到的token去獲取消息列表。
這里用RxJava主要解決嵌套回調(diào)的問題,有一個專有名詞叫Callback hell
NetworkService.getToken("username", "password") .flatMap(s -> NetworkService.getMessage(s)) .subscribe(s -> { System.out.println("message: " + s); });
4.界面按鈕需要防止連續(xù)點擊的情況
RxView.clicks(findViewById(R.id.btn_throttle)) .throttleFirst(1, TimeUnit.SECONDS) .subscribe(aVoid -> { System.out.println("click"); });
5.響應(yīng)式的界面
比如勾選了某個checkbox,自動更新對應(yīng)的preference
SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this); RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences); Preference<Boolean> checked = rxPreferences.getBoolean("checked", true); CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test); RxCompoundButton.checkedChanges(checkBox) .subscribe(checked.asAction());
6.復(fù)雜的數(shù)據(jù)變換
Observable.just("1", "2", "2", "3", "4", "5") .map(Integer::parseInt) .filter(s -> s > 1) .distinct() .take(3) .reduce((integer, integer2) -> integer.intValue() + integer2.intValue()) .subscribe(System.out::println);//9
相關(guān)文章
MyBatis批量插入幾千條數(shù)據(jù)為何慎用foreach
這篇文章主要介紹了MyBatis批量插入幾千條數(shù)據(jù)為何慎用foreach問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-10-10Spring boot集成spring session實現(xiàn)session共享的方法
這篇文章主要介紹了Spring boot集成spring session實現(xiàn)session共享的方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2019-06-06談?wù)凧ava中Volatile關(guān)鍵字的理解
volatile這個關(guān)鍵字可能很多朋友都聽說過,或許也都用過。在Java 5之前,它是一個備受爭議的關(guān)鍵字,因為在程序中使用它往往會導(dǎo)致出人意料的結(jié)果,本文給大家介紹java中volatile關(guān)鍵字,需要的朋友參考下2016-03-03通過Spring自定義NamespaceHandler實現(xiàn)命名空間解析(推薦)
這篇文章主要介紹了通過Spring自定義NamespaceHandler實現(xiàn)命名空間解析,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-04-04IntelliJ IDEA報錯Error:java: Compilation failed: internal java
今天小編就為大家分享一篇關(guān)于IntelliJ IDEA報錯Error:java: Compilation failed: internal java compiler error的解決辦法,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-10-10SpringBoot詳解整合Spring?Cache實現(xiàn)Redis緩存流程
這篇文章主要介紹了SpringBoot整合Spring?Cache實現(xiàn)Redis緩存方案,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07