Java擴(kuò)展庫(kù)RxJava的基本結(jié)構(gòu)與適用場(chǎng)景小結(jié)
基本結(jié)構(gòu)
我們先來(lái)看一段最基本的代碼,分析這段代碼在RxJava中是如何實(shí)現(xiàn)的。
Observable.OnSubscribe<String> onSubscriber1 = new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("1"); subscriber.onCompleted(); } }; Subscriber<String> subscriber1 = new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { } }; Observable.create(onSubscriber1) .subscribe(subscriber1);
首先我們來(lái)看一下Observable.create的代碼
public final static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(hook.onCreate(f)); } protected Observable(OnSubscribe<T> f) { this.onSubscribe = f; }
直接就是調(diào)用了Observable的構(gòu)造函數(shù)來(lái)創(chuàng)建一個(gè)新的Observable對(duì)象,這個(gè)對(duì)象我們暫時(shí)標(biāo)記為observable1,以便后面追溯。
同時(shí),會(huì)將我們傳入的OnSubscribe對(duì)象onSubscribe1保存在observable1的onSubscribe屬性中,這個(gè)屬性在后面的上下文中很重要,大家留心一下。
接下來(lái)我們來(lái)看看subscribe方法。
public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); } private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { ... subscriber.onStart(); hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); }
可以看到,subscribe之后,就直接調(diào)用了observable1.onSubscribe.call方法,也就是我們代碼中的onSubscribe1對(duì)象的call方法
,傳入的參數(shù)就是我們代碼中定義的subscriber1對(duì)象。call方法中所做的事情就是調(diào)用傳入的subscriber1對(duì)象的onNext和onComplete方法。
這樣就實(shí)現(xiàn)了觀察者和被觀察者之間的通訊,是不是很簡(jiǎn)單?
public void call(Subscriber<? super String> subscriber) { subscriber.onNext("1"); subscriber.onCompleted(); }
RxJava使用場(chǎng)景小結(jié)
1.取數(shù)據(jù)先檢查緩存的場(chǎng)景
取數(shù)據(jù),首先檢查內(nèi)存是否有緩存
然后檢查文件緩存中是否有
最后才從網(wǎng)絡(luò)中取
前面任何一個(gè)條件滿足,就不會(huì)執(zhí)行后面的
final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { if (memoryCache != null) { subscriber.onNext(memoryCache); } else { subscriber.onCompleted(); } } }); Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { String cachePref = rxPreferences.getString("cache").get(); if (!TextUtils.isEmpty(cachePref)) { subscriber.onNext(cachePref); } else { subscriber.onCompleted(); } } }); Observable<String> network = Observable.just("network"); //主要就是靠concat operator來(lái)實(shí)現(xiàn) Observable.concat(memory, disk, network) .first() .subscribeOn(Schedulers.newThread()) .subscribe(s -> { memoryCache = "memory"; System.out.println("--------------subscribe: " + s); });
2.界面需要等到多個(gè)接口并發(fā)取完數(shù)據(jù),再更新
//拼接兩個(gè)Observable的輸出,不保證順序,按照事件產(chǎn)生的順序發(fā)送給訂閱者 private void testMerge() { Observable<String> observable1 = DemoUtils.createObservable1().subscribeOn(Schedulers.newThread()); Observable<String> observable2 = DemoUtils.createObservable2().subscribeOn(Schedulers.newThread()); Observable.merge(observable1, observable2) .subscribeOn(Schedulers.newThread()) .subscribe(System.out::println); }
3.一個(gè)接口的請(qǐng)求依賴另一個(gè)API請(qǐng)求返回的數(shù)據(jù)
舉個(gè)例子,我們經(jīng)常在需要登陸之后,根據(jù)拿到的token去獲取消息列表。
這里用RxJava主要解決嵌套回調(diào)的問(wèn)題,有一個(gè)專有名詞叫Callback hell
NetworkService.getToken("username", "password") .flatMap(s -> NetworkService.getMessage(s)) .subscribe(s -> { System.out.println("message: " + s); });
4.界面按鈕需要防止連續(xù)點(diǎn)擊的情況
RxView.clicks(findViewById(R.id.btn_throttle)) .throttleFirst(1, TimeUnit.SECONDS) .subscribe(aVoid -> { System.out.println("click"); });
5.響應(yīng)式的界面
比如勾選了某個(gè)checkbox,自動(dòng)更新對(duì)應(yīng)的preference
SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this); RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences); Preference<Boolean> checked = rxPreferences.getBoolean("checked", true); CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test); RxCompoundButton.checkedChanges(checkBox) .subscribe(checked.asAction());
6.復(fù)雜的數(shù)據(jù)變換
Observable.just("1", "2", "2", "3", "4", "5") .map(Integer::parseInt) .filter(s -> s > 1) .distinct() .take(3) .reduce((integer, integer2) -> integer.intValue() + integer2.intValue()) .subscribe(System.out::println);//9
相關(guān)文章
解決springboot環(huán)境切換失效的問(wèn)題
這篇文章主要介紹了解決springboot環(huán)境切換失效的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09MyBatis批量插入幾千條數(shù)據(jù)為何慎用foreach
這篇文章主要介紹了MyBatis批量插入幾千條數(shù)據(jù)為何慎用foreach問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-10-10Spring boot集成spring session實(shí)現(xiàn)session共享的方法
這篇文章主要介紹了Spring boot集成spring session實(shí)現(xiàn)session共享的方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-06-06談?wù)凧ava中Volatile關(guān)鍵字的理解
volatile這個(gè)關(guān)鍵字可能很多朋友都聽(tīng)說(shuō)過(guò),或許也都用過(guò)。在Java 5之前,它是一個(gè)備受爭(zhēng)議的關(guān)鍵字,因?yàn)樵诔绦蛑惺褂盟鶗?huì)導(dǎo)致出人意料的結(jié)果,本文給大家介紹java中volatile關(guān)鍵字,需要的朋友參考下2016-03-03通過(guò)Spring自定義NamespaceHandler實(shí)現(xiàn)命名空間解析(推薦)
這篇文章主要介紹了通過(guò)Spring自定義NamespaceHandler實(shí)現(xiàn)命名空間解析,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-04-04IntelliJ IDEA報(bào)錯(cuò)Error:java: Compilation failed: internal java
今天小編就為大家分享一篇關(guān)于IntelliJ IDEA報(bào)錯(cuò)Error:java: Compilation failed: internal java compiler error的解決辦法,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2018-10-10SpringBoot詳解整合Spring?Cache實(shí)現(xiàn)Redis緩存流程
這篇文章主要介紹了SpringBoot整合Spring?Cache實(shí)現(xiàn)Redis緩存方案,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07