JAVA?Reactor中Sinks.Many類三種常見的創(chuàng)建方式及使用
一、Java中的Sinks.Many
在Java編程中,我們經常需要處理數據流。為了更有效地處理數據流,我們可以使用Reactor庫中的Sinks.Many類。這個類提供了一種簡單而強大的方式來處理多個事件流,并且可以通過異步或者同步的方式處理這些事件。
Sinks.Many是Reactor庫中的一個類,它可以用來處理多個事件流。它提供了一種簡單而強大的方式來處理多個事件,可以在任何時間添加、獲取以及完成事件。
Sinks.Many為什么被設計為 Sinks 類的內部接口?
實現(xiàn)細節(jié)隱藏??: Sinks.Many 的實際實現(xiàn)(如 MulticastReplayProcessor 或其他內部類)被封裝在 Sinks 類的內部。用戶只需通過工廠方法(如 Sinks.many().multicast())獲取接口,無需關心底層實現(xiàn)。 ??
接口職責分離??: Sinks 類作為工廠,統(tǒng)一管理所有類型的 Sink(如 Sinks.One、Sinks.Many),而 Sinks.Many 作為內部接口,明確表示它是一個??多播數據源??,與單播(Sinks.One)等場景隔離。
二、Sinks.Many的創(chuàng)建
在源碼中可以看待,Sinks.Many提供了三中常見的創(chuàng)建方式。
(1)unicast()
Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
這種創(chuàng)建方式中提供設置背壓緩沖區(qū)的方法

用途:
創(chuàng)建一個 單播(Unicast) 的
Sinks.Many,僅允許 一個訂閱者 訂閱。核心特性:
單訂閱者限制:第二個訂閱者嘗試訂閱時會觸發(fā)
IllegalStateException。背壓支持:通過緩沖區(qū)處理生產者和消費者的速率不匹配,默認使用無界緩沖區(qū)(需手動配置限制)。
無歷史數據:訂閱者只能收到訂閱后產生的數據。
適用場景:
點對點通信(如任務隊列)。
需要嚴格保證單訂閱者的場景(如資源獨占型操作)。
(2)multicast()
Sinks.Many<String> multicastSink = Sinks.many().multicast().onBackpressureBuffer(100);
這個創(chuàng)建方式,相比較Unicast,就是可以允許多個訂閱者訂閱,同樣提供了多種設置緩存區(qū)的方式

用途:
創(chuàng)建一個 多播(Multicast) 的Sinks.Many,允許多個訂閱者訂閱。核心特性:
多訂閱者支持:所有訂閱者共享同一數據流。
無歷史數據:新訂閱者只能收到訂閱后產生的數據,無法獲取之前的數據。
背壓策略:默認使用無界緩沖區(qū),但可以通過配置限制(如
onBackpressureBuffer(int capacity))。
適用場景:
實時廣播(如股票行情推送)。
需要多個消費者并行處理相同數據的場景。
(3)replay()
Sinks.Many<String> replaySink = Sinks.many().replay().all();
這個方法中提供了限制存放歷史數據的方法

用途:
創(chuàng)建一個 支持數據重放(Replay) 的Sinks.Many,允許多個訂閱者訂閱,并回放歷史數據。核心特性:
歷史數據緩存:新訂閱者可以收到訂閱前一定數量的數據(通過
limit(int)或time(Duration)配置)。多訂閱者支持:與
multicast()類似,但增加了數據重放能力。內存管理:緩存數據量或時間窗口可配置,避免內存無限增長。
適用場景:
需要新訂閱者獲取歷史數據的場景(如聊天記錄回放)。
實時監(jiān)控面板(多個訂閱者需要看到完整上下文)。
三、Sinks.Many的使用
(1)添加事件
一旦我們創(chuàng)建了一個Sinks.Many對象,我們可以使用emitNext()方法來添加一個事件。這個方法接受一個參數,表示要添加的事件。
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
sink.emitNext("Event 1");
sink.emitNext("Event 2");
sink.emitNext("Event 3");我們首先創(chuàng)建了一個Sinks.Many對象,然后使用emitNext()方法添加了三個事件。
當然,如果我們選擇接受Flux流中的數據的時候,可以這樣添加數據
//創(chuàng)建Flux流
Flux<String> flux = Flux.<String>create(sink->{
sink.next("你好");
sink.complete();
});
//創(chuàng)建Sinks.Many處理流數據
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
//訂閱Flux流,并將數據交給sink處理(像做流數據的緩存,篩選流數據)
flux.subscribe(
sink::tryEmitNext,
sink::tryEmitError,
sink::tryEmitComplete
);(2)獲取流數據
我們可以使用Sinks.Many對象來獲取已添加的事件??梢允褂胊sFlux()方法將Sinks.Many對象轉換為一個Flux對象,然后使用Flux對象的方法來訂閱和處理事件。
// 1. .unicast()使用創(chuàng)建單播 Sink
Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
// 2. 將 Sink 轉換為 Flux(供訂閱者訂閱)
Flux<String> flux = unicastSink.asFlux();
// 3. 第一個訂閱者(合法)
flux.subscribe(
data -> System.out.println("訂閱者1收到數據: " + data),
error -> System.err.println("訂閱者1發(fā)生錯誤: " + error),
() -> System.out.println("訂閱者1完成")
);
// 4. 推送數據到 Sink
unicastSink.tryEmitNext("數據1");
unicastSink.tryEmitNext("數據2");
// 5. 嘗試第二個訂閱者(會拋出 IllegalStateException)
try {
flux.subscribe(
data -> System.out.println("訂閱者2收到數據: " + data),
error -> System.err.println("訂閱者2發(fā)生錯誤: " + error),
() -> System.out.println("訂閱者2完成")
);
} catch (IllegalStateException e) {
System.err.println("訂閱者2訂閱失敗: " + e.getMessage());
}
// 6. 關閉 Sink(發(fā)送完成信號)
unicastSink.tryEmitComplete();
這里我使用了unicast()方法創(chuàng)建的Sinks.Many,這個時候我通過asFlux()方法轉換的flux流,只能被一個訂閱者訂閱到,第二個訂閱者,訂閱的時候就出報錯,當然,如果你想要多個訂閱者訂閱,可以使用multicast()或者replay()方式創(chuàng)建,
四、熱冷流
上面三種方式創(chuàng)建的是熱流。
熱流:數據獨立于訂閱者持續(xù)生成,多訂閱者共享實時數據,適用于實時事件推送。
在添加事件代碼中我也有使用到Flux.create()方法創(chuàng)建流,要注意這里創(chuàng)建的是冷流。
冷流:每次調用 subscribe() 時,會觸發(fā) Flux.create() 的回調函數(即 Consumer<SynchronousSink<T>>),??重新生成數據流??。多個訂閱者之間數據獨立。這個跟ThreadLocal有點像,F(xiàn)lux.create()會為每一個訂閱者創(chuàng)建單獨隔離的數據流,保證每一條流中數據互不影響。
冷熱流的選擇;
- 若需要多個訂閱者共享實時數據 → 熱流。
- 若需要每個訂閱者獨立消費完整數據 → 冷流。
- 若需要歷史數據 → 使用
replay()緩存。
總結
到此這篇關于JAVA Reactor中Sinks.Many類三種常見的創(chuàng)建方式及使用的文章就介紹到這了,更多相關JAVA Reactor中Sinks.Many類內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
SpringMVC核心DispatcherServlet處理流程分步詳解
這篇文章主要介紹了SpringMVC核心之中央調度器DispatcherServlet的相關知識,包括SpringMVC請求處理過程及SrpingMVC容器和spring IOC容器關系,需要的朋友可以參考下2023-04-04
mybatis3.4.6 批量更新 foreach 遍歷map 的正確姿勢詳解
這篇文章主要介紹了mybatis3.4.6 批量更新 foreach 遍歷map 的正確姿勢詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-11-11
25行Java代碼將普通圖片轉換為字符畫圖片和文本的實現(xiàn)
這篇文章主要介紹了25行Java代碼將普通圖片轉換為字符畫圖片和文本的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-04-04

