RxJava的消息發(fā)送和線程切換實現(xiàn)原理
RxJava是一個在Java虛擬機上的響應式擴展,通過使用可觀察的序列將異步和基于事件的程序組合起來的一個庫。
它擴展了觀察者模式來支持數據/事件序列,并且添加了操作符,這些操作符允許你聲明性地組合序列,同時抽象出要關注的問題:比如低級線程、同步、線程安全和并發(fā)數據結構等。
RxJava相信大家都非常了解吧,今天分享一下RxJava的消息發(fā)送和線程源碼的分析。最后并分享一個相關demo,讓大家更加熟悉我們天天都在用的框架。
消息訂閱發(fā)送
首先讓我們看看消息訂閱發(fā)送最基本的代碼組成:
Observable observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("Jack1"); emitter.onNext("Jack2"); emitter.onNext("Jack3"); emitter.onComplete(); } }); Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(String s) { Log.d(TAG, "onNext : " + s); } @Override public void onError(Throwable e) { Log.d(TAG, "onError : " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }; observable.subscribe(observer);
代碼很簡單,observable為被觀察者,observer為觀察者,然后通過observable.subscribe(observer),把觀察者和被觀察者關聯(lián)起來。被觀察者發(fā)送消息(emitter.onNext("內容")),觀察者就可以在onNext()方法里回調出來。
我們先來看Observable,創(chuàng)建是用Observable.create()方法進行創(chuàng)建,源碼如下:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } public static <T> T requireNonNull(T object, String message) { if (object == null) { throw new NullPointerException(message); } return object; } public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; }
可以看出,create()方法里最主要的還是創(chuàng)建用ObservableOnSubscribe傳入創(chuàng)建了一個ObservableCreate對象并且保存而已。
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } }
接著是創(chuàng)建Observer,這比較簡單只是單純創(chuàng)建一個接口對象而已
public interface Observer<T> { void onSubscribe(@NonNull Disposable d); void onNext(@NonNull T t); void onError(@NonNull Throwable e); void onComplete(); }
訂閱發(fā)送消息
observable.subscribe(observer)的subscribe方法如下:
public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } } //ObjectHelper.requireNonNull()方法 public static <T> T requireNonNull(T object, String message) { if (object == null) { throw new NullPointerException(message); } return object; } //RxJavaPlugins.onSubscribe()方法 public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) { BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe; if (f != null) { return apply(f, source, observer); } return observer; }
從上面源碼可以看出requireNonNull()只是做非空判斷而已,而RxJavaPlugins.onSubscribe()也只是返回最終的觀察者而已。所以關鍵代碼是抽象方法subscribeActual(observer);那么subscribeActual對應哪個代碼段呢?
還記得Observable.create()創(chuàng)建的ObservableCreate類嗎,這就是subscribeActual()具體實現(xiàn)類,源碼如下:
protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
從上面的代碼可以看出,首先創(chuàng)建了一個CreateEmitter對象并傳入observer,然后回到observer的onSubscribe()方法,而source就是我們之前創(chuàng)建ObservableCreate傳入的ObservableOnSubscribe對象。
class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { }
而CreateEmitter又繼承ObservableEmitter接口,又回調ObservableOnSubscribe的subscribe方法,對應著我們的:
Observable observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("Jack1"); emitter.onNext("Jack2"); emitter.onNext("Jack3"); emitter.onComplete(); } });
當它發(fā)送消息既調用emitter.onNext()方法時,既調用了CreateEmitter的onNext()方法:
public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } }
可以看到最終又回調了觀察者的onNext()方法,把被觀察者的數據傳輸給了觀察者。有人會問
isDisposed()是什么意思,是判斷要不要終止傳遞的,我們看emitter.onComplete()源碼:
public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } } public static boolean dispose(AtomicReference<Disposable> field) { Disposable current = field.get(); Disposable d = DISPOSED; if (current != d) { current = field.getAndSet(d); if (current != d) { if (current != null) { current.dispose(); } return true; } } return false; } public static boolean isDisposed(Disposable d) { return d == DISPOSED; }
dispose()方法是終止消息傳遞,也就付了個DISPOSED常量,而isDisposed()方法就是判斷這個常量而已。這就是整個消息訂閱發(fā)送的過程,用的是觀察者模式。
線程切換
在上面模板代碼的基礎上,線程切換只是改變了如下代碼:
observable.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(observer);
下面我們對線程切換的源碼進行一下分析,分為兩部分:subscribeOn()和observeOn()
subscribeOn()
首先是subscribeOn()源碼如下:
public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
我們傳進去了一個Scheduler類,Scheduler是一個調度類,能夠延時或周期性地去執(zhí)行一個任務。
Scheduler有如下類型:
類型 | 使用方式 | 含義 | 使用場景 |
---|---|---|---|
IoScheduler | Schedulers.io() | io操作線程 | 讀寫SD卡文件,查詢數據庫,訪問網絡等IO密集型操作 |
NewThreadScheduler | Schedulers.newThread() | 創(chuàng)建新線程 | 耗時操作等 |
SingleScheduler | Schedulers.single() | 單例線程 | 只需一個單例線程時 |
ComputationScheduler | Schedulers.computation() | CPU計算操作線程 | 圖片壓縮取樣、xml,json解析等CPU密集型計算 |
TrampolineScheduler | Schedulers.trampoline() | 當前線程 | 需要在當前線程立即執(zhí)行任務時 |
HandlerScheduler | AndroidSchedulers.mainThread() | Android主線程 | 更新UI等 |
接著就沒什么了,只是返回一個ObservableSubscribeOn對象而已。
observeOn()
首先看源碼如下:
public final Observable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); }
這里也是沒什么,只是最終返回一個ObservableObserveOn對象而已。
接著還是像原來那樣調用subscribe()方法進行訂閱,看起來好像整體變化不大,就是封裝了一些對象而已,不過著恰恰是RxJava源碼的精華,當他再次調用subscribeActual()方法時,已經不是之前的ObservableCreate()里subscribeActual方法了,而是最先調用ObservableObserveOn的subscribeActual()方法,對應源碼如下:
protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
在這里有兩點要講,一點是ObserveOnObserver是執(zhí)行觀察者的線程,后面還會詳解,然后就是source.subscribe,這個source.subscribe調的是ObservableSubscribeOn的subscribe方法,而subscribe方法因為繼承的也是Observable,是Observable里的方法,所以和上面的ObservableCreate一樣的方法,所以會調用ObservableSubscribeOn里的subscribeActual()方法,對應的代碼如下:
public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
上面代碼中,首先把ObserveOnObserver返回給來的用SubscribeOnObserver“包裝”起來,然后在回調Observer的onSubscribe(),就是對應模板代碼的onSubscribe()方法。
接著看SubscribeTask類的源碼:
final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; } @Override public void run() { source.subscribe(parent); } }
其中的source.subscribe(parent),就是我們執(zhí)行子線程的回調方法,對應我們模板代碼里的被觀察者的subscribe()方法。它放在run()方法里,并且繼承Runnable,說明這個類主要是線程運行。接著看scheduler.scheduleDirect()方法對應的源碼如下:
public Disposable scheduleDirect(@NonNull Runnable run) { return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); } public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; }
在這里,createWorker()也是一個抽象方法,調用的是我們的調度類對應的Schedulers類里面的方法,這里是IoScheduler類,
public final class IoScheduler extends Scheduler{ final AtomicReference<CachedWorkerPool> pool; //省略.... public Worker createWorker() { return new EventLoopWorker(pool.get()); } static final class EventLoopWorker extends Scheduler.Worker { private final CompositeDisposable tasks; private final CachedWorkerPool pool; private final ThreadWorker threadWorker; final AtomicBoolean once = new AtomicBoolean(); EventLoopWorker(CachedWorkerPool pool) { this.pool = pool; this.tasks = new CompositeDisposable(); this.threadWorker = pool.get(); } //省略.... @NonNull @Override public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); } } } static final class CachedWorkerPool implements Runnable { //省略.... ThreadWorker get() { if (allWorkers.isDisposed()) { return SHUTDOWN_THREAD_WORKER; } while (!expiringWorkerQueue.isEmpty()) { ThreadWorker threadWorker = expiringWorkerQueue.poll(); if (threadWorker != null) { return threadWorker; } } ThreadWorker w = new ThreadWorker(threadFactory); allWorkers.add(w); return w; } //省略.... }
這就是IoScheduler的createWorker()的方法,其實最主要的意思就是獲取線程池,以便于生成子線程,讓SubscribeTask()可以運行。然后直接調用 w.schedule(task, delay, unit)方法讓它在線程池里執(zhí)行。上面中那ThreadWorker的源碼如下:
static final class ThreadWorker extends NewThreadWorker { private long expirationTime; ThreadWorker(ThreadFactory threadFactory) { super(threadFactory); this.expirationTime = 0L; } //省略代碼.... } public class NewThreadWorker extends Scheduler.Worker implements Disposable { private final ScheduledExecutorService executor; public NewThreadWorker(ThreadFactory threadFactory) { executor = SchedulerPoolFactory.create(threadFactory); } public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); if (parent != null) { if (!parent.add(sr)) { return sr; } } Future<?> f; try { if (delayTime <= 0) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; } }
可以看到,這就調了原始的javaAPI來進行線程池操作。
然后最后一環(huán)在子線程調用source.subscribe(parent)方法,然后回調剛開始創(chuàng)建的ObservableCreate的subscribeActual(),既:
protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
進行消息的訂閱綁定。
當我們在調用 emitter.onNext(內容)時,是在io線程里的,那回調的onNext()又是什么時候切換的?那就是前面為了整個流程流暢性沒講的在observeOn()里的ObserveOnObserver是執(zhí)行觀察者的線程的過程。
class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { //省略代碼.... ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { this.actual = actual; this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; } @Override public void onSubscribe(Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; if (s instanceof QueueDisposable) { @SuppressWarnings("unchecked") QueueDisposable<T> qd = (QueueDisposable<T>) s; int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY); if (m == QueueDisposable.SYNC) { sourceMode = m; queue = qd; done = true; actual.onSubscribe(this); schedule(); return; } if (m == QueueDisposable.ASYNC) { sourceMode = m; queue = qd; actual.onSubscribe(this); return; } } queue = new SpscLinkedArrayQueue<T>(bufferSize); actual.onSubscribe(this); } } @Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } } //省略代碼.... }
當調用emitter.onNext(內容)方法,會調用上面的onNext()方法,然后在這個方法里會把數據壓入一個隊列,然后執(zhí)行worker.schedule(this)方法,work是什么呢,還記得AndroidSchedulers.mainThread()嗎,這個對應這個HandlerScheduler這個類,所以createWorker()對應著:
private static final class MainHolder { static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper())); } public Worker createWorker() { return new HandlerWorker(handler); } private static final class HandlerWorker extends Worker { private final Handler handler; private volatile boolean disposed; HandlerWorker(Handler handler) { this.handler = handler; } @Override public Disposable schedule(Runnable run, long delay, TimeUnit unit) { if (run == null) throw new NullPointerException("run == null"); if (unit == null) throw new NullPointerException("unit == null"); if (disposed) { return Disposables.disposed(); } run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); Message message = Message.obtain(handler, scheduled); message.obj = this; // Used as token for batch disposal of this worker's runnables. handler.sendMessageDelayed(message, unit.toMillis(delay)); if (disposed) { handler.removeCallbacks(scheduled); return Disposables.disposed(); } return scheduled; } }
在next()方法里,運用android自帶的Handler消息機制,通過把方法包裹在Message里,同通過handler.sendMessageDelayed()發(fā)送消息,就會在ui線程里回調Next()方法,從而實現(xiàn)從子線程切換到android主線程的操作。我們在主線程拿到數據就可以進行各種在主線程的操作了。
總結一下:
ObservableCreate 一> ObservableSubscribeOn 一> ObservableObserveOn為初始化順序
當調用observable.subscribe(observer)時的執(zhí)行順序
ObservableObserveOn 一> ObservableSubscribeOn 一> ObservableCreate
當發(fā)送消息的執(zhí)行順序
ObservableCreate 一> ObservableSubscribeOn 一> ObservableObserveOn
以上就是消息訂閱和線程切換的源碼的所有講解了。
為了讓你們理解更清楚,我仿照RxJava寫了大概的消息訂閱和線程切換的最基本代碼和基本功能,以幫助你們理解
https://github.com/jack921/RxJava2Demo
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
- Java 基于UDP協(xié)議實現(xiàn)消息發(fā)送
- 使用Java和WebSocket實現(xiàn)網頁聊天室實例代碼
- 基于Java Socket實現(xiàn)一個簡易在線聊天功能(一)
- java聊天室的實現(xiàn)代碼
- Java基于socket實現(xiàn)簡易聊天室實例
- java實現(xiàn)自動回復聊天機器人
- java socket實現(xiàn)聊天室 java實現(xiàn)多人聊天功能
- java中UDP簡單聊天程序實例代碼
- java Socket實現(xiàn)網頁版在線聊天
- java實現(xiàn)一個簡單TCPSocket聊天室功能分享
- Java網絡編程UDP實現(xiàn)消息發(fā)送及聊天
相關文章
Hadoop環(huán)境配置之hive環(huán)境配置詳解
這篇文章主要介紹了Hadoop環(huán)境配置之hive環(huán)境配置,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-12-12深入淺析springboot中static和templates區(qū)別
這篇文章主要介紹了springboot中static和templates區(qū)別,本文通過圖文實例代碼相結合給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下2020-02-02spring AOP實現(xiàn)@Around輸出請求參數和返回參數
這篇文章主要介紹了spring AOP實現(xiàn)@Around輸出請求參數和返回參數,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-02-02