RxJava中map和flatMap的用法區(qū)別源碼解析
前言:
RxJava中提供了大量的操作符,這大大提高了了我們的開發(fā)效率。其中最基本的兩個(gè)變換操作符就是map
和flatMap
。而其他變換操作符的原理基本與map
類似。
- map和flatMap都是接受一個(gè)函數(shù)作為參數(shù)(Func1)并返回一個(gè)被觀察者
Observable
- Func1的< I,O >I,O模版分別為輸入和輸出值的類型,實(shí)現(xiàn)Func1的call方法對(duì)I類型進(jìn)行處理后返回O類型數(shù)據(jù),只是flatMap中執(zhí)行的方法的返回類型為Observable類型
作用
map
對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),執(zhí)行變換操作。對(duì)原始的Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)你選擇的函數(shù),然后返回一個(gè)發(fā)射這些結(jié)果的Observable。
flatMap
將一個(gè)發(fā)射數(shù)據(jù)的Observable變換為多個(gè)Observables,然后將它們發(fā)射的數(shù)據(jù)合并后放進(jìn)一個(gè)單獨(dú)的Observable。操作符使用一個(gè)指定的函數(shù)對(duì)原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)執(zhí)行變換操作,這個(gè)函數(shù)返回一個(gè)本身也發(fā)射數(shù)據(jù)的Observable,然后FlatMap合并這些Observables發(fā)射的數(shù)據(jù),最后將合并后的結(jié)果當(dāng)做它自己的數(shù)據(jù)序列發(fā)射
使用方法:
通過代碼來看一下兩者的使用用方法:
map
Observable.just(new User("白瑞德")) .map(new Function<User, String>() { @Override public String apply(User user) throws Throwable { return user.getName(); } }) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Throwable { System.out.println(s); } }); <<<白瑞德
這段代碼接受一個(gè)User對(duì)象,最后打印出User中的name。
flatMap
假設(shè)存在一個(gè)需求,圖書館要打印每個(gè)User借走每一本書的名字: User
結(jié)構(gòu)如下:
class User { private String name; private List<String> book; }
我們來看一下map
的實(shí)現(xiàn)方法:
Observable.fromIterable(userList) .map(new Function<User, List<String>>() { @Override public List<String> apply(User user) throws Throwable { return user.getBook(); } }) .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> strings) throws Throwable { for (String s : strings) { System.out.println(s); } } });
可以看到,map
的轉(zhuǎn)換總是一對(duì)一,只能單一轉(zhuǎn)換。我們不得不借助循環(huán)進(jìn)行打印。 下面我們來看一下flatMap
的實(shí)現(xiàn)方式:
Observable.fromIterable(userList) .flatMap(new Function<User, ObservableSource<String>>() { @Override public ObservableSource<String> apply(User user) throws Throwable { return Observable.fromIterable(user.getBook()); } }) .subscribe(new Consumer<String>() { @Override public void accept(String o) throws Throwable { System.out.println(o); } });
flatmap
既可以單一轉(zhuǎn)換也可以一對(duì)多/多對(duì)多轉(zhuǎn)換。flatMap
使用一個(gè)指定的函數(shù)對(duì)原始Observable
發(fā)射的每一項(xiàng)數(shù)據(jù)之行相應(yīng)的變換操作,這個(gè)函數(shù)返回一個(gè)本身也發(fā)射數(shù)據(jù)的Observable
,因此可以再內(nèi)部再次進(jìn)行事件的分發(fā)。然后flatMap
合并這些Observables
發(fā)射的數(shù)據(jù),最后將合并后的結(jié)果當(dāng)做它自己的數(shù)據(jù)序列發(fā)射。
源碼分析
下面我們就結(jié)合源碼來分析一下這兩個(gè)操作符。為了降低代碼閱讀難道,這里只保留核心代碼:
map
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { //接受一個(gè)Function實(shí)例,并返回一個(gè)ObservableMap return new ObservableMap<T, R>(this, mapper); } public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends U> function; public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { //調(diào)用用父類構(gòu)造方法,初始化父類中的downstream super(source); this.function = function; } @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); } static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { final Function<? super T, ? extends U> mapper; MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { v = mapper.apply(t); downstream.onNext(v); } } }
這段代碼是去掉map源碼中一些校驗(yàn)和其它相關(guān)回調(diào)后的精簡(jiǎn)代碼。接下來分析一下代碼流程:
- 當(dāng)在調(diào)用
map
時(shí),map接受一個(gè)匿名內(nèi)部類Function
的實(shí)例,并返回一個(gè)ObservableMap
對(duì)象。 ObservableMap
本質(zhì)上是一個(gè)Observable
,也是一個(gè)被觀察者,其構(gòu)造方法接受最外層的那個(gè)被Observable
實(shí)例,和Function
實(shí)例。ObservableMap
重寫了subscribeActual
方法,在subscribeActual
中使用新建了一個(gè)MapObserver
實(shí)現(xiàn)了對(duì)原始Observable
的觀察。- 原始的
Observable
中的數(shù)據(jù)變會(huì)被發(fā)送到MapObserver
的實(shí)例中。 MapObserver
構(gòu)造方法接收原始Observable
的觀察者actual
,和Function
的實(shí)例mapper
MapObserver
在其onNext
方法中調(diào)用mapper
的apply
方法,獲得該方法的返回值v apply方法就是map實(shí)例中:public String apply(User user) throws Throwable { return user.getName(); }
- 調(diào)用
downstream
的onNext方法,并傳入實(shí)參v。其中downstream
是MapObserver
父類中定義的變量,在MapObserver
構(gòu)造方法中super(actual);
時(shí)初始化,其本身就是傳入的actual
,本質(zhì)上就是最原始的Observable
整個(gè)流程可以概括如下: 存在一個(gè)原始的ObservableA
和一個(gè)觀察者ObserverA
,當(dāng)原始的被觀察者ObservableA
調(diào)用map
,并傳入一個(gè)匿名內(nèi)部類實(shí)例化的’function‘,map
新建并返回了一個(gè)被觀察者ObservableB
,通過subscribe
讓觀察者ObserverA
對(duì)其進(jìn)行訂閱。并重寫subscribeActual
方法,在其被訂閱時(shí)創(chuàng)建一個(gè)新的觀察者ObserverB
其接受的,并用ObserverB
對(duì)原始的ObservableA
進(jìn)行訂閱觀察。當(dāng)原始的ObservableA
發(fā)出事件,調(diào)用ObserverB
的onNext
方法,subscribeActual
接受的觀察者便是最原始的觀察者ObserverA
。ObserverB
變執(zhí)行通過匿名內(nèi)部類實(shí)例化的’function‘的apply
方法得到數(shù)據(jù)v
,緊接著調(diào)用原始的ObservableA
的onNext
方法,并傳入實(shí)參v
,ObserverA
觀察到事件。 一句話概括:一個(gè)原始的被觀察者和觀察者,但是讓原始的觀察者去訂閱一個(gè)新的觀察者,當(dāng)新的被觀察者被訂閱的時(shí)候,創(chuàng)建一個(gè)新的觀察者去訂閱原始的被觀察者,并在監(jiān)聽的事件之后執(zhí)行指定的操作后再通知原始觀察者。所以這里面一共涉及到兩對(duì)觀察者和被觀察者,map方法會(huì)創(chuàng)建一對(duì)新的觀察者和被觀察者作為原始觀察者和被觀察者通訊的紐帶,并在其中做一些數(shù)據(jù)變換。
用圖片顯示流程如下:
藍(lán)色框內(nèi)就是map創(chuàng)建的觀察者和被觀察者。實(shí)際上我們的原始ObserverA并沒有對(duì)ObservableA進(jìn)行訂閱。
flatMap
faltMap
和map
的基本原理類似,其代碼如下:
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) { return new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize); } public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends ObservableSource<? extends U>> mapper; final boolean delayErrors; final int maxConcurrency; final int bufferSize; public ObservableFlatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) { super(source); } @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize)); } static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> { MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) { ... this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY); } @Override public void onSubscribe(Disposable d) { downstream.onSubscribe(this); } @Override public void onNext(T t) { ObservableSource<? extends U> p; p = mapper.apply(t); subscribeInner(p); } @SuppressWarnings("unchecked") void subscribeInner(ObservableSource<? extends U> p) { InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++); p.subscribe(inner); } void drain() { drainLoop(); } void drainLoop() { final Observer<? super U> child = this.downstream; child.onNext(o); } } static final class InnerObserver<T, U> extends AtomicReference<Disposable> implements Observer<U> { private static final long serialVersionUID = -4606175640614850599L; final long id; final MergeObserver<T, U> parent; volatile boolean done; volatile SimpleQueue<U> queue; int fusionMode; InnerObserver(MergeObserver<T, U> parent, long id) { this.id = id; this.parent = parent; } @Override public void onNext(U t) { parent.drain(); } } }
上述代碼即是faltMap
精簡(jiǎn)后的源碼,其中大部分代碼的運(yùn)作流程和前文中的map
源碼一致,我們繼續(xù)延續(xù)上文中講解中的觀察者和被觀察者。重點(diǎn)關(guān)注其不同的地方: faltMap
返回一個(gè)新的被觀察者ObservableB
,重寫ObservableB
的subscribeActual
方法在原始的觀察者ObserverA
對(duì)其進(jìn)行訂閱時(shí)新建一個(gè)觀察者ObserverB
對(duì)原始的ObservableA
進(jìn)行訂閱。新的觀察者ObserverB
持有原始的ObserverA
和faltMap
接收的匿名對(duì)象實(shí)例function
。當(dāng)ObserverB
監(jiān)聽到原始的被觀察者ObservableA
的事件時(shí),ObserverB
調(diào)用function
的apply
方法獲得新新的被觀察者ObservableC
,再創(chuàng)建一個(gè)新的觀察者ObserverC
對(duì)ObservableC
進(jìn)行訂閱,ObserverC
持有原始的觀察者ObserverA
,在ObserverC
觀察到被觀察者ObservableC
的時(shí)間時(shí),調(diào)用原始的觀察者ObserverA
的方法。
概括就是:faltMap方法要求調(diào)用者提供一個(gè)Observable,最原始的Observable在調(diào)用faltMap后,faltMap會(huì)創(chuàng)建一個(gè)新的Observable,并對(duì)原始的進(jìn)行訂閱。當(dāng)拿到訂閱后,會(huì)通過flatMap接收的函數(shù)拿到調(diào)用者傳入的Observable,并用最原始的觀察者對(duì)它進(jìn)行訂閱。這期間涉及三對(duì)觀察者和被觀察者,flatMap會(huì)創(chuàng)建一對(duì),同時(shí)也接收一對(duì)用戶創(chuàng)建的。flatMap創(chuàng)建的和Map中的作用一樣,不過flatMap連接的是原始的和用戶通過flatMap提供的兩對(duì)觀察者和被觀察者。而原始的觀察者最終是對(duì)用戶通過flatMap提供的那個(gè)觀察者進(jìn)行訂閱。
用圖片顯示流程如下:
和Map的流程很相似,只不過是需要用戶再提供一對(duì)觀察者和被觀察者。最終實(shí)現(xiàn)對(duì)用戶提供的被觀察者進(jìn)行訂閱。
結(jié)語
至此,map和flatMap已基本分析完畢,其中map的代碼比較簡(jiǎn)單易懂,flatMap中還涉及到大量輔助操作,文中并未涉及到其中的合并等操作,閱讀起來有些困難。如果僅僅是為了了解二者的原理,可以閱讀Single<T>
中的代碼。其中的代碼量遠(yuǎn)遠(yuǎn)少于Observable
中的代碼量。如果對(duì)RxJava基本的模式還不了解,可以閱讀 手寫極簡(jiǎn)版的Rxjava
以上就是RxJava中map和flatMap的用法區(qū)別源碼解析的詳細(xì)內(nèi)容,更多關(guān)于RxJava map flatMap區(qū)別的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
在controller中如何設(shè)置接收參數(shù)的默認(rèn)值
這篇文章主要介紹了在controller中如何設(shè)置接收參數(shù)的默認(rèn)值,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03關(guān)于Assert.assertEquals報(bào)錯(cuò)的問題及解決
這篇文章主要介紹了關(guān)于Assert.assertEquals報(bào)錯(cuò)的問題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-05-05Java 中實(shí)現(xiàn)隨機(jī)無重復(fù)數(shù)字的方法
為了更好地理解這個(gè)題意,我們先來看下具體內(nèi)容:生成一個(gè)1-100 的隨機(jī)數(shù)組,但數(shù)組中的數(shù)字不能重復(fù),即位置是隨機(jī)的,但數(shù)組元素不能重復(fù)2013-03-03Java的JDBC中Statement與CallableStatement對(duì)象實(shí)例
這篇文章主要介紹了Java的JDBC中Statement與CallableStatement對(duì)象實(shí)例,JDBC是Java編程中用于操作數(shù)據(jù)庫的API,需要的朋友可以參考下2015-12-12SpringBoot實(shí)現(xiàn)動(dòng)態(tài)增刪啟停定時(shí)任務(wù)的方式
在spring?boot中,可以通過@EnableScheduling注解和@Scheduled注解實(shí)現(xiàn)定時(shí)任務(wù),也可以通過SchedulingConfigurer接口來實(shí)現(xiàn)定時(shí)任務(wù),但是這兩種方式不能動(dòng)態(tài)添加、刪除、啟動(dòng)、停止任務(wù),本文給大家介紹SpringBoot實(shí)現(xiàn)動(dòng)態(tài)增刪啟停定時(shí)任務(wù)的方式,感興趣的朋友一起看看吧2024-03-03關(guān)于postman傳參的幾種格式 list,map 等
這篇文章主要介紹了postman傳參的幾種格式 list,map等,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08