欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

JAVA?Reactor中Sinks.Many類三種常見的創(chuàng)建方式及使用

 更新時間:2025年07月29日 10:32:41   作者:嗚嗚。  
Sinks.Many用于創(chuàng)建多值(Multi-Value)的發(fā)布者(Publisher)的一種機制,它允許用戶將數據從一個地方發(fā)送到多個訂閱者,這篇文章主要介紹了JAVA?Reactor中Sinks.Many類三種常見的創(chuàng)建方式及使用,需要的朋友可以參考下

一、Java中的Sinks.Many

在Java編程中,我們經常需要處理數據流。為了更有效地處理數據流,我們可以使用Reactor庫中的Sinks.Many類。這個類提供了一種簡單而強大的方式來處理多個事件流,并且可以通過異步或者同步的方式處理這些事件。

Sinks.Many是Reactor庫中的一個類,它可以用來處理多個事件流。它提供了一種簡單而強大的方式來處理多個事件,可以在任何時間添加、獲取以及完成事件。

Sinks.Many為什么被設計為 Sinks 類的內部接口?

實現(xiàn)細節(jié)隱藏??: Sinks.Many 的實際實現(xiàn)(如 MulticastReplayProcessor 或其他內部類)被封裝在 Sinks 類的內部。用戶只需通過工廠方法(如 Sinks.many().multicast())獲取接口,無需關心底層實現(xiàn)。 ??

接口職責分離??: Sinks 類作為工廠,統(tǒng)一管理所有類型的 Sink(如 Sinks.One、Sinks.Many),而 Sinks.Many 作為內部接口,明確表示它是一個??多播數據源??,與單播(Sinks.One)等場景隔離。

二、Sinks.Many的創(chuàng)建

在源碼中可以看待,Sinks.Many提供了三中常見的創(chuàng)建方式。

(1)unicast()

Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();

這種創(chuàng)建方式中提供設置背壓緩沖區(qū)的方法

  • 用途

  • 創(chuàng)建一個 單播(Unicast) 的 Sinks.Many,僅允許 一個訂閱者 訂閱。

  • 核心特性

    • 單訂閱者限制:第二個訂閱者嘗試訂閱時會觸發(fā) IllegalStateException。

    • 背壓支持:通過緩沖區(qū)處理生產者和消費者的速率不匹配,默認使用無界緩沖區(qū)(需手動配置限制)。

    • 無歷史數據:訂閱者只能收到訂閱后產生的數據。

  • 適用場景

    • 點對點通信(如任務隊列)。

    • 需要嚴格保證單訂閱者的場景(如資源獨占型操作)。

(2)multicast()

Sinks.Many<String> multicastSink = Sinks.many().multicast().onBackpressureBuffer(100);

這個創(chuàng)建方式,相比較Unicast,就是可以允許多個訂閱者訂閱,同樣提供了多種設置緩存區(qū)的方式

  • 用途
    創(chuàng)建一個 多播(Multicast) 的 Sinks.Many,允許多個訂閱者訂閱。

  • 核心特性

    • 多訂閱者支持:所有訂閱者共享同一數據流。

    • 無歷史數據:新訂閱者只能收到訂閱后產生的數據,無法獲取之前的數據。

    • 背壓策略:默認使用無界緩沖區(qū),但可以通過配置限制(如 onBackpressureBuffer(int capacity))。

  • 適用場景

    • 實時廣播(如股票行情推送)。

    • 需要多個消費者并行處理相同數據的場景。

(3)replay()

Sinks.Many<String> replaySink = Sinks.many().replay().all();
這個方法中提供了限制存放歷史數據的方法

  • 用途
    創(chuàng)建一個 支持數據重放(Replay) 的 Sinks.Many,允許多個訂閱者訂閱,并回放歷史數據。

  • 核心特性

    • 歷史數據緩存:新訂閱者可以收到訂閱前一定數量的數據(通過 limit(int) 或 time(Duration) 配置)。

    • 多訂閱者支持:與 multicast() 類似,但增加了數據重放能力。

    • 內存管理:緩存數據量或時間窗口可配置,避免內存無限增長。

  • 適用場景

    • 需要新訂閱者獲取歷史數據的場景(如聊天記錄回放)。

    • 實時監(jiān)控面板(多個訂閱者需要看到完整上下文)。

三、Sinks.Many的使用

(1)添加事件

一旦我們創(chuàng)建了一個Sinks.Many對象,我們可以使用emitNext()方法來添加一個事件。這個方法接受一個參數,表示要添加的事件。

Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
sink.emitNext("Event 1");
sink.emitNext("Event 2");
sink.emitNext("Event 3");

我們首先創(chuàng)建了一個Sinks.Many對象,然后使用emitNext()方法添加了三個事件。

當然,如果我們選擇接受Flux流中的數據的時候,可以這樣添加數據

 //創(chuàng)建Flux流
        Flux<String> flux = Flux.<String>create(sink->{
            sink.next("你好");
            sink.complete();
        });

        //創(chuàng)建Sinks.Many處理流數據
        Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();

        //訂閱Flux流,并將數據交給sink處理(像做流數據的緩存,篩選流數據)
        flux.subscribe(
                sink::tryEmitNext,
                sink::tryEmitError,
                sink::tryEmitComplete
        );

(2)獲取流數據

我們可以使用Sinks.Many對象來獲取已添加的事件??梢允褂胊sFlux()方法將Sinks.Many對象轉換為一個Flux對象,然后使用Flux對象的方法來訂閱和處理事件。

 // 1. .unicast()使用創(chuàng)建單播 Sink
        Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();

        // 2. 將 Sink 轉換為 Flux(供訂閱者訂閱)
        Flux<String> flux = unicastSink.asFlux();

        // 3. 第一個訂閱者(合法)
        flux.subscribe(
                data -> System.out.println("訂閱者1收到數據: " + data),
                error -> System.err.println("訂閱者1發(fā)生錯誤: " + error),
                () -> System.out.println("訂閱者1完成")
        );

        // 4. 推送數據到 Sink
        unicastSink.tryEmitNext("數據1");
        unicastSink.tryEmitNext("數據2");

        // 5. 嘗試第二個訂閱者(會拋出 IllegalStateException)
        try {
            flux.subscribe(
                    data -> System.out.println("訂閱者2收到數據: " + data),
                    error -> System.err.println("訂閱者2發(fā)生錯誤: " + error),
                    () -> System.out.println("訂閱者2完成")
            );
        } catch (IllegalStateException e) {
            System.err.println("訂閱者2訂閱失敗: " + e.getMessage());
        }

        // 6. 關閉 Sink(發(fā)送完成信號)
        unicastSink.tryEmitComplete();

這里我使用了unicast()方法創(chuàng)建的Sinks.Many,這個時候我通過asFlux()方法轉換的flux流,只能被一個訂閱者訂閱到,第二個訂閱者,訂閱的時候就出報錯,當然,如果你想要多個訂閱者訂閱,可以使用multicast()或者replay()方式創(chuàng)建,

四、熱冷流

上面三種方式創(chuàng)建的是熱流。

熱流:數據獨立于訂閱者持續(xù)生成,多訂閱者共享實時數據,適用于實時事件推送。

在添加事件代碼中我也有使用到Flux.create()方法創(chuàng)建流,要注意這里創(chuàng)建的是冷流。

冷流:每次調用 subscribe() 時,會觸發(fā) Flux.create() 的回調函數(即 Consumer<SynchronousSink<T>>),??重新生成數據流??。多個訂閱者之間數據獨立。這個跟ThreadLocal有點像,F(xiàn)lux.create()會為每一個訂閱者創(chuàng)建單獨隔離的數據流,保證每一條流中數據互不影響。

冷熱流的選擇;

  • 若需要多個訂閱者共享實時數據 → 熱流。
  • 若需要每個訂閱者獨立消費完整數據 → 冷流。
  • 若需要歷史數據 → 使用 replay() 緩存。

總結 

到此這篇關于JAVA Reactor中Sinks.Many類三種常見的創(chuàng)建方式及使用的文章就介紹到這了,更多相關JAVA Reactor中Sinks.Many類內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • SpringBoot熱部署啟動關閉流程詳解

    SpringBoot熱部署啟動關閉流程詳解

    Spring?Boot啟動熱部署是一種技術,它能讓開發(fā)者在不重啟應用程序的情況下實時更新代碼。這樣可以提高開發(fā)效率,避免頻繁重啟應用,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習吧
    2023-04-04
  • JAXB簡介_動力節(jié)點Java學院整理

    JAXB簡介_動力節(jié)點Java學院整理

    這篇文章主要為大家詳細介紹了JAXB簡介的相關資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-08-08
  • java實現(xiàn)簡單注冊選擇所在城市

    java實現(xiàn)簡單注冊選擇所在城市

    這篇文章主要為大家詳細介紹了java實現(xiàn)簡單注冊選擇所在城市的相關代碼,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2016-04-04
  • Flink時間和窗口邏輯處理源碼分析

    Flink時間和窗口邏輯處理源碼分析

    這篇文章主要為大家介紹了Flink時間和窗口邏輯處理源碼分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-12-12
  • 詳解java各種集合的線程安全

    詳解java各種集合的線程安全

    這篇文章主要介紹了詳解java各種集合的線程安全,小編覺得挺不錯的,這里分享給大家,供需要的朋友參考。
    2017-10-10
  • JavaSE文件操作工具類FileUtil詳解

    JavaSE文件操作工具類FileUtil詳解

    這篇文章主要為大家詳細介紹了JavaSE系列之文件操作工具類FileUtil,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-08-08
  • JavaWeb開發(fā)之模仿知乎首頁完整代碼

    JavaWeb開發(fā)之模仿知乎首頁完整代碼

    這篇文章主要介紹了JavaWeb開發(fā)之模仿知乎首頁完整代碼的相關資料,非常不錯,具有參考借鑒價值,需要的朋友可以參考下
    2016-11-11
  • SpringMVC核心DispatcherServlet處理流程分步詳解

    SpringMVC核心DispatcherServlet處理流程分步詳解

    這篇文章主要介紹了SpringMVC核心之中央調度器DispatcherServlet的相關知識,包括SpringMVC請求處理過程及SrpingMVC容器和spring IOC容器關系,需要的朋友可以參考下
    2023-04-04
  • mybatis3.4.6 批量更新 foreach 遍歷map 的正確姿勢詳解

    mybatis3.4.6 批量更新 foreach 遍歷map 的正確姿勢詳解

    這篇文章主要介紹了mybatis3.4.6 批量更新 foreach 遍歷map 的正確姿勢詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-11-11
  • 25行Java代碼將普通圖片轉換為字符畫圖片和文本的實現(xiàn)

    25行Java代碼將普通圖片轉換為字符畫圖片和文本的實現(xiàn)

    這篇文章主要介紹了25行Java代碼將普通圖片轉換為字符畫圖片和文本的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2021-04-04

最新評論