Spring Reactor基本介紹和案例解析
1. Reactor 對比
1.1 Reactor 線程模型
Reactor 線程模型就是通過 單個線程 使用 Java NIO 包中的 Selector 的 select()方法,進(jìn)行監(jiān)聽。當(dāng)獲取到事件(如 accept、read 等)后,就會分配(dispatch)事件進(jìn)行相應(yīng)的事件處理(handle)。
如果要給 Reactor 線程模型 下一個更明確的定義,應(yīng)該是:
Reactor線程模式 = Reactor(I/O多路復(fù)用)+ 線程池
Netty、Redis 使用了此模型,主要是解決 C10K 問題
C10K 問題:服務(wù)器如何支持 10K 個并發(fā)連接
1.2 Spring Reactor
Reactor 是 JVM 完全非阻塞的響應(yīng)式編程基礎(chǔ),響應(yīng)式編程是一種關(guān)注數(shù)據(jù)流和變化傳播的異步編程范式。這意味著可以通過所采用的編程語言輕松地表達(dá)靜態(tài)(例如數(shù)組)或動態(tài)(例如事件發(fā)射器)數(shù)據(jù)流。
Mono<List<String>> cartInfoMono = Mono.just( "songjiyang" ) .map( UserService::findUserByName ) .map( UserService::findUserShoppingCart ); String user = UserService.findUserByName( "songjiyang" ); List<String> userShoppingCart = UserService.findUserShoppingCart( user );
聯(lián)系:
兩者都是使用異步的手段來提高系統(tǒng)的性能
區(qū)別:
Reactor 模型主要異步的處理新連接、連接和讀寫,而 Spring Reactor 在更高的代碼級別提供了異步框架
或者反過來說,新連接、連接和讀寫等事件觸發(fā)了 Netty Reactor 的某些管道處理器流程,某些事件觸發(fā)了 Spring Reactor 的執(zhí)行流程,這也是 Reactor(反應(yīng)器)名稱的由來
2. Java 中的異步
上面我們一直在講異步,異步其實(shí)是針對調(diào)用者的,也就是調(diào)用者調(diào)用完方法之后就可以做的別的事情了,Java 中實(shí)現(xiàn)異步就兩種方式:
回調(diào)- 多線程
2.1 回調(diào)
回調(diào)其實(shí)就是把當(dāng)前的事情完成之后,后面需要做的事當(dāng)成函數(shù)傳進(jìn)行,等完成之后調(diào)用就行
public static void main( String[] args ){ doA( ( next ) -> { log.info( "doB" ); next.run(); }, () -> log.info( "doC" ) ); } public static void doA( Consumer<Runnable> next, Runnable nextNext ){ log.info( "doA" ); next.accept( nextNext ); } // output 15:06:52.818 [main] INFO concurrent.CompleteTest - doA 15:06:52.820 [main] INFO concurrent.CompleteTest - doB 15:06:52.820 [main] INFO concurrent.CompleteTest - doC
回調(diào)是在一個線程中來完成的,很容易理解,但問題是回調(diào)太多代碼就變的很復(fù)雜,有回調(diào)地域的問題
回調(diào)只是一種異步的編程方式,本身實(shí)現(xiàn)異步其實(shí)還是需要多線程,例如單獨(dú)起一個監(jiān)聽線程來執(zhí)行回調(diào)函數(shù),例如 EventListener
如果執(zhí)行的任務(wù)不考慮線程安全問題的話,可以使用 CompletableFuture 來解決,會更加易于閱讀
CompletableFuture .runAsync( ()-> log.info("doA") ) .thenRunAsync( ()-> log.info("doB") ) .thenRunAsync( ()->log.info("doC") ) .get(); // output 15:08:04.407 [ForkJoinPool.commonPool-worker-1] INFO concurrent.CompleteTest - doA 15:08:04.410 [ForkJoinPool.commonPool-worker-1] INFO concurrent.CompleteTest - doB 15:08:04.410 [ForkJoinPool.commonPool-worker-1] INFO concurrent.CompleteTest - doC
CompletableFuture 的 thenRunAsync 也是基于回調(diào),每個任務(wù) Class 會有一個 next, 多個任務(wù)組成一個回調(diào)鏈
Mono.just("") .doOnNext( (x)-> log.info("doA") ) .doOnNext( (x)-> log.info("doB") ) .doOnNext( (x)-> log.info("doC") ) .block(); 15:12:56.160 [main] INFO concurrent.CompleteTest - doA 15:12:56.160 [main] INFO concurrent.CompleteTest - doB 15:12:56.161 [main] INFO concurrent.CompleteTest - doC
2.2 多線程
多線程的方式,大家應(yīng)該都很熟悉
- Thread
- ExecutorService 線程池
- CompletionService 帶結(jié)果隊(duì)列的線程池
- CompletableFuture 用于任務(wù)編排
- Runable、Callable、Future、CompletableFuture
3. Spring Reactor
從上面可以看到一些使用 Reactor 的代碼中,都可以在原生 JDK 中找到替換,那我們?yōu)槭裁催€需要它呢?
- 可組合和可讀性
- 豐富的操作
- 訂閱之前什么都不會發(fā)生
- 背壓
下面是 Java9 中 Flow 類的類圖,SpringReactor 也是使用這四個類,在 Java9 中已經(jīng)成了規(guī)范
3.1 Publisher
Mono,提供 0 到 1 個 Item
Flux,提供 0 到 N 個 Item
發(fā)布者提供 n 個 Item, 經(jīng)過一些 operator(數(shù)據(jù)處理操作),完成或者異常中止
核心方法:
subscribe
3.1.1 創(chuàng)建
Mono<String> noData = Mono.empty(); Mono<String> data = Mono.just("foo"); Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); Mono.fromSupplier( ()->1 ); Mono.fromFuture( CompletableFuture.runAsync( ()-> {} ) ); Flux.create((sink)->{ for( int i = 0; i < 5; i++ ){ sink.next( i ) ; } sink.complete(); });
下面這些都稱為 operator,可以很靈活處理其中的 Item
- 轉(zhuǎn)化 map、flatMap、
- 消費(fèi) doOnNext、doNextError、doOnCancel
- 過濾 filter、distinct、take
- 錯誤處理 onErrorReturn、onErrorComplete、onErrorResume、doFinally
- 時間相關(guān) timeout、interval、delay
- 分隔 window、buffer
- 轉(zhuǎn)同步 block、toStream
3.1.3 訂閱
訂閱然后消費(fèi)發(fā)布者的內(nèi)容
subscribe(); subscribe(Consumer<? super T> consumer);
訂閱之后的返回值是Disposable****,可以使用這個對象來取消訂閱,會告訴發(fā)布者停止生產(chǎn)對象,但不保證會立即終止
- 當(dāng)然可以給 subscribe 傳遞參數(shù),自定義 complete 或者 error 時需要做的時
- 同時可以使用 BaseSubscriber 類來實(shí)現(xiàn)訂閱,可以控制消費(fèi)的數(shù)量
3.2 Subscriber
消費(fèi)者一般不用手動創(chuàng)建,通過 subscribe 傳進(jìn) Consumer 函數(shù)后,會自動生成一個 LambdaSubscriber,核心方法:
- onSubscribe
- onNext
- onError
- onComplete
3.3 Processor
既是發(fā)布者,又是訂閱者
3.4 Subscription
訂閱,消費(fèi)者調(diào)用 subscribe 方法之后可以在 onSubscribe 回調(diào)中獲取,可以請求下一個 Item 或者取消訂閱
- request
- cancel
3.5 Thread 和 Scheduler
沒有指定的情況下:
- 當(dāng)前的 operator 使用上一個 operator 的線程,最先的 operator 使用調(diào)用 subscribe 的線程來執(zhí)行
Reactor 中使用 Scheduler 來執(zhí)行流程,類似 ExecutorService
- subscribeOn 可以指定訂閱時使用的線程,這樣可以不阻塞的訂閱
- publishOn 指定發(fā)布時使用的線程
4. Spring Reactor 優(yōu)化案例
流程中可以優(yōu)化的點(diǎn):
- 準(zhǔn)備數(shù)據(jù)可以異步,等需要用的時候在去阻塞獲取,相當(dāng)于一個 Future
- 召回可以完成之后就去等正排數(shù)據(jù),新的問題,如何去重?本來拿一次正排數(shù)據(jù),現(xiàn)在拿 N 個召回次數(shù)據(jù),請求量是不是會變大,耗時是不是也會增加
- 過濾的準(zhǔn)備數(shù)據(jù)也可以異步,也就是說某個過濾策略的數(shù)據(jù)準(zhǔn)備好了,就可以去執(zhí)行過濾了,而且還存在很多不需要依賴數(shù)據(jù)的過濾策略也需要等
- 一般粗排只需要 1000 條數(shù)據(jù),過濾時已經(jīng)拿夠了 1000 條就可以跳過了
我們上面所說的異步,其實(shí)就是說流程中某些節(jié)點(diǎn)是在同時執(zhí)行的,不必等一個節(jié)點(diǎn)完成后再執(zhí)行另外一個,這其實(shí)一個統(tǒng)籌學(xué)的問題
4.1 解決方法對比
問題 | Java 原生 | Reactor |
---|---|---|
準(zhǔn)備數(shù)據(jù)異步 | Future,缺點(diǎn):1. 需要調(diào)用方處理異常 2. 不能編排后續(xù)流程,eg: 拿完企業(yè)信息后繼續(xù)拿企業(yè)治理信息,F(xiàn)uture 需要 get 阻塞 | Mono, 使用 onErrorResume 處理異常,使用 map 編排后續(xù)流程 |
召回完成拿正排 | 需要一個阻塞隊(duì)列,召回把結(jié)果往里面 push,另外一個線程從隊(duì)列里面拿同時去取正排數(shù)據(jù),需要自己維護(hù) map 來去重,需要循環(huán)等待到達(dá)批次后去取正排 | Flux,召回使用 sink.next 把結(jié)果放進(jìn)去合并節(jié)點(diǎn)訂閱,使用 distinct 來去重,使用 buffer 來實(shí)現(xiàn)批次數(shù)據(jù) |
過濾準(zhǔn)備數(shù)據(jù)異步 | 需要阻塞隊(duì)列 | Flux, 在依賴任務(wù)中把準(zhǔn)備好的過濾策略放進(jìn)去,過濾節(jié)點(diǎn)訂閱 Flux 并過濾 |
粗排取 1000 條 | 異步執(zhí)行過濾,把過濾結(jié)果放到一個容器中,粗排節(jié)點(diǎn)不斷查看這個容器的結(jié)果是否夠 1000 條,夠了就可以執(zhí)行粗排了 | Flux, 使用 take(1000) |
for (StrategyConfig filterConfig : filterConfigList) { doStrategyFilter(filterChainContext, recommendContext, recRequest, filterConfig, allFilters, partitionContext, partitionTrace); } readyStrategyFlux.publishOn(ExecutorServiceHolder.scheduler).doOnNext((readyStrategyName) -> { try { List<StrategyConfig> strategyConfigs = strategyNameToConfigs.get(readyStrategyName); for (StrategyConfig strategyConfig : strategyConfigs) { doStrategyFilter(filterChainContext, recommendContext, recRequest, strategyConfig, allFilters, partitionContext, partitionTrace); } } catch (Exception e) { LOGGER.error("doOnNext filter error", e); } }).blockLast();
這里的 blockLast 又回到了同步世界,可以很好的和已有的代碼兼容
下面是 20240629 到 20240702 某個場景優(yōu)化過濾階段的耗時對比
pv | qps | tp99 | avg | |
---|---|---|---|---|
實(shí)驗(yàn)組 | 4051865 | 46.90 | 369.00 | 230.88 |
對照組 | 4054074 | 46.92 | 397.00 | 251.55 |
業(yè)務(wù)指標(biāo)對比
無明顯波動
5. 總結(jié)
Spring Reactor 是一個響應(yīng)式編程框架,非常適合類似 MXN 這樣的流程編排系統(tǒng),也是 Java 中異步編程的一種補(bǔ)充,但也會有一些其他的問題,例如潛在的線程安全問題,已有框架的沖突 ThreadLocal 等
參考資料
【1】深入 Netty 邏輯架構(gòu),從 Reactor 線程模型開始(一)-阿里云開發(fā)者社區(qū)
到此這篇關(guān)于Spring Reactor基本介紹和案例的文章就介紹到這了,更多相關(guān)Spring Reactor內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot實(shí)現(xiàn)在webapp下直接訪問html,jsp
這篇文章主要介紹了SpringBoot實(shí)現(xiàn)在webapp下直接訪問html,jsp問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-10-10mybatis實(shí)現(xiàn)獲取入?yún)⑹荓ist和Map的取值
這篇文章主要介紹了mybatis實(shí)現(xiàn)獲取入?yún)⑹荓ist和Map的取值問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-06-06JavaEE中用response向客戶端輸出中文數(shù)據(jù)亂碼問題分析
這篇文章主要介紹了JavaEE中用response向客戶端輸出中文數(shù)據(jù)亂碼問題分析,需要的朋友可以參考下2014-10-10MyBatis結(jié)果映射(ResultMap)的使用
在MyBatis中,結(jié)果映射是實(shí)現(xiàn)數(shù)據(jù)庫結(jié)果集到Java對象映射的核心,它不僅支持簡單的字段映射,還能處理字段名不一致、嵌套對象和集合映射等復(fù)雜場景,通過ResultMap,開發(fā)者可以靈活定義映射關(guān)系,以適應(yīng)各種需求,感興趣的可以了解一下2024-09-09springboot使用logback自定義日志的詳細(xì)過程
這篇文章主要介紹了springboot使用logback自定義日志的詳細(xì)過程,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),感興趣的朋友一起看看吧2024-12-12Java 獲取指定日期的實(shí)現(xiàn)方法總結(jié)
以下是對Java中獲取指定日期的實(shí)現(xiàn)方法進(jìn)行了歸納總結(jié),需要的朋友可以參考下2013-07-07關(guān)于HashMap的put方法執(zhí)行全過程
這篇文章主要介紹了關(guān)于HashMap的put方法執(zhí)行全過程,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-06-06SpringBoot Maven打包失敗報:class lombok.javac.apt.Lombo
最新項(xiàng)目部署的時候,出現(xiàn)了一個maven打包失敗的問題,報:class lombok.javac.apt.LombokProcessor錯誤,所以本文給大家介紹了如何解決SpringBoot Maven 打包失敗:class lombok.javac.apt.LombokProcessor 錯誤,需要的朋友可以參考下2023-12-12