RxJava中map和flatMap的用法區(qū)別源碼解析
前言:
RxJava中提供了大量的操作符,這大大提高了了我們的開發(fā)效率。其中最基本的兩個變換操作符就是map
和flatMap
。而其他變換操作符的原理基本與map
類似。
- map和flatMap都是接受一個函數(shù)作為參數(shù)(Func1)并返回一個被觀察者
Observable
- Func1的< I,O >I,O模版分別為輸入和輸出值的類型,實現(xiàn)Func1的call方法對I類型進行處理后返回O類型數(shù)據(jù),只是flatMap中執(zhí)行的方法的返回類型為Observable類型
作用
map
對Observable發(fā)射的每一項數(shù)據(jù)應(yīng)用一個函數(shù),執(zhí)行變換操作。對原始的Observable發(fā)射的每一項數(shù)據(jù)應(yīng)用一個你選擇的函數(shù),然后返回一個發(fā)射這些結(jié)果的Observable。
flatMap
將一個發(fā)射數(shù)據(jù)的Observable變換為多個Observables,然后將它們發(fā)射的數(shù)據(jù)合并后放進一個單獨的Observable。操作符使用一個指定的函數(shù)對原始Observable發(fā)射的每一項數(shù)據(jù)執(zhí)行變換操作,這個函數(shù)返回一個本身也發(fā)射數(shù)據(jù)的Observable,然后FlatMap合并這些Observables發(fā)射的數(shù)據(jù),最后將合并后的結(jié)果當做它自己的數(shù)據(jù)序列發(fā)射
使用方法:
通過代碼來看一下兩者的使用用方法:
map
Observable.just(new User("白瑞德")) .map(new Function<User, String>() { @Override public String apply(User user) throws Throwable { return user.getName(); } }) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Throwable { System.out.println(s); } }); <<<白瑞德
這段代碼接受一個User對象,最后打印出User中的name。
flatMap
假設(shè)存在一個需求,圖書館要打印每個User借走每一本書的名字: User
結(jié)構(gòu)如下:
class User { private String name; private List<String> book; }
我們來看一下map
的實現(xiàn)方法:
Observable.fromIterable(userList) .map(new Function<User, List<String>>() { @Override public List<String> apply(User user) throws Throwable { return user.getBook(); } }) .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> strings) throws Throwable { for (String s : strings) { System.out.println(s); } } });
可以看到,map
的轉(zhuǎn)換總是一對一,只能單一轉(zhuǎn)換。我們不得不借助循環(huán)進行打印。 下面我們來看一下flatMap
的實現(xiàn)方式:
Observable.fromIterable(userList) .flatMap(new Function<User, ObservableSource<String>>() { @Override public ObservableSource<String> apply(User user) throws Throwable { return Observable.fromIterable(user.getBook()); } }) .subscribe(new Consumer<String>() { @Override public void accept(String o) throws Throwable { System.out.println(o); } });
flatmap
既可以單一轉(zhuǎn)換也可以一對多/多對多轉(zhuǎn)換。flatMap
使用一個指定的函數(shù)對原始Observable
發(fā)射的每一項數(shù)據(jù)之行相應(yīng)的變換操作,這個函數(shù)返回一個本身也發(fā)射數(shù)據(jù)的Observable
,因此可以再內(nèi)部再次進行事件的分發(fā)。然后flatMap
合并這些Observables
發(fā)射的數(shù)據(jù),最后將合并后的結(jié)果當做它自己的數(shù)據(jù)序列發(fā)射。
源碼分析
下面我們就結(jié)合源碼來分析一下這兩個操作符。為了降低代碼閱讀難道,這里只保留核心代碼:
map
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { //接受一個Function實例,并返回一個ObservableMap return new ObservableMap<T, R>(this, mapper); } public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends U> function; public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { //調(diào)用用父類構(gòu)造方法,初始化父類中的downstream super(source); this.function = function; } @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); } static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { final Function<? super T, ? extends U> mapper; MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { v = mapper.apply(t); downstream.onNext(v); } } }
這段代碼是去掉map源碼中一些校驗和其它相關(guān)回調(diào)后的精簡代碼。接下來分析一下代碼流程:
- 當在調(diào)用
map
時,map接受一個匿名內(nèi)部類Function
的實例,并返回一個ObservableMap
對象。 ObservableMap
本質(zhì)上是一個Observable
,也是一個被觀察者,其構(gòu)造方法接受最外層的那個被Observable
實例,和Function
實例。ObservableMap
重寫了subscribeActual
方法,在subscribeActual
中使用新建了一個MapObserver
實現(xiàn)了對原始Observable
的觀察。- 原始的
Observable
中的數(shù)據(jù)變會被發(fā)送到MapObserver
的實例中。 MapObserver
構(gòu)造方法接收原始Observable
的觀察者actual
,和Function
的實例mapper
MapObserver
在其onNext
方法中調(diào)用mapper
的apply
方法,獲得該方法的返回值v apply方法就是map實例中:public String apply(User user) throws Throwable { return user.getName(); }
- 調(diào)用
downstream
的onNext方法,并傳入實參v。其中downstream
是MapObserver
父類中定義的變量,在MapObserver
構(gòu)造方法中super(actual);
時初始化,其本身就是傳入的actual
,本質(zhì)上就是最原始的Observable
整個流程可以概括如下: 存在一個原始的ObservableA
和一個觀察者ObserverA
,當原始的被觀察者ObservableA
調(diào)用map
,并傳入一個匿名內(nèi)部類實例化的’function‘,map
新建并返回了一個被觀察者ObservableB
,通過subscribe
讓觀察者ObserverA
對其進行訂閱。并重寫subscribeActual
方法,在其被訂閱時創(chuàng)建一個新的觀察者ObserverB
其接受的,并用ObserverB
對原始的ObservableA
進行訂閱觀察。當原始的ObservableA
發(fā)出事件,調(diào)用ObserverB
的onNext
方法,subscribeActual
接受的觀察者便是最原始的觀察者ObserverA
。ObserverB
變執(zhí)行通過匿名內(nèi)部類實例化的’function‘的apply
方法得到數(shù)據(jù)v
,緊接著調(diào)用原始的ObservableA
的onNext
方法,并傳入實參v
,ObserverA
觀察到事件。 一句話概括:一個原始的被觀察者和觀察者,但是讓原始的觀察者去訂閱一個新的觀察者,當新的被觀察者被訂閱的時候,創(chuàng)建一個新的觀察者去訂閱原始的被觀察者,并在監(jiān)聽的事件之后執(zhí)行指定的操作后再通知原始觀察者。所以這里面一共涉及到兩對觀察者和被觀察者,map方法會創(chuàng)建一對新的觀察者和被觀察者作為原始觀察者和被觀察者通訊的紐帶,并在其中做一些數(shù)據(jù)變換。
用圖片顯示流程如下:
藍色框內(nèi)就是map創(chuàng)建的觀察者和被觀察者。實際上我們的原始ObserverA并沒有對ObservableA進行訂閱。
flatMap
faltMap
和map
的基本原理類似,其代碼如下:
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) { return new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize); } public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends ObservableSource<? extends U>> mapper; final boolean delayErrors; final int maxConcurrency; final int bufferSize; public ObservableFlatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) { super(source); } @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize)); } static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> { MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) { ... this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY); } @Override public void onSubscribe(Disposable d) { downstream.onSubscribe(this); } @Override public void onNext(T t) { ObservableSource<? extends U> p; p = mapper.apply(t); subscribeInner(p); } @SuppressWarnings("unchecked") void subscribeInner(ObservableSource<? extends U> p) { InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++); p.subscribe(inner); } void drain() { drainLoop(); } void drainLoop() { final Observer<? super U> child = this.downstream; child.onNext(o); } } static final class InnerObserver<T, U> extends AtomicReference<Disposable> implements Observer<U> { private static final long serialVersionUID = -4606175640614850599L; final long id; final MergeObserver<T, U> parent; volatile boolean done; volatile SimpleQueue<U> queue; int fusionMode; InnerObserver(MergeObserver<T, U> parent, long id) { this.id = id; this.parent = parent; } @Override public void onNext(U t) { parent.drain(); } } }
上述代碼即是faltMap
精簡后的源碼,其中大部分代碼的運作流程和前文中的map
源碼一致,我們繼續(xù)延續(xù)上文中講解中的觀察者和被觀察者。重點關(guān)注其不同的地方: faltMap
返回一個新的被觀察者ObservableB
,重寫ObservableB
的subscribeActual
方法在原始的觀察者ObserverA
對其進行訂閱時新建一個觀察者ObserverB
對原始的ObservableA
進行訂閱。新的觀察者ObserverB
持有原始的ObserverA
和faltMap
接收的匿名對象實例function
。當ObserverB
監(jiān)聽到原始的被觀察者ObservableA
的事件時,ObserverB
調(diào)用function
的apply
方法獲得新新的被觀察者ObservableC
,再創(chuàng)建一個新的觀察者ObserverC
對ObservableC
進行訂閱,ObserverC
持有原始的觀察者ObserverA
,在ObserverC
觀察到被觀察者ObservableC
的時間時,調(diào)用原始的觀察者ObserverA
的方法。
概括就是:faltMap方法要求調(diào)用者提供一個Observable,最原始的Observable在調(diào)用faltMap后,faltMap會創(chuàng)建一個新的Observable,并對原始的進行訂閱。當拿到訂閱后,會通過flatMap接收的函數(shù)拿到調(diào)用者傳入的Observable,并用最原始的觀察者對它進行訂閱。這期間涉及三對觀察者和被觀察者,flatMap會創(chuàng)建一對,同時也接收一對用戶創(chuàng)建的。flatMap創(chuàng)建的和Map中的作用一樣,不過flatMap連接的是原始的和用戶通過flatMap提供的兩對觀察者和被觀察者。而原始的觀察者最終是對用戶通過flatMap提供的那個觀察者進行訂閱。
用圖片顯示流程如下:
和Map的流程很相似,只不過是需要用戶再提供一對觀察者和被觀察者。最終實現(xiàn)對用戶提供的被觀察者進行訂閱。
結(jié)語
至此,map和flatMap已基本分析完畢,其中map的代碼比較簡單易懂,flatMap中還涉及到大量輔助操作,文中并未涉及到其中的合并等操作,閱讀起來有些困難。如果僅僅是為了了解二者的原理,可以閱讀Single<T>
中的代碼。其中的代碼量遠遠少于Observable
中的代碼量。如果對RxJava基本的模式還不了解,可以閱讀 手寫極簡版的Rxjava
以上就是RxJava中map和flatMap的用法區(qū)別源碼解析的詳細內(nèi)容,更多關(guān)于RxJava map flatMap區(qū)別的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
在controller中如何設(shè)置接收參數(shù)的默認值
這篇文章主要介紹了在controller中如何設(shè)置接收參數(shù)的默認值,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03關(guān)于Assert.assertEquals報錯的問題及解決
這篇文章主要介紹了關(guān)于Assert.assertEquals報錯的問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-05-05Java 中實現(xiàn)隨機無重復(fù)數(shù)字的方法
為了更好地理解這個題意,我們先來看下具體內(nèi)容:生成一個1-100 的隨機數(shù)組,但數(shù)組中的數(shù)字不能重復(fù),即位置是隨機的,但數(shù)組元素不能重復(fù)2013-03-03Java的JDBC中Statement與CallableStatement對象實例
這篇文章主要介紹了Java的JDBC中Statement與CallableStatement對象實例,JDBC是Java編程中用于操作數(shù)據(jù)庫的API,需要的朋友可以參考下2015-12-12SpringBoot實現(xiàn)動態(tài)增刪啟停定時任務(wù)的方式
在spring?boot中,可以通過@EnableScheduling注解和@Scheduled注解實現(xiàn)定時任務(wù),也可以通過SchedulingConfigurer接口來實現(xiàn)定時任務(wù),但是這兩種方式不能動態(tài)添加、刪除、啟動、停止任務(wù),本文給大家介紹SpringBoot實現(xiàn)動態(tài)增刪啟停定時任務(wù)的方式,感興趣的朋友一起看看吧2024-03-03關(guān)于postman傳參的幾種格式 list,map 等
這篇文章主要介紹了postman傳參的幾種格式 list,map等,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08