RxJava中多種場(chǎng)景的實(shí)現(xiàn)總結(jié)
一、推遲執(zhí)行動(dòng)作
可以使用timer+map方法實(shí)現(xiàn).代碼如下:
Observable.timer(5, TimeUnit.MILLISECONDS).map(value->{ return doSomething(); }).subscribe(System.out::println); }
二、推遲發(fā)送執(zhí)行的結(jié)果
這種場(chǎng)景要求產(chǎn)生數(shù)據(jù)的動(dòng)作是馬上執(zhí)行,但是結(jié)果推遲發(fā)送.這和上面場(chǎng)景的是不一樣的.
這種場(chǎng)景可以使用Observable.zip
來(lái)實(shí)現(xiàn).
zip操作符將多個(gè)Observable發(fā)射的數(shù)據(jù)按順序組合起來(lái),每個(gè)數(shù)據(jù)只能組合一次,而且都是有序的。最終組合的數(shù)據(jù)的數(shù)量由發(fā)射數(shù)據(jù)最少的Observable來(lái)決定。
對(duì)于各個(gè)observable相同位置的數(shù)據(jù),需要相互等待,也就說(shuō),第一個(gè)observable第一個(gè)位置的數(shù)據(jù)產(chǎn)生后,要等待第二個(gè)observable第一個(gè)位置的數(shù)據(jù)產(chǎn)生,等各個(gè)Observable相同位置的數(shù)據(jù)都產(chǎn)生后,才能按指定規(guī)則進(jìn)行組合.這真是我們要利用的.
zip有很多種聲明,但大致上是一樣的,就是傳入幾個(gè)observable,然后指定一個(gè)規(guī)則,對(duì)每個(gè)observable對(duì)應(yīng)位置的數(shù)據(jù)進(jìn)行處理,產(chǎn)生一個(gè)新的數(shù)據(jù), 下面是其中一個(gè)最簡(jiǎn)單的:
public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction);
用zip實(shí)現(xiàn)推送發(fā)送執(zhí)行結(jié)果如下:
Observable.zip(Observable.timer(5,TimeUnit.MILLISECONDS) ,Observable.just(doSomething()), (x,y)->y) .subscribe(System.out::println));
三、使用defer在指定線(xiàn)程里執(zhí)行某種動(dòng)作
如下面的代碼,雖然我們指定了線(xiàn)程的運(yùn)行方式,但是doSomething()
這個(gè)函數(shù)還是在當(dāng)前代碼調(diào)用的線(xiàn)程中執(zhí)行的.
Observable.just(doSomething()) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe(v->Utils.printlnWithThread(v.toString()););
通常我們采用下面的方法達(dá)到目的:
Observable.create(s->{s.onNext(doSomething());}) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe(v->{ Utils.printlnWithThread(v.toString()); });
但其實(shí)我們采用defer也能達(dá)到相同的目的.
關(guān)于defer
defer 操作符與create、just、from等操作符一樣,是創(chuàng)建類(lèi)操作符,不過(guò)所有與該操作符相關(guān)的數(shù)據(jù)都是在訂閱是才生效的。
聲明:
public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory);
defer的Func0里的Observable是在訂閱(subscribe)的時(shí)候才創(chuàng)建的.
作用:
Do not create the Observable until an Observer subscribes; create a fresh Observable on each subscription.
也就說(shuō)observable是在訂閱的時(shí)候才創(chuàng)建的.
上面的問(wèn)題用defer實(shí)現(xiàn):
Observable.defer(()->Observable.just(doSomething())) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe(v->{Utils.printlnWithThread(v.toString()); });
四、使用compose不要打斷鏈?zhǔn)浇Y(jié)構(gòu)
我們經(jīng)??吹较旅娴拇a:
Observable.just(doSomething()) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe(v->{Utils.printlnWithThread(v.toString());
上面的代碼中,subscribeOn(xxx).observeOn(xxx)
可能在很多地方都是一樣的, 如果我們打算把它統(tǒng)一在某一個(gè)地方實(shí)現(xiàn), 我們可以這么寫(xiě):
private static <T> Observable<T> applySchedulers(Observable<T> observable) { return observable.subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()); }
但是這樣每次我們需要調(diào)用上面的方法, 大致會(huì)像下面這樣,最外面是一個(gè)函數(shù),等于打破了鏈接結(jié)構(gòu):
applySchedulers(Observable.from(someSource).map(new Func1<Data, Data>() { @Override public Data call(Data data) { return manipulate(data); } }) ).subscribe(new Action1<Data>() { @Override public void call(Data data) { doSomething(data); } });
可以使用compose操作符達(dá)到不打破鏈接結(jié)構(gòu)的目的.
compose的申明如下:
public Observable compose(Transformer<? super T, ? extends R> transformer);
它的入?yún)⑹且粋€(gè)Transformer接口,輸出是一個(gè)Observable. 而Transformer實(shí)際上就是一個(gè)Func1<Observable<T>
, Observable<R>>
,換言之就是:可以通過(guò)它將一種類(lèi)型的Observable轉(zhuǎn)換成另一種類(lèi)型的Observable.
簡(jiǎn)單的說(shuō),compose可以通過(guò)指定的轉(zhuǎn)化方式(輸入?yún)?shù)transformer),將原來(lái)的observable轉(zhuǎn)化為另外一種Observable.
通過(guò)compose, 采用下面方式指定線(xiàn)程方式:
private static <T> Transformer<T, T> applySchedulers() { return new Transformer<T, T>() { @Override public Observable<T> call(Observable<T> observable) { return observable.subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()); } }; } Observable.just(doSomething()).compose(applySchedulers()) .subscribe(v->{Utils.printlnWithThread(v.toString()); });
函數(shù)applySchedulers可以使用lambda表達(dá)式進(jìn)一步簡(jiǎn)化為下面為:
private static <T> Transformer<T, T> applySchedulers() { return observable->observable.subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()); }
五、按優(yōu)先級(jí)使用不同的執(zhí)行結(jié)果
上面這個(gè)標(biāo)題估計(jì)沒(méi)表達(dá)清楚我想表達(dá)的場(chǎng)景. 其實(shí)我想表達(dá)的場(chǎng)景類(lèi)似于平常的獲取網(wǎng)絡(luò)數(shù)據(jù)場(chǎng)景:如果緩存有,從緩存獲取,如果沒(méi)有,再?gòu)木W(wǎng)絡(luò)獲取.
這里要求,如果緩存有,不會(huì)做從網(wǎng)絡(luò)獲取數(shù)據(jù)的動(dòng)作.
這個(gè)可以采用concat+first實(shí)現(xiàn).
concat將幾個(gè)Observable合并成一個(gè)Observable,返回最終的一個(gè)Observable. 而那些數(shù)據(jù)就像從一個(gè)Observable發(fā)出來(lái)一樣. 參數(shù)可以是多個(gè)Observable,也可以是包含Observalbe的Iterator.
新的observable內(nèi)的數(shù)據(jù)排列按原來(lái)concat里的observable順序排列,即新結(jié)果內(nèi)的數(shù)據(jù)是按原來(lái)的順序排序的.
下面是上述需求的實(shí)現(xiàn):
Observable.concat(getDataFromCache(),getDataFromNetwork()).first() .subscribe(v->System.out.println("result:"+v)); //從緩存獲取數(shù)據(jù) private static Observable<String> getDataFromCache(){ return Observable.create(s -> { //dosomething to get data int value = new Random().nextInt(); value = value%2; if (value!=0){ s.onNext("data from cache:"+value); //產(chǎn)生數(shù)據(jù) } //s.onError(new Throwable("none")); s.onCompleted(); } ); } //從網(wǎng)絡(luò)獲取數(shù)據(jù) private static Observable<String> getDataFromNetwork(){ return Observable.create(s -> { for (int i = 0; i < 10; i++) { Utils.println("obs2 generate "+i); s.onNext("data from network:" + i); //產(chǎn)生數(shù)據(jù) } s.onCompleted(); } ); }
上面的實(shí)現(xiàn),如果getDataFromCache有數(shù)據(jù), getDataFromNetwork這里的代碼是不會(huì)執(zhí)行的, 這正是我們想要的.
上面實(shí)現(xiàn)有幾個(gè)需要注意:
1、有可能從兩個(gè)地方都獲取不到數(shù)據(jù), 這種場(chǎng)景下使用first會(huì)拋出異常NoSuchElementException,如果是這樣的場(chǎng)景,需要用firstOrDefault替換上面的first.
2、上面getDataFromCache()
里,如果沒(méi)有數(shù)據(jù),我們直接調(diào)用onCompleted,如果不調(diào)用onCompleted,而是調(diào)用onError,則上述采用concat是得不到任何結(jié)果的.因?yàn)閏oncat在收到任何一個(gè)error,合并就會(huì)停止.所以,如果要用onError, 則需要用concatDelayError替代concat.concatDelayError
會(huì)先忽略error,將error推遲到最后在處理.
總結(jié)
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作能帶來(lái)一定的幫助,如果有疑問(wèn)大家可以留言交流。
相關(guān)文章
SpringMVC中@RequestMapping注解的實(shí)現(xiàn)
RequestMapping是一個(gè)用來(lái)處理請(qǐng)求地址映射的注解,本文主要介紹了SpringMVC中@RequestMapping注解的實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2024-01-01MyBatis學(xué)習(xí)筆記(二)之關(guān)聯(lián)關(guān)系
這篇文章主要介紹了MyBatis學(xué)習(xí)筆記(二)之關(guān)聯(lián)關(guān)系 的相關(guān)資料,需要的朋友可以參考下2016-02-02Java的MoreSuppliers工具類(lèi)方法解析
這篇文章主要介紹了Java的MoreSuppliers工具類(lèi)方法解析,MoreSuppliers類(lèi)是一個(gè)Java工具類(lèi),它提供了一些增強(qiáng)的Supplier函數(shù),使得Supplier執(zhí)行的結(jié)果可以被緩存,真正的調(diào)用只執(zhí)行一次,需要的朋友可以參考下2024-01-01log4j升級(jí)log4j2遇到的問(wèn)題及解決方式
這篇文章主要介紹了log4j升級(jí)log4j2遇到的問(wèn)題及解決方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12springboot2.x解決運(yùn)行順序及Bean對(duì)象注入順序的問(wèn)題
這篇文章主要介紹了springboot2.x解決運(yùn)行順序及Bean對(duì)象注入順序的問(wèn)題,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-01-01Spring自帶的校驗(yàn)框架Validation的使用實(shí)例
今天小編就為大家分享一篇關(guān)于Spring自帶的校驗(yàn)框架Validation的使用實(shí)例,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2019-03-03