Java擴(kuò)展庫(kù)RxJava的基本結(jié)構(gòu)與適用場(chǎng)景小結(jié)
基本結(jié)構(gòu)
我們先來(lái)看一段最基本的代碼,分析這段代碼在RxJava中是如何實(shí)現(xiàn)的。
Observable.OnSubscribe<String> onSubscriber1 = new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onCompleted();
}
};
Subscriber<String> subscriber1 = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
};
Observable.create(onSubscriber1)
.subscribe(subscriber1);
首先我們來(lái)看一下Observable.create的代碼
public final static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
直接就是調(diào)用了Observable的構(gòu)造函數(shù)來(lái)創(chuàng)建一個(gè)新的Observable對(duì)象,這個(gè)對(duì)象我們暫時(shí)標(biāo)記為observable1,以便后面追溯。
同時(shí),會(huì)將我們傳入的OnSubscribe對(duì)象onSubscribe1保存在observable1的onSubscribe屬性中,這個(gè)屬性在后面的上下文中很重要,大家留心一下。
接下來(lái)我們來(lái)看看subscribe方法。
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
subscriber.onStart();
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
}
可以看到,subscribe之后,就直接調(diào)用了observable1.onSubscribe.call方法,也就是我們代碼中的onSubscribe1對(duì)象的call方法
,傳入的參數(shù)就是我們代碼中定義的subscriber1對(duì)象。call方法中所做的事情就是調(diào)用傳入的subscriber1對(duì)象的onNext和onComplete方法。
這樣就實(shí)現(xiàn)了觀察者和被觀察者之間的通訊,是不是很簡(jiǎn)單?
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onCompleted();
}
RxJava使用場(chǎng)景小結(jié)
1.取數(shù)據(jù)先檢查緩存的場(chǎng)景
取數(shù)據(jù),首先檢查內(nèi)存是否有緩存
然后檢查文件緩存中是否有
最后才從網(wǎng)絡(luò)中取
前面任何一個(gè)條件滿足,就不會(huì)執(zhí)行后面的
final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (memoryCache != null) {
subscriber.onNext(memoryCache);
} else {
subscriber.onCompleted();
}
}
});
Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String cachePref = rxPreferences.getString("cache").get();
if (!TextUtils.isEmpty(cachePref)) {
subscriber.onNext(cachePref);
} else {
subscriber.onCompleted();
}
}
});
Observable<String> network = Observable.just("network");
//主要就是靠concat operator來(lái)實(shí)現(xiàn)
Observable.concat(memory, disk, network)
.first()
.subscribeOn(Schedulers.newThread())
.subscribe(s -> {
memoryCache = "memory";
System.out.println("--------------subscribe: " + s);
});
2.界面需要等到多個(gè)接口并發(fā)取完數(shù)據(jù),再更新
//拼接兩個(gè)Observable的輸出,不保證順序,按照事件產(chǎn)生的順序發(fā)送給訂閱者
private void testMerge() {
Observable<String> observable1 = DemoUtils.createObservable1().subscribeOn(Schedulers.newThread());
Observable<String> observable2 = DemoUtils.createObservable2().subscribeOn(Schedulers.newThread());
Observable.merge(observable1, observable2)
.subscribeOn(Schedulers.newThread())
.subscribe(System.out::println);
}
3.一個(gè)接口的請(qǐng)求依賴另一個(gè)API請(qǐng)求返回的數(shù)據(jù)
舉個(gè)例子,我們經(jīng)常在需要登陸之后,根據(jù)拿到的token去獲取消息列表。
這里用RxJava主要解決嵌套回調(diào)的問(wèn)題,有一個(gè)專有名詞叫Callback hell
NetworkService.getToken("username", "password")
.flatMap(s -> NetworkService.getMessage(s))
.subscribe(s -> {
System.out.println("message: " + s);
});
4.界面按鈕需要防止連續(xù)點(diǎn)擊的情況
RxView.clicks(findViewById(R.id.btn_throttle))
.throttleFirst(1, TimeUnit.SECONDS)
.subscribe(aVoid -> {
System.out.println("click");
});
5.響應(yīng)式的界面
比如勾選了某個(gè)checkbox,自動(dòng)更新對(duì)應(yīng)的preference
SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);
Preference<Boolean> checked = rxPreferences.getBoolean("checked", true);
CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);
RxCompoundButton.checkedChanges(checkBox)
.subscribe(checked.asAction());
6.復(fù)雜的數(shù)據(jù)變換
Observable.just("1", "2", "2", "3", "4", "5")
.map(Integer::parseInt)
.filter(s -> s > 1)
.distinct()
.take(3)
.reduce((integer, integer2) -> integer.intValue() + integer2.intValue())
.subscribe(System.out::println);//9
相關(guān)文章
解決springboot環(huán)境切換失效的問(wèn)題
這篇文章主要介紹了解決springboot環(huán)境切換失效的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09
MyBatis批量插入幾千條數(shù)據(jù)為何慎用foreach
這篇文章主要介紹了MyBatis批量插入幾千條數(shù)據(jù)為何慎用foreach問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-10-10
Spring boot集成spring session實(shí)現(xiàn)session共享的方法
這篇文章主要介紹了Spring boot集成spring session實(shí)現(xiàn)session共享的方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-06-06
談?wù)凧ava中Volatile關(guān)鍵字的理解
volatile這個(gè)關(guān)鍵字可能很多朋友都聽(tīng)說(shuō)過(guò),或許也都用過(guò)。在Java 5之前,它是一個(gè)備受爭(zhēng)議的關(guān)鍵字,因?yàn)樵诔绦蛑惺褂盟鶗?huì)導(dǎo)致出人意料的結(jié)果,本文給大家介紹java中volatile關(guān)鍵字,需要的朋友參考下2016-03-03
通過(guò)Spring自定義NamespaceHandler實(shí)現(xiàn)命名空間解析(推薦)
這篇文章主要介紹了通過(guò)Spring自定義NamespaceHandler實(shí)現(xiàn)命名空間解析,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-04-04
IntelliJ IDEA報(bào)錯(cuò)Error:java: Compilation failed: internal java
今天小編就為大家分享一篇關(guān)于IntelliJ IDEA報(bào)錯(cuò)Error:java: Compilation failed: internal java compiler error的解決辦法,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2018-10-10
SpringBoot詳解整合Spring?Cache實(shí)現(xiàn)Redis緩存流程
這篇文章主要介紹了SpringBoot整合Spring?Cache實(shí)現(xiàn)Redis緩存方案,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07

