欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring Reactor基本介紹和案例解析

 更新時間:2024年07月05日 09:13:54   作者:songtianer  
Spring Reactor 是一個響應(yīng)式編程框架,非常適合類似 MXN 這樣的流程編排系統(tǒng),也是 Java 中異步編程的一種補(bǔ)充,這篇文章主要介紹了Spring Reactor基本介紹和案例解析,需要的朋友可以參考下

1. Reactor 對比

1.1 Reactor 線程模型

Reactor 線程模型就是通過 單個線程 使用 Java NIO 包中的 Selector 的 select()方法,進(jìn)行監(jiān)聽。當(dāng)獲取到事件(如 accept、read 等)后,就會分配(dispatch)事件進(jìn)行相應(yīng)的事件處理(handle)。

如果要給 Reactor 線程模型 下一個更明確的定義,應(yīng)該是:

Reactor線程模式 = Reactor(I/O多路復(fù)用)+ 線程池

Netty、Redis 使用了此模型,主要是解決 C10K 問題

C10K 問題:服務(wù)器如何支持 10K 個并發(fā)連接

1.2 Spring Reactor

Reactor 是 JVM 完全非阻塞的響應(yīng)式編程基礎(chǔ),響應(yīng)式編程是一種關(guān)注數(shù)據(jù)流和變化傳播的異步編程范式。這意味著可以通過所采用的編程語言輕松地表達(dá)靜態(tài)(例如數(shù)組)或動態(tài)(例如事件發(fā)射器)數(shù)據(jù)流。

Mono<List<String>> cartInfoMono = Mono.just( "songjiyang" )
        .map( UserService::findUserByName )
        .map( UserService::findUserShoppingCart );
String user = UserService.findUserByName( "songjiyang" );
List<String> userShoppingCart = UserService.findUserShoppingCart( user );

聯(lián)系:

兩者都是使用異步的手段來提高系統(tǒng)的性能

區(qū)別:

Reactor 模型主要異步的處理新連接、連接和讀寫,而 Spring Reactor 在更高的代碼級別提供了異步框架

或者反過來說,新連接、連接和讀寫等事件觸發(fā)了 Netty Reactor 的某些管道處理器流程,某些事件觸發(fā)了 Spring Reactor 的執(zhí)行流程,這也是 Reactor(反應(yīng)器)名稱的由來

2. Java 中的異步

上面我們一直在講異步,異步其實(shí)是針對調(diào)用者的,也就是調(diào)用者調(diào)用完方法之后就可以做的別的事情了,Java 中實(shí)現(xiàn)異步就兩種方式:

  • 回調(diào)
  • 多線程

2.1 回調(diào)

回調(diào)其實(shí)就是把當(dāng)前的事情完成之后,后面需要做的事當(dāng)成函數(shù)傳進(jìn)行,等完成之后調(diào)用就行

    public static void main( String[] args ){
       doA( ( next ) -> {
          log.info( "doB" );
          next.run();
       }, () -> log.info( "doC" ) );
    }
    public static void doA( Consumer<Runnable> next, Runnable nextNext ){
       log.info( "doA" );
       next.accept( nextNext );
    }
// output
15:06:52.818 [main] INFO concurrent.CompleteTest - doA
15:06:52.820 [main] INFO concurrent.CompleteTest - doB
15:06:52.820 [main] INFO concurrent.CompleteTest - doC

回調(diào)是在一個線程中來完成的,很容易理解,但問題是回調(diào)太多代碼就變的很復(fù)雜,有回調(diào)地域的問題

回調(diào)只是一種異步的編程方式,本身實(shí)現(xiàn)異步其實(shí)還是需要多線程,例如單獨(dú)起一個監(jiān)聽線程來執(zhí)行回調(diào)函數(shù),例如 EventListener

如果執(zhí)行的任務(wù)不考慮線程安全問題的話,可以使用 CompletableFuture 來解決,會更加易于閱讀

CompletableFuture
       .runAsync( ()-> log.info("doA") )
       .thenRunAsync( ()-> log.info("doB") )
       .thenRunAsync( ()->log.info("doC") )
       .get();
// output
15:08:04.407 [ForkJoinPool.commonPool-worker-1] INFO concurrent.CompleteTest - doA
15:08:04.410 [ForkJoinPool.commonPool-worker-1] INFO concurrent.CompleteTest - doB
15:08:04.410 [ForkJoinPool.commonPool-worker-1] INFO concurrent.CompleteTest - doC

CompletableFuture 的 thenRunAsync 也是基于回調(diào),每個任務(wù) Class 會有一個 next, 多個任務(wù)組成一個回調(diào)鏈

Mono.just("")
       .doOnNext( (x)-> log.info("doA") )
       .doOnNext( (x)-> log.info("doB") )
       .doOnNext( (x)-> log.info("doC") )
       .block();
15:12:56.160 [main] INFO concurrent.CompleteTest - doA
15:12:56.160 [main] INFO concurrent.CompleteTest - doB
15:12:56.161 [main] INFO concurrent.CompleteTest - doC

2.2 多線程

多線程的方式,大家應(yīng)該都很熟悉

  • Thread
  • ExecutorService 線程池
  • CompletionService 帶結(jié)果隊(duì)列的線程池
  • CompletableFuture 用于任務(wù)編排
  • Runable、Callable、Future、CompletableFuture

3. Spring Reactor

從上面可以看到一些使用 Reactor 的代碼中,都可以在原生 JDK 中找到替換,那我們?yōu)槭裁催€需要它呢?

  • 可組合和可讀性
  • 豐富的操作
  • 訂閱之前什么都不會發(fā)生
  • 背壓

下面是 Java9 中 Flow 類的類圖,SpringReactor 也是使用這四個類,在 Java9 中已經(jīng)成了規(guī)范

3.1 Publisher

Mono,提供 0 到 1 個 Item

Flux,提供 0 到 N 個 Item

發(fā)布者提供 n 個 Item, 經(jīng)過一些 operator(數(shù)據(jù)處理操作),完成或者異常中止

核心方法:

subscribe

3.1.1 創(chuàng)建

Mono<String> noData = Mono.empty(); 
Mono<String> data = Mono.just("foo");
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); 
Mono.fromSupplier( ()->1 );
Mono.fromFuture( CompletableFuture.runAsync( ()-> {} ) );
Flux.create((sink)->{
    for( int i = 0; i < 5; i++ ){
       sink.next( i ) ;
    }
    sink.complete();
});

下面這些都稱為 operator,可以很靈活處理其中的 Item

  • 轉(zhuǎn)化 map、flatMap、
  • 消費(fèi) doOnNext、doNextError、doOnCancel
  • 過濾 filter、distinct、take
  • 錯誤處理 onErrorReturn、onErrorComplete、onErrorResume、doFinally
  • 時間相關(guān) timeout、interval、delay
  • 分隔 window、buffer
  • 轉(zhuǎn)同步 block、toStream

3.1.3 訂閱

訂閱然后消費(fèi)發(fā)布者的內(nèi)容

subscribe(); 
subscribe(Consumer<? super T> consumer); 

訂閱之后的返回值是Disposable****,可以使用這個對象來取消訂閱,會告訴發(fā)布者停止生產(chǎn)對象,但不保證會立即終止

  • 當(dāng)然可以給 subscribe 傳遞參數(shù),自定義 complete 或者 error 時需要做的時
  • 同時可以使用 BaseSubscriber 類來實(shí)現(xiàn)訂閱,可以控制消費(fèi)的數(shù)量

3.2 Subscriber

消費(fèi)者一般不用手動創(chuàng)建,通過 subscribe 傳進(jìn) Consumer 函數(shù)后,會自動生成一個 LambdaSubscriber,核心方法:

  • onSubscribe
  • onNext
  • onError
  • onComplete

3.3 Processor

既是發(fā)布者,又是訂閱者

3.4 Subscription

訂閱,消費(fèi)者調(diào)用 subscribe 方法之后可以在 onSubscribe 回調(diào)中獲取,可以請求下一個 Item 或者取消訂閱

  • request
  • cancel

3.5 Thread 和 Scheduler

沒有指定的情況下:

  • 當(dāng)前的 operator 使用上一個 operator 的線程,最先的 operator 使用調(diào)用 subscribe 的線程來執(zhí)行

Reactor 中使用 Scheduler 來執(zhí)行流程,類似 ExecutorService

  • subscribeOn 可以指定訂閱時使用的線程,這樣可以不阻塞的訂閱
  • publishOn 指定發(fā)布時使用的線程

4. Spring Reactor 優(yōu)化案例

流程中可以優(yōu)化的點(diǎn):

  • 準(zhǔn)備數(shù)據(jù)可以異步,等需要用的時候在去阻塞獲取,相當(dāng)于一個 Future
  • 召回可以完成之后就去等正排數(shù)據(jù),新的問題,如何去重?本來拿一次正排數(shù)據(jù),現(xiàn)在拿 N 個召回次數(shù)據(jù),請求量是不是會變大,耗時是不是也會增加
  • 過濾的準(zhǔn)備數(shù)據(jù)也可以異步,也就是說某個過濾策略的數(shù)據(jù)準(zhǔn)備好了,就可以去執(zhí)行過濾了,而且還存在很多不需要依賴數(shù)據(jù)的過濾策略也需要等
  • 一般粗排只需要 1000 條數(shù)據(jù),過濾時已經(jīng)拿夠了 1000 條就可以跳過了

我們上面所說的異步,其實(shí)就是說流程中某些節(jié)點(diǎn)是在同時執(zhí)行的,不必等一個節(jié)點(diǎn)完成后再執(zhí)行另外一個,這其實(shí)一個統(tǒng)籌學(xué)的問題

4.1 解決方法對比

問題Java 原生Reactor
準(zhǔn)備數(shù)據(jù)異步Future,缺點(diǎn):1. 需要調(diào)用方處理異常 2. 不能編排后續(xù)流程,eg: 拿完企業(yè)信息后繼續(xù)拿企業(yè)治理信息,F(xiàn)uture 需要 get 阻塞Mono, 使用 onErrorResume 處理異常,使用 map 編排后續(xù)流程
召回完成拿正排需要一個阻塞隊(duì)列,召回把結(jié)果往里面 push,另外一個線程從隊(duì)列里面拿同時去取正排數(shù)據(jù),需要自己維護(hù) map 來去重,需要循環(huán)等待到達(dá)批次后去取正排Flux,召回使用 sink.next 把結(jié)果放進(jìn)去合并節(jié)點(diǎn)訂閱,使用 distinct 來去重,使用 buffer 來實(shí)現(xiàn)批次數(shù)據(jù)
過濾準(zhǔn)備數(shù)據(jù)異步需要阻塞隊(duì)列Flux, 在依賴任務(wù)中把準(zhǔn)備好的過濾策略放進(jìn)去,過濾節(jié)點(diǎn)訂閱 Flux 并過濾
粗排取 1000 條異步執(zhí)行過濾,把過濾結(jié)果放到一個容器中,粗排節(jié)點(diǎn)不斷查看這個容器的結(jié)果是否夠 1000 條,夠了就可以執(zhí)行粗排了Flux, 使用 take(1000)
for (StrategyConfig filterConfig : filterConfigList) {
    doStrategyFilter(filterChainContext, recommendContext, recRequest, filterConfig, allFilters, partitionContext, partitionTrace);
}
readyStrategyFlux.publishOn(ExecutorServiceHolder.scheduler).doOnNext((readyStrategyName) -> {
    try {
        List<StrategyConfig> strategyConfigs = strategyNameToConfigs.get(readyStrategyName);
        for (StrategyConfig strategyConfig : strategyConfigs) {
            doStrategyFilter(filterChainContext, recommendContext, recRequest, strategyConfig, allFilters, partitionContext, partitionTrace);
        }
    } catch (Exception e) {
        LOGGER.error("doOnNext filter error", e);
    }
}).blockLast();

這里的 blockLast 又回到了同步世界,可以很好的和已有的代碼兼容

下面是 20240629 到 20240702 某個場景優(yōu)化過濾階段的耗時對比

pvqpstp99avg
實(shí)驗(yàn)組405186546.90369.00230.88
對照組405407446.92397.00251.55

業(yè)務(wù)指標(biāo)對比

無明顯波動

5. 總結(jié)

Spring Reactor 是一個響應(yīng)式編程框架,非常適合類似 MXN 這樣的流程編排系統(tǒng),也是 Java 中異步編程的一種補(bǔ)充,但也會有一些其他的問題,例如潛在的線程安全問題,已有框架的沖突 ThreadLocal 等

參考資料

【1】深入 Netty 邏輯架構(gòu),從 Reactor 線程模型開始(一)-阿里云開發(fā)者社區(qū)

【2】Reactor 3 Reference Guide

【3】C10k 問題簡述-CSDN 博客

到此這篇關(guān)于Spring Reactor基本介紹和案例的文章就介紹到這了,更多相關(guān)Spring Reactor內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論