關(guān)于RxJS Subject的學(xué)習(xí)筆記
Observer Pattern
觀察者模式定義
觀察者模式又叫發(fā)布訂閱模式(Publish/Subscribe),它定義了一種一對(duì)多的關(guān)系,讓多個(gè)觀察者對(duì)象同時(shí)監(jiān)聽某一個(gè)主題對(duì)象,這個(gè)主題對(duì)象的狀態(tài)發(fā)生變化時(shí)就會(huì)通知所有的觀察者對(duì)象,使得它們能夠自動(dòng)更新自己。
我們可以使用日常生活中,期刊訂閱的例子來(lái)形象地解釋一下上面的概念。期刊訂閱包含兩個(gè)主要的角色:期刊出版方和訂閱者,他們之間的關(guān)系如下:
- 期刊出版方 - 負(fù)責(zé)期刊的出版和發(fā)行工作
- 訂閱者 - 只需執(zhí)行訂閱操作,新版的期刊發(fā)布后,就會(huì)主動(dòng)收到通知,如果取消訂閱,以后就不會(huì)再收到通知
在觀察者模式中也有兩個(gè)主要角色:Subject (主題) 和 Observer (觀察者) 。它們分別對(duì)應(yīng)例子中的期刊出版方和訂閱者。接下來(lái)我們來(lái)看張圖,從而加深對(duì)上面概念的理解。
觀察者模式結(jié)構(gòu)
觀察者模式實(shí)戰(zhàn)
Subject 類定義
class Subject { constructor() { this.observerCollection = []; } addObserver(observer) { // 添加觀察者 this.observerCollection.push(observer); } deleteObserver(observer) { // 移除觀察者 let index = this.observerCollection.indexOf(observer); if(index >= 0) this.observerCollection.splice(index, 1); } notifyObservers() { // 通知觀察者 this.observerCollection.forEach((observer)=>observer.notify()); } }
Observer 類定義
class Observer { constructor(name) { this.name = name; } notify() { console.log(`${this.name} has been notified.`); } }
使用示例
let subject = new Subject(); // 創(chuàng)建主題對(duì)象 let observer1 = new Observer('semlinker'); // 創(chuàng)建觀察者A - 'semlinker' let observer2 = new Observer('lolo'); // 創(chuàng)建觀察者B - 'lolo' subject.addObserver(observer1); // 注冊(cè)觀察者A subject.addObserver(observer2); // 注冊(cè)觀察者B subject.notifyObservers(); // 通知觀察者 subject.deleteObserver(observer1); // 移除觀察者A subject.notifyObservers(); // 驗(yàn)證是否成功移除
以上代碼成功運(yùn)行后控制臺(tái)的輸出結(jié)果:
semlinker has been notified.
lolo has been notified.
lolo has been notified.
Observable subscribe
在介紹RxJS - Subject 之前,我們先來(lái)看個(gè)示例:
const interval$ = Rx.Observable.interval(1000).take(3); interval$.subscribe({ next: value => console.log('Observer A get value: ' + value); }); setTimeout(() => { interval$.subscribe({ next: value => console.log('Observer B get value: ' + value); }); }, 1000);
以上代碼運(yùn)行后,控制臺(tái)的輸出結(jié)果:
Observer A get value: 0
Observer A get value: 1
Observer B get value: 0
Observer A get value: 2
Observer B get value: 1
Observer B get value: 2
通過(guò)以上示例,我們可以得出以下結(jié)論:
- Observable 對(duì)象可以被重復(fù)訂閱
- Observable 對(duì)象每次被訂閱后,都會(huì)重新執(zhí)行
上面的示例,我們可以簡(jiǎn)單地認(rèn)為兩次調(diào)用普通的函數(shù),具體參考以下代碼:
function interval() { setInterval(() => console.log('..'), 1000); } interval(); setTimeout(() => { interval(); }, 1000);
Observable 對(duì)象的默認(rèn)行為,適用于大部分場(chǎng)景。但有些時(shí)候,我們會(huì)希望在第二次訂閱的時(shí)候,不會(huì)從頭開始接收 Observable 發(fā)出的值,而是從第一次訂閱當(dāng)前正在處理的值開始發(fā)送,我們把這種處理方式成為組播 (multicast),那我們要怎么實(shí)現(xiàn)呢 ?回想一下我們剛才介紹過(guò)觀察者模式,你腦海中是不是已經(jīng)想到方案了。沒錯(cuò),我們可以通過(guò)自定義 Subject 來(lái)實(shí)現(xiàn)上述功能。
自定義 Subject
Subject 類定義
class Subject { constructor() { this.observers = []; } addObserver(observer) { this.observers.push(observer); } next(value) { this.observers.forEach(o => o.next(value)); } error(error){ this.observers.forEach(o => o.error(error)); } complete() { this.observers.forEach(o => o.complete()); } }
使用示例
const interval$ = Rx.Observable.interval(1000).take(3); let subject = new Subject(); let observerA = { next: value => console.log('Observer A get value: ' + value), error: error => console.log('Observer A error: ' + error), complete: () => console.log('Observer A complete!') }; var observerB = { next: value => console.log('Observer B get value: ' + value), error: error => console.log('Observer B error: ' + error), complete: () => console.log('Observer B complete!') }; subject.addObserver(observerA); // 添加觀察者A interval$.subscribe(subject); // 訂閱interval$對(duì)象 setTimeout(() => { subject.addObserver(observerB); // 添加觀察者B }, 1000);
以上代碼運(yùn)行后,控制臺(tái)的輸出結(jié)果:
Observer A get value: 0
Observer A get value: 1
Observer B get value: 1
Observer A get value: 2
Observer B get value: 2
Observer A complete!
Observer B complete!
通過(guò)自定義 Subject,我們實(shí)現(xiàn)了前面提到的功能。接下來(lái)我們進(jìn)入正題 - RxJS Subject。
RxJS Subject
首先我們通過(guò) RxJS Subject 來(lái)重寫一下上面的示例:
const interval$ = Rx.Observable.interval(1000).take(3); let subject = new Rx.Subject(); let observerA = { next: value => console.log('Observer A get value: ' + value), error: error => console.log('Observer A error: ' + error), complete: () => console.log('Observer A complete!') }; var observerB = { next: value => console.log('Observer B get value: ' + value), error: error => console.log('Observer B error: ' + error), complete: () => console.log('Observer B complete!') }; subject.subscribe(observerA); // 添加觀察者A interval$.subscribe(subject); // 訂閱interval$對(duì)象 setTimeout(() => { subject.subscribe(observerB); // 添加觀察者B }, 1000);
RxJS Subject 源碼片段
/** * Suject繼承于Observable */ export class Subject extends Observable { constructor() { super(); this.observers = []; // 觀察者列表 this.closed = false; this.isStopped = false; this.hasError = false; this.thrownError = null; } next(value) { if (this.closed) { throw new ObjectUnsubscribedError(); } if (!this.isStopped) { const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { // 循環(huán)調(diào)用觀察者next方法,通知觀察者 copy[i].next(value); } } } error(err) { if (this.closed) { throw new ObjectUnsubscribedError(); } this.hasError = true; this.thrownError = err; this.isStopped = true; const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { // 循環(huán)調(diào)用觀察者error方法 copy[i].error(err); } this.observers.length = 0; } complete() { if (this.closed) { throw new ObjectUnsubscribedError(); } this.isStopped = true; const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { // 循環(huán)調(diào)用觀察者complete方法 copy[i].complete(); } this.observers.length = 0; // 清空內(nèi)部觀察者列表 } }
通過(guò) RxJS Subject 示例和源碼片段,對(duì)于 Subject 我們可以得出以下結(jié)論:
- Subject 既是 Observable 對(duì)象,又是 Observer 對(duì)象
- 當(dāng)有新消息時(shí),Subject 會(huì)對(duì)內(nèi)部的 observers 列表進(jìn)行組播 (multicast)
Angular 2 RxJS Subject 應(yīng)用
在 Angular 2 中,我們可以利用 RxJS Subject 來(lái)實(shí)現(xiàn)組件通信,具體示例如下:
message.service.ts
import { Injectable } from '@angular/core'; import {Observable} from 'rxjs/Observable'; import { Subject } from 'rxjs/Subject'; @Injectable() export class MessageService { private subject = new Subject<any>(); sendMessage(message: string) { this.subject.next({ text: message }); } clearMessage() { this.subject.next(); } getMessage(): Observable<any> { return this.subject.asObservable(); } }
home.component.ts
import { Component } from '@angular/core'; import { MessageService } from '../_services/index'; @Component({ moduleId: module.id, templateUrl: 'home.component.html' }) export class HomeComponent { constructor(private messageService: MessageService) {} sendMessage(): void { // 發(fā)送消息 this.messageService.sendMessage('Message from Home Component to App Component!'); } clearMessage(): void { // 清除消息 this.messageService.clearMessage(); } }
app.component.ts
import { Component, OnDestroy } from '@angular/core'; import { Subscription } from 'rxjs/Subscription'; import { MessageService } from './_services/index'; @Component({ moduleId: module.id, selector: 'app', templateUrl: 'app.component.html' }) export class AppComponent implements OnDestroy { message: any; subscription: Subscription; constructor(private messageService: MessageService) { this.subscription = this.messageService.getMessage() .subscribe(message => { this.message = message; }); } ngOnDestroy() { this.subscription.unsubscribe(); } }
以上示例實(shí)現(xiàn)的功能是組件之間消息通信,即 HomeComponent 子組件,向 AppComponent 父組件發(fā)送消息。代碼運(yùn)行后,瀏覽器的顯示結(jié)果如下:
Subject 存在的問題
因?yàn)?Subject 在訂閱時(shí),是把 observer 存放到觀察者列表中,并在接收到新值的時(shí)候,遍歷觀察者列表并調(diào)用觀察者上的 next
方法,具體如下:
next(value) { if (this.closed) { throw new ObjectUnsubscribedError(); } if (!this.isStopped) { const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { // 循環(huán)調(diào)用觀察者next方法,通知觀察者 copy[i].next(value); } } }
這樣會(huì)有一個(gè)大問題,如果某個(gè) observer 在執(zhí)行時(shí)出現(xiàn)異常,卻沒進(jìn)行異常處理,就會(huì)影響到其它的訂閱者,具體示例如下:
const source = Rx.Observable.interval(1000); const subject = new Rx.Subject(); const example = subject.map(x => { if (x === 1) { throw new Error('oops'); } return x; }); subject.subscribe(x => console.log('A', x)); example.subscribe(x => console.log('B', x)); subject.subscribe(x => console.log('C', x)); source.subscribe(subject);
以上代碼運(yùn)行后,控制臺(tái)的輸出結(jié)果:
A 0
B 0
C 0
A 1
Rx.min.js:74 Uncaught Error: oops
在代碼運(yùn)行前,大家會(huì)認(rèn)為觀察者B 會(huì)在接收到 1
值時(shí)拋出異常,觀察者 A 和 C 仍會(huì)正常運(yùn)行。但實(shí)際上,在當(dāng)前的 RxJS 版本中若觀察者 B 報(bào)錯(cuò),觀察者 A 和 C 也會(huì)停止運(yùn)行。那么應(yīng)該如何解決這個(gè)問題呢?目前最簡(jiǎn)單的方式就是為所有的觀察者添加異常處理,更新后的代碼如下:
const source = Rx.Observable.interval(1000); const subject = new Rx.Subject(); const example = subject.map(x => { if (x === 1) { throw new Error('oops'); } return x; }); subject.subscribe( x => console.log('A', x), error => console.log('A Error:' + error) ); example.subscribe( x => console.log('B', x), error => console.log('B Error:' + error) ); subject.subscribe( x => console.log('C', x), error => console.log('C Error:' + error) ); source.subscribe(subject);
JSBin - RxJS Subject Problem Solved Demo
RxJS Subject & Observable
Subject 其實(shí)是觀察者模式的實(shí)現(xiàn),所以當(dāng)觀察者訂閱 Subject 對(duì)象時(shí),Subject 對(duì)象會(huì)把訂閱者添加到觀察者列表中,每當(dāng)有 subject 對(duì)象接收到新值時(shí),它就會(huì)遍歷觀察者列表,依次調(diào)用觀察者內(nèi)部的 next()
方法,把值一一送出。
Subject 之所以具有 Observable 中的所有方法,是因?yàn)?Subject 類繼承了 Observable 類,在 Subject 類中有五個(gè)重要的方法:
- next - 每當(dāng) Subject 對(duì)象接收到新值的時(shí)候,next 方法會(huì)被調(diào)用
- error - 運(yùn)行中出現(xiàn)異常,error 方法會(huì)被調(diào)用
- complete - Subject 訂閱的 Observable 對(duì)象結(jié)束后,complete 方法會(huì)被調(diào)用
- subscribe - 添加觀察者
- unsubscribe - 取消訂閱 (設(shè)置終止標(biāo)識(shí)符、清空觀察者列表)
BehaviorSubject
BehaviorSubject 定義
export class BehaviorSubject extends Subject { constructor(_value) { // 設(shè)置初始值 super(); this._value = _value; } get value() { // 獲取當(dāng)前值 return this.getValue(); } _subscribe(subscriber) { const subscription = super._subscribe(subscriber); if (subscription && !subscription.closed) { subscriber.next(this._value); // 為新的訂閱者發(fā)送當(dāng)前最新的值 } return subscription; } getValue() { if (this.hasError) { throw this.thrownError; } else if (this.closed) { throw new ObjectUnsubscribedError(); } else { return this._value; } } next(value) { // 調(diào)用父類Subject的next方法,同時(shí)更新當(dāng)前值 super.next(this._value = value); } }
BehaviorSubject 應(yīng)用
有些時(shí)候我們會(huì)希望 Subject 能保存當(dāng)前的最新狀態(tài),而不是單純的進(jìn)行事件發(fā)送,也就是說(shuō)每當(dāng)新增一個(gè)觀察者的時(shí)候,我們希望 Subject 能夠立即發(fā)出當(dāng)前最新的值,而不是沒有任何響應(yīng)。具體我們先看一下示例:
var subject = new Rx.Subject(); var observerA = { next: value => console.log('Observer A get value: ' + value), error: error => console.log('Observer A error: ' + error), complete: () => console.log('Observer A complete!') }; var observerB = { next: value => console.log('Observer B get value: ' + value), error: error => console.log('Observer B error: ' + error), complete: () => console.log('Observer B complete!') }; subject.subscribe(observerA); subject.next(1); subject.next(2); subject.next(3); setTimeout(() => { subject.subscribe(observerB); // 1秒后訂閱 }, 1000);
以上代碼運(yùn)行后,控制臺(tái)的輸出結(jié)果:
Observer A get value: 1
Observer A get value: 2
Observer A get value: 3
通過(guò)輸出結(jié)果,我們發(fā)現(xiàn)在 observerB 訂閱 Subject 對(duì)象后,它再也沒有收到任何值了。因?yàn)?Subject 對(duì)象沒有再調(diào)用 next()
方法。但很多時(shí)候我們會(huì)希望 Subject 對(duì)象能夠保存當(dāng)前的狀態(tài),當(dāng)新增訂閱者的時(shí)候,自動(dòng)把當(dāng)前最新的值發(fā)送給訂閱者。要實(shí)現(xiàn)這個(gè)功能,我們就需要使用 BehaviorSubject。
BehaviorSubject 跟 Subject 最大的不同就是 BehaviorSubject 是用來(lái)保存當(dāng)前最新的值,而不是單純的發(fā)送事件。BehaviorSubject 會(huì)記住最近一次發(fā)送的值,并把該值作為當(dāng)前值保存在內(nèi)部的屬性中。接下來(lái)我們來(lái)使用 BehaviorSubject 重新一下上面的示例:
var subject = new Rx.BehaviorSubject(0); // 設(shè)定初始值 var observerA = { next: value => console.log('Observer A get value: ' + value), error: error => console.log('Observer A error: ' + error), complete: () => console.log('Observer A complete!') }; var observerB = { next: value => console.log('Observer B get value: ' + value), error: error => console.log('Observer B error: ' + error), complete: () => console.log('Observer B complete!') }; subject.subscribe(observerA); subject.next(1); subject.next(2); subject.next(3); setTimeout(() => { subject.subscribe(observerB); // 1秒后訂閱 }, 1000);
以上代碼運(yùn)行后,控制臺(tái)的輸出結(jié)果:
Observer A get value: 0
Observer A get value: 1
Observer A get value: 2
Observer A get value: 3
Observer B get value: 3
ReplaySubject
ReplaySubject 定義
export class ReplaySubject extends Subject { constructor(bufferSize = Number.POSITIVE_INFINITY, windowTime = Number.POSITIVE_INFINITY, scheduler) { super(); this.scheduler = scheduler; this._events = []; // ReplayEvent對(duì)象列表 this._bufferSize = bufferSize < 1 ? 1 : bufferSize; // 設(shè)置緩沖區(qū)大小 this._windowTime = windowTime < 1 ? 1 : windowTime; } next(value) { const now = this._getNow(); this._events.push(new ReplayEvent(now, value)); this._trimBufferThenGetEvents(); super.next(value); } _subscribe(subscriber) { const _events = this._trimBufferThenGetEvents(); // 過(guò)濾ReplayEvent對(duì)象列表 let subscription; if (this.closed) { throw new ObjectUnsubscribedError(); } ... else { this.observers.push(subscriber); subscription = new SubjectSubscription(this, subscriber); } ... const len = _events.length; // 重新發(fā)送設(shè)定的最后bufferSize個(gè)值 for (let i = 0; i < len && !subscriber.closed; i++) { subscriber.next(_events[i].value); } ... return subscription; } } class ReplayEvent { constructor(time, value) { this.time = time; this.value = value; } }
ReplaySubject 應(yīng)用
有些時(shí)候我們希望在 Subject 新增訂閱者后,能向新增的訂閱者重新發(fā)送最后幾個(gè)值,這時(shí)我們就可以使用 ReplaySubject ,具體示例如下:
var subject = new Rx.ReplaySubject(2); // 重新發(fā)送最后2個(gè)值 var observerA = { next: value => console.log('Observer A get value: ' + value), error: error => console.log('Observer A error: ' + error), complete: () => console.log('Observer A complete!') }; var observerB = { next: value => console.log('Observer B get value: ' + value), error: error => console.log('Observer B error: ' + error), complete: () => console.log('Observer B complete!') }; subject.subscribe(observerA); subject.next(1); subject.next(2); subject.next(3); setTimeout(() => { subject.subscribe(observerB); // 1秒后訂閱 }, 1000);
以上代碼運(yùn)行后,控制臺(tái)的輸出結(jié)果:
Observer A get value: 1
Observer A get value: 2
Observer A get value: 3
Observer B get value: 2
Observer B get value: 3
可能會(huì)有人認(rèn)為 ReplaySubject(1)
是不是等同于 BehaviorSubject,其實(shí)它們是不一樣的。在創(chuàng)建BehaviorSubject 對(duì)象時(shí),是設(shè)置初始值,它用于表示 Subject 對(duì)象當(dāng)前的狀態(tài),而 ReplaySubject 只是事件的重放。
AsyncSubject
AsyncSubject 定義
export class AsyncSubject extends Subject { constructor() { super(...arguments); this.value = null; this.hasNext = false; this.hasCompleted = false; // 標(biāo)識(shí)是否已完成 } _subscribe(subscriber) { if (this.hasError) { subscriber.error(this.thrownError); return Subscription.EMPTY; } else if (this.hasCompleted && this.hasNext) { // 等到完成后,才發(fā)出最后的值 subscriber.next(this.value); subscriber.complete(); return Subscription.EMPTY; } return super._subscribe(subscriber); } next(value) { if (!this.hasCompleted) { // 若未完成,保存當(dāng)前的值 this.value = value; this.hasNext = true; } } }
AsyncSubject 應(yīng)用
AsyncSubject 類似于 last
操作符,它會(huì)在 Subject 結(jié)束后發(fā)出最后一個(gè)值,具體示例如下:
var subject = new Rx.AsyncSubject(); var observerA = { next: value => console.log('Observer A get value: ' + value), error: error => console.log('Observer A error: ' + error), complete: () => console.log('Observer A complete!') }; var observerB = { next: value => console.log('Observer B get value: ' + value), error: error => console.log('Observer B error: ' + error), complete: () => console.log('Observer B complete!') }; subject.subscribe(observerA); subject.next(1); subject.next(2); subject.next(3); subject.complete(); setTimeout(() => { subject.subscribe(observerB); // 1秒后訂閱 }, 1000);
以上代碼運(yùn)行后,控制臺(tái)的輸出結(jié)果:
Observer A get value: 3
Observer A complete!
Observer B get value: 3
Observer B complete!
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
JS 進(jìn)度條效果實(shí)現(xiàn)代碼整理
進(jìn)度條效果實(shí)現(xiàn)代碼,有助于緩解頁(yè)面顯示慢的頁(yè)面,給用戶一個(gè)等待時(shí)間的效果2011-05-05獲取JAVASCRIPT時(shí)間戳函數(shù)的5種方法
JavaScript獲得時(shí)間戳的方法有五種,后四種都是通過(guò)實(shí)例化時(shí)間對(duì)象new?Date()?來(lái)進(jìn)一步獲取當(dāng)前的時(shí)間戳,JavaScript處理時(shí)間主要使用時(shí)間對(duì)象Date,本文對(duì)js時(shí)間戳函數(shù)獲取方法給大家介紹的非常詳細(xì),需要的朋友參考下吧2024-01-01解決css和js的{}與smarty定界符沖突問題的兩種方法
當(dāng)輸入url地址后網(wǎng)頁(yè)出現(xiàn)如下文所描述的問題通常是css和js的{}與smarty定界符沖突導(dǎo)致的,解決方法有兩個(gè),具體如下,感興趣的朋友可以參考下2013-09-09Bootstrap彈出框(modal)垂直居中的問題及解決方案詳解
這篇文章主要介紹了Bootstrap彈出框(modal)垂直居中的問題及解決方案詳解的相關(guān)資料,非常不錯(cuò)具有參考借鑒價(jià)值,需要的朋友可以參考下2016-06-06JS(JQuery)操作Array的相關(guān)方法介紹
本篇文章主要是對(duì)JS(JQuery)操作Array的相關(guān)方法進(jìn)行了詳細(xì)的介紹,需要的朋友可以過(guò)來(lái)參考下,希望對(duì)大家有所幫助2014-02-02ES5 ES6中Array對(duì)象去除重復(fù)項(xiàng)的方法總結(jié)
這篇文章主要給大家介紹了Array對(duì)象去除重復(fù)項(xiàng)的相關(guān)資料,文中通過(guò)示例代碼詳細(xì)介紹了在ES5和ES6中Array對(duì)象去除重復(fù)項(xiàng)的方法,需要的朋友可以參考借鑒,下面來(lái)一起看看吧。2017-04-04js模仿html5 placeholder適應(yīng)于不支持的瀏覽器
html5原生支持placeholder,對(duì)于不支持的瀏覽器(ie)可用js模擬實(shí)現(xiàn),不要走開,接下來(lái)為您詳細(xì)介紹實(shí)現(xiàn)方法2013-01-01JavaScript和JQuery的鼠標(biāo)mouse事件冒泡處理
這篇文章主要介紹了JavaScript和JQuery的鼠標(biāo)mouse事件冒泡處理,本文總結(jié)出了mouse事件的一些定論,并分別給出了JavaScript和JQuery測(cè)試代碼,需要的朋友可以參考下2015-06-06