RxJava的消息發(fā)送和線程切換實(shí)現(xiàn)原理
RxJava是一個(gè)在Java虛擬機(jī)上的響應(yīng)式擴(kuò)展,通過(guò)使用可觀察的序列將異步和基于事件的程序組合起來(lái)的一個(gè)庫(kù)。
它擴(kuò)展了觀察者模式來(lái)支持?jǐn)?shù)據(jù)/事件序列,并且添加了操作符,這些操作符允許你聲明性地組合序列,同時(shí)抽象出要關(guān)注的問(wèn)題:比如低級(jí)線程、同步、線程安全和并發(fā)數(shù)據(jù)結(jié)構(gòu)等。
RxJava相信大家都非常了解吧,今天分享一下RxJava的消息發(fā)送和線程源碼的分析。最后并分享一個(gè)相關(guān)demo,讓大家更加熟悉我們天天都在用的框架。
消息訂閱發(fā)送
首先讓我們看看消息訂閱發(fā)送最基本的代碼組成:
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Jack1");
emitter.onNext("Jack2");
emitter.onNext("Jack3");
emitter.onComplete();
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
observable.subscribe(observer);
代碼很簡(jiǎn)單,observable為被觀察者,observer為觀察者,然后通過(guò)observable.subscribe(observer),把觀察者和被觀察者關(guān)聯(lián)起來(lái)。被觀察者發(fā)送消息(emitter.onNext("內(nèi)容")),觀察者就可以在onNext()方法里回調(diào)出來(lái)。
我們先來(lái)看Observable,創(chuàng)建是用Observable.create()方法進(jìn)行創(chuàng)建,源碼如下:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
public static <T> T requireNonNull(T object, String message) {
if (object == null) {
throw new NullPointerException(message);
}
return object;
}
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
可以看出,create()方法里最主要的還是創(chuàng)建用ObservableOnSubscribe傳入創(chuàng)建了一個(gè)ObservableCreate對(duì)象并且保存而已。
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
}
接著是創(chuàng)建Observer,這比較簡(jiǎn)單只是單純創(chuàng)建一個(gè)接口對(duì)象而已
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
訂閱發(fā)送消息
observable.subscribe(observer)的subscribe方法如下:
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
//ObjectHelper.requireNonNull()方法
public static <T> T requireNonNull(T object, String message) {
if (object == null) {
throw new NullPointerException(message);
}
return object;
}
//RxJavaPlugins.onSubscribe()方法
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
if (f != null) {
return apply(f, source, observer);
}
return observer;
}
從上面源碼可以看出requireNonNull()只是做非空判斷而已,而RxJavaPlugins.onSubscribe()也只是返回最終的觀察者而已。所以關(guān)鍵代碼是抽象方法subscribeActual(observer);那么subscribeActual對(duì)應(yīng)哪個(gè)代碼段呢?
還記得Observable.create()創(chuàng)建的ObservableCreate類嗎,這就是subscribeActual()具體實(shí)現(xiàn)類,源碼如下:
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
從上面的代碼可以看出,首先創(chuàng)建了一個(gè)CreateEmitter對(duì)象并傳入observer,然后回到observer的onSubscribe()方法,而source就是我們之前創(chuàng)建ObservableCreate傳入的ObservableOnSubscribe對(duì)象。
class CreateEmitter<T> extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
}
而CreateEmitter又繼承ObservableEmitter接口,又回調(diào)ObservableOnSubscribe的subscribe方法,對(duì)應(yīng)著我們的:
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Jack1");
emitter.onNext("Jack2");
emitter.onNext("Jack3");
emitter.onComplete();
}
});
當(dāng)它發(fā)送消息既調(diào)用emitter.onNext()方法時(shí),既調(diào)用了CreateEmitter的onNext()方法:
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
可以看到最終又回調(diào)了觀察者的onNext()方法,把被觀察者的數(shù)據(jù)傳輸給了觀察者。有人會(huì)問(wèn)
isDisposed()是什么意思,是判斷要不要終止傳遞的,我們看emitter.onComplete()源碼:
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
public static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}
dispose()方法是終止消息傳遞,也就付了個(gè)DISPOSED常量,而isDisposed()方法就是判斷這個(gè)常量而已。這就是整個(gè)消息訂閱發(fā)送的過(guò)程,用的是觀察者模式。
線程切換
在上面模板代碼的基礎(chǔ)上,線程切換只是改變了如下代碼:
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
下面我們對(duì)線程切換的源碼進(jìn)行一下分析,分為兩部分:subscribeOn()和observeOn()
subscribeOn()
首先是subscribeOn()源碼如下:
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
我們傳進(jìn)去了一個(gè)Scheduler類,Scheduler是一個(gè)調(diào)度類,能夠延時(shí)或周期性地去執(zhí)行一個(gè)任務(wù)。
Scheduler有如下類型:
| 類型 | 使用方式 | 含義 | 使用場(chǎng)景 |
|---|---|---|---|
| IoScheduler | Schedulers.io() | io操作線程 | 讀寫(xiě)SD卡文件,查詢數(shù)據(jù)庫(kù),訪問(wèn)網(wǎng)絡(luò)等IO密集型操作 |
| NewThreadScheduler | Schedulers.newThread() | 創(chuàng)建新線程 | 耗時(shí)操作等 |
| SingleScheduler | Schedulers.single() | 單例線程 | 只需一個(gè)單例線程時(shí) |
| ComputationScheduler | Schedulers.computation() | CPU計(jì)算操作線程 | 圖片壓縮取樣、xml,json解析等CPU密集型計(jì)算 |
| TrampolineScheduler | Schedulers.trampoline() | 當(dāng)前線程 | 需要在當(dāng)前線程立即執(zhí)行任務(wù)時(shí) |
| HandlerScheduler | AndroidSchedulers.mainThread() | Android主線程 | 更新UI等 |
接著就沒(méi)什么了,只是返回一個(gè)ObservableSubscribeOn對(duì)象而已。
observeOn()
首先看源碼如下:
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
這里也是沒(méi)什么,只是最終返回一個(gè)ObservableObserveOn對(duì)象而已。
接著還是像原來(lái)那樣調(diào)用subscribe()方法進(jìn)行訂閱,看起來(lái)好像整體變化不大,就是封裝了一些對(duì)象而已,不過(guò)著恰恰是RxJava源碼的精華,當(dāng)他再次調(diào)用subscribeActual()方法時(shí),已經(jīng)不是之前的ObservableCreate()里subscribeActual方法了,而是最先調(diào)用ObservableObserveOn的subscribeActual()方法,對(duì)應(yīng)源碼如下:
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
在這里有兩點(diǎn)要講,一點(diǎn)是ObserveOnObserver是執(zhí)行觀察者的線程,后面還會(huì)詳解,然后就是source.subscribe,這個(gè)source.subscribe調(diào)的是ObservableSubscribeOn的subscribe方法,而subscribe方法因?yàn)槔^承的也是Observable,是Observable里的方法,所以和上面的ObservableCreate一樣的方法,所以會(huì)調(diào)用ObservableSubscribeOn里的subscribeActual()方法,對(duì)應(yīng)的代碼如下:
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
上面代碼中,首先把ObserveOnObserver返回給來(lái)的用SubscribeOnObserver“包裝”起來(lái),然后在回調(diào)Observer的onSubscribe(),就是對(duì)應(yīng)模板代碼的onSubscribe()方法。
接著看SubscribeTask類的源碼:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
其中的source.subscribe(parent),就是我們執(zhí)行子線程的回調(diào)方法,對(duì)應(yīng)我們模板代碼里的被觀察者的subscribe()方法。它放在run()方法里,并且繼承Runnable,說(shuō)明這個(gè)類主要是線程運(yùn)行。接著看scheduler.scheduleDirect()方法對(duì)應(yīng)的源碼如下:
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
在這里,createWorker()也是一個(gè)抽象方法,調(diào)用的是我們的調(diào)度類對(duì)應(yīng)的Schedulers類里面的方法,這里是IoScheduler類,
public final class IoScheduler extends Scheduler{
final AtomicReference<CachedWorkerPool> pool;
//省略....
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
//省略....
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
}
static final class CachedWorkerPool implements Runnable {
//省略....
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
//省略....
}
這就是IoScheduler的createWorker()的方法,其實(shí)最主要的意思就是獲取線程池,以便于生成子線程,讓SubscribeTask()可以運(yùn)行。然后直接調(diào)用 w.schedule(task, delay, unit)方法讓它在線程池里執(zhí)行。上面中那ThreadWorker的源碼如下:
static final class ThreadWorker extends NewThreadWorker {
private long expirationTime;
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
//省略代碼....
}
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
}
可以看到,這就調(diào)了原始的javaAPI來(lái)進(jìn)行線程池操作。
然后最后一環(huán)在子線程調(diào)用source.subscribe(parent)方法,然后回調(diào)剛開(kāi)始創(chuàng)建的ObservableCreate的subscribeActual(),既:
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
進(jìn)行消息的訂閱綁定。
當(dāng)我們?cè)谡{(diào)用 emitter.onNext(內(nèi)容)時(shí),是在io線程里的,那回調(diào)的onNext()又是什么時(shí)候切換的?那就是前面為了整個(gè)流程流暢性沒(méi)講的在observeOn()里的ObserveOnObserver是執(zhí)行觀察者的線程的過(guò)程。
class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
//省略代碼....
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
actual.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
//省略代碼....
}
當(dāng)調(diào)用emitter.onNext(內(nèi)容)方法,會(huì)調(diào)用上面的onNext()方法,然后在這個(gè)方法里會(huì)把數(shù)據(jù)壓入一個(gè)隊(duì)列,然后執(zhí)行worker.schedule(this)方法,work是什么呢,還記得AndroidSchedulers.mainThread()嗎,這個(gè)對(duì)應(yīng)這個(gè)HandlerScheduler這個(gè)類,所以createWorker()對(duì)應(yīng)著:
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
public Worker createWorker() {
return new HandlerWorker(handler);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
private volatile boolean disposed;
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, unit.toMillis(delay));
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
}
在next()方法里,運(yùn)用android自帶的Handler消息機(jī)制,通過(guò)把方法包裹在Message里,同通過(guò)handler.sendMessageDelayed()發(fā)送消息,就會(huì)在ui線程里回調(diào)Next()方法,從而實(shí)現(xiàn)從子線程切換到android主線程的操作。我們?cè)谥骶€程拿到數(shù)據(jù)就可以進(jìn)行各種在主線程的操作了。
總結(jié)一下:

ObservableCreate 一> ObservableSubscribeOn 一> ObservableObserveOn為初始化順序
當(dāng)調(diào)用observable.subscribe(observer)時(shí)的執(zhí)行順序
ObservableObserveOn 一> ObservableSubscribeOn 一> ObservableCreate
當(dāng)發(fā)送消息的執(zhí)行順序
ObservableCreate 一> ObservableSubscribeOn 一> ObservableObserveOn
以上就是消息訂閱和線程切換的源碼的所有講解了。
為了讓你們理解更清楚,我仿照RxJava寫(xiě)了大概的消息訂閱和線程切換的最基本代碼和基本功能,以幫助你們理解
https://github.com/jack921/RxJava2Demo
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- Java 基于UDP協(xié)議實(shí)現(xiàn)消息發(fā)送
- 使用Java和WebSocket實(shí)現(xiàn)網(wǎng)頁(yè)聊天室實(shí)例代碼
- 基于Java Socket實(shí)現(xiàn)一個(gè)簡(jiǎn)易在線聊天功能(一)
- java聊天室的實(shí)現(xiàn)代碼
- Java基于socket實(shí)現(xiàn)簡(jiǎn)易聊天室實(shí)例
- java實(shí)現(xiàn)自動(dòng)回復(fù)聊天機(jī)器人
- java socket實(shí)現(xiàn)聊天室 java實(shí)現(xiàn)多人聊天功能
- java中UDP簡(jiǎn)單聊天程序?qū)嵗a
- java Socket實(shí)現(xiàn)網(wǎng)頁(yè)版在線聊天
- java實(shí)現(xiàn)一個(gè)簡(jiǎn)單TCPSocket聊天室功能分享
- Java網(wǎng)絡(luò)編程UDP實(shí)現(xiàn)消息發(fā)送及聊天
相關(guān)文章
Java?C++題解leetcode消失的兩個(gè)數(shù)字實(shí)例
這篇文章主要介紹了Java?C++題解leetcode消失的兩個(gè)數(shù)字實(shí)例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-09-09
Hadoop環(huán)境配置之hive環(huán)境配置詳解
這篇文章主要介紹了Hadoop環(huán)境配置之hive環(huán)境配置,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-12-12
深入淺析springboot中static和templates區(qū)別
這篇文章主要介紹了springboot中static和templates區(qū)別,本文通過(guò)圖文實(shí)例代碼相結(jié)合給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-02-02
RocketMQ消息過(guò)濾與查詢的實(shí)現(xiàn)
這篇文章主要介紹了RocketMQ消息過(guò)濾與查詢的實(shí)現(xiàn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07
Java實(shí)現(xiàn)向數(shù)組里添加元素
這篇文章主要介紹了Java實(shí)現(xiàn)向數(shù)組里添加元素方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-11-11
springboot實(shí)現(xiàn)異步任務(wù)
這篇文章主要為大家詳細(xì)介紹了springboot實(shí)現(xiàn)異步任務(wù),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-05-05
spring AOP實(shí)現(xiàn)@Around輸出請(qǐng)求參數(shù)和返回參數(shù)
這篇文章主要介紹了spring AOP實(shí)現(xiàn)@Around輸出請(qǐng)求參數(shù)和返回參數(shù),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02

