Project?Reactor?響應式范式編程
什么是響應式編程?
響應式編程是一種編程范式,它關注數(shù)據(jù)的變化和傳播,而不是控制流。響應式編程可以提高程序的性能、彈性和可伸縮性,使程序能夠及時響應用戶的需求和環(huán)境的變化。在本文中,我們將介紹Java中的響應式編程的基本概念、原理和實踐。
響應式編程的核心思想是將數(shù)據(jù)和行為抽象為流(Stream),流可以表示任何異步的事件或值,比如用戶輸入、網絡請求、數(shù)據(jù)庫查詢等。流可以被觀察(Observable),也就是說,可以有一個或多個觀察者(Observer)訂閱流,并在流發(fā)生變化時接收通知。流還可以被操作(Operator),也就是說,可以對流進行各種轉換、過濾、組合等操作,從而生成新的流。
響應式編程的優(yōu)勢:
- 可以將復雜的異步邏輯簡化為聲明式的數(shù)據(jù)流操作,避免了回調地獄(Callback Hell)、阻塞線程和競態(tài)條件等問題。
- 可以提高程序的性能和資源利用率,通過減少線程的上下文切換和阻塞,以及利用反應流的背壓(Backpressure)機制來控制數(shù)據(jù)流的速度,也就是說,可以讓下游的觀察者控制上游的數(shù)據(jù)源的發(fā)送速率,從而防止數(shù)據(jù)溢出或浪費。
- 可以提高程序的表達力和靈活性,通過使用函數(shù)式編程的風格和操作符來組合和轉換數(shù)據(jù)流,以及利用反應式編程框架和庫來簡化異步和事件驅動編程的復雜度
響應式編程的缺點:
- 降低程序的可讀性和維護性,通過使用嵌套、回調、訂閱等方式來處理異步事件,以及使用反應流的操作符來處理數(shù)據(jù)流,可能導致代碼難以理解和調試。
Java中的響應式編程
Java中有多種框架和庫可以實現(xiàn)響應式編程,比如RxJava、Spring Reactor、Vert.x等。這些框架和庫都遵循了Reactive Streams規(guī)范,這是一套定義了非阻塞背壓的異步流處理標準的接口。
Reactive Streams規(guī)范主要包括四個接口:
- Publisher:發(fā)布者,表示一個數(shù)據(jù)源,可以發(fā)出零個或多個數(shù)據(jù),并通知訂閱者完成或出錯。
- Subscriber:訂閱者,表示一個數(shù)據(jù)消費者,可以訂閱一個發(fā)布者,并在收到數(shù)據(jù)、完成或出錯時做出相應的動作。
- Subscription:訂閱,表示發(fā)布者和訂閱者之間的關系,可以用來請求或取消數(shù)據(jù)。
- Processor:處理器,表示一個既是發(fā)布者又是訂閱者的中間組件,可以對數(shù)據(jù)進行處理或轉換。
Project Reactor
Project Reactor是一個完全非阻塞的包含背壓支持的響應式編程基石。它是Spring生態(tài)系統(tǒng)中Spring Reactive的基礎,被用于如Spring WebFlux, Spring Data和Spring Cloud Gateway等項目中。
基本概念
Project Reactor的核心思想是將數(shù)據(jù)和事件看作是流(stream),流可以被創(chuàng)建,轉換,過濾,合并,分組,緩沖,錯誤處理等等。流是惰性的,只有當有訂閱者(subscriber)訂閱時才會開始發(fā)射數(shù)據(jù)或事件。流可以是有限的,也可以是無限的,可以是同步的,也可以是異步的,可以是單線程的,也可以是多線程的。流還可以支持背壓(backpressure),即訂閱者可以控制流的速度,避免被過多的數(shù)據(jù)或事件淹沒。
核心組件
Project Reactor提供了兩個主要的接口來表示流:
- Flux: 表示一個包含0到N個元素的流
- Mono: 表示一個包含0到1個元素的流。
它們都是Publisher<T>的實現(xiàn),可以發(fā)出0-N個元素的異步序列,并根據(jù)訂閱者的需求推送元素。
Flux表示的是包含0到N個元素的異步序列,可以被onComplete信號或者onError信號所終止。
Mono表示的是包含0或1個元素的異步序列,也可以被onComplete信號或者onError信號所終止。
Flux和Mono之間可以進行轉換。
代碼示例
Mono
// 創(chuàng)建一個Mono對象,包含一個字符串元素
Mono<String> mono = Mono.just("Hello World");
// 訂閱這個Mono對象,并打印元素值
mono.subscribe(System.out::println);
使用Mono.just方法創(chuàng)建了一個包含一個字符串元素的Mono對象,然后使用subscribe方法訂閱了這個對象,并提供了一個回調函數(shù)來打印元素值。當Mono對象發(fā)出元素值時,回調函數(shù)就會被調用。
Mono to Flux
把Mono轉換成Flux的一種方法是使用flux()方法,它會返回一個包含Mono發(fā)出的元素的Flux,或者如果Mono為空,則返回一個空的Flux。例如:
// 創(chuàng)建一個Mono對象,包含一個整數(shù)元素 Mono<Integer> mono = Mono.just(1); // 使用flux()方法把Mono轉換成Flux Flux<Integer> flux = mono.flux(); // 訂閱這個Flux對象,并打印元素值 flux.subscribe(System.out::println); // 輸出1
另一種方法是使用concatWith()方法,它會將Mono與另一個Publisher連接起來,形成一個Flux。例如:
// 創(chuàng)建一個Mono對象,包含一個整數(shù)元素 Mono<Integer> mono = Mono.just(1); // 創(chuàng)建一個Flux對象,包含兩個整數(shù)元素 Flux<Integer> flux = Flux.just(2, 3); // 使用concatWith()方法把Mono和Flux連接起來 Flux<Integer> result = mono.concatWith(flux); // 訂閱這個Flux對象,并打印元素值 result.subscribe(System.out::println); // 輸出1, 2, 3
Mono常用的操作
// 從一個固定的值創(chuàng)建一個Mono
Mono.just("Hello").subscribe(System.out::println); // 輸出Hello
// 從一個Callable對象創(chuàng)建一個Mono
Callable<String> callable = () -> "World";
Mono.fromCallable(callable).subscribe(System.out::println); // 輸出World
// 從一個Supplier對象創(chuàng)建一個Mono
Supplier<String> supplier = () -> "Supplier!";
Mono.fromSupplier(supplier).subscribe(System.out::println); // 輸出Supplier!
// 對Mono發(fā)出的元素進行映射操作
Mono.just("Hello").map(s -> s + " World").subscribe(System.out::println); // 輸出Hello World
// 對Mono發(fā)出的元素進行扁平化操作
Mono.just("Hello")
.flatMap(s -> Mono.just(s + " World"))
.subscribe(System.out::println); // 輸出Hello World
// 對Mono發(fā)出的元素進行過濾操作
Mono.just(1).filter(i -> i > 0).subscribe(System.out::println); // 輸出1
// 將多個Mono合并為一個Mono
Mono.zip(Mono.just("Hello"), Mono.just("World"))
.subscribe(tuple -> System.out.println(tuple.getT1() + " " + tuple.getT2())); // 輸出Hello World
// 將多個Mono合并為一個Flux
Mono.just("Hello")
.mergeWith(Mono.just("World"))
.subscribe(System.out::println); // 輸出Hello World
// 在這個Mono完成后,繼續(xù)處理另一個發(fā)布者
Mono.just("Hello").then(Mono.just("World")).subscribe(System.out::println); // 輸出World
// 在這個Mono發(fā)出元素時,執(zhí)行一個副作用操作
Mono.just("Hello")
.doOnNext(s -> System.out.println("Before: " + s))
.map(s -> s + " World")
.doOnNext(s -> System.out.println("After: " + s))
.subscribe();
// 輸出
// Before: Hello
// After: Hello World
Flux
// 創(chuàng)建一個Flux對象,包含三個整數(shù)元素 Flux<Integer> flux = Flux.just(1, 2, 3); // 訂閱這個Flux對象,并打印元素值 flux.subscribe(System.out::println);
使用Flux.just方法創(chuàng)建了一個包含三個整數(shù)元素的Flux對象,然后使用subscribe方法訂閱了這個對象,并提供了一個回調函數(shù)來打印元素值。當Flux對象發(fā)出元素值時,回調函數(shù)就會被調用。
Flux to Mono
把Flux轉換成Mono的一種方法是使用next()方法,它會返回Flux發(fā)出的第一個元素,或者如果Flux為空,則返回一個空的Mono。
例如:
// 創(chuàng)建一個Flux對象,包含三個整數(shù)元素 Flux<Integer> flux = Flux.just(1, 2, 3); // 使用next()方法把Flux轉換成Mono Mono<Integer> mono = flux.next(); // 訂閱這個Mono對象,并打印元素值 mono.subscribe(System.out::println); // 輸出1
另一種方法是使用collectList()方法,它會把Flux發(fā)出的所有元素收集到一個列表中,并返回一個包含這個列表的Mono。
例如:
// 創(chuàng)建一個Flux對象,包含三個整數(shù)元素 Flux<Integer> flux = Flux.just(1, 2, 3); // 使用collectList()方法把Flux轉換成Mono Mono<List<Integer>> mono = flux.collectList(); // 訂閱這個Mono對象,并打印元素值 mono.subscribe(System.out::println); // 輸出[1, 2, 3]
Flux常用的操作
// 從多個固定的值創(chuàng)建一個Flux
Flux.just("Hello", "World").subscribe(System.out::println); // 輸出Hello World
// 從一個數(shù)組對象創(chuàng)建一個Flux
String[] array = {"Hello", "World"};
Flux.fromArray(array).subscribe(System.out::println); // 輸出Hello World
// 從一個Iterable對象創(chuàng)建一個Flux
List<String> list = Arrays.asList("Hello", "World");
Flux.fromIterable(list).subscribe(System.out::println); // 輸出Hello World
// 從一個Stream對象創(chuàng)建一個Flux
Stream<String> stream = Stream.of("Hello", "World");
Flux.fromStream(stream).subscribe(System.out::println); // 輸出Hello World
// 創(chuàng)建一個包含指定范圍內整數(shù)的Flux
Flux.range(1, 5).subscribe(System.out::println); // 輸出1 2 3 4 5
// 創(chuàng)建一個按照指定時間間隔從0整數(shù)遞增的Flux
Duration duration = Duration.ofSeconds(1);
Flux<Long> interval = Flux.interval(duration);
interval.subscribe(System.out::println);
// 使用blockLast阻塞主線程,防止程序立即退出
interval.blockLast();
// 輸出結果每秒打印一次
// 0
// 1
// 2
// 3
// 4
// ...
// 對Flux發(fā)出的每個元素進行映射操作
Flux.just("Hello", "World").map(s -> s + "!")
.subscribe(System.out::println); // 輸出Hello! World!
// 對Flux發(fā)出的每個元素進行扁平化操作
Flux.just("Hello", "World")
.flatMap(s -> Flux.just(s + "!"))
.subscribe(System.out::println); //輸出Hello! World!
// 對Flux發(fā)出的每個元素進行過濾操作
Flux.range(1, 5).filter(i -> i % 2 == 0).subscribe(System.out::println);
// 輸出2 4
// 將多個Flux合并為一個Flux
Flux.zip(Flux.just("Hello"), Flux.just("World"))
.subscribe(tuple -> System.out.println(tuple.getT1() + " " + tuple.getT2()));
// 輸出Hello World
// 將多個Flux合并為一個Flux
Flux.just("Hello").mergeWith(Flux.just("World")).subscribe(System.out::println);
// 輸出Hello World
// 將多個Flux合并為一個Flux
Flux.just("Hello").concatWith(Flux.just("World")).subscribe(System.out::println);
// 輸出Hello World
// 將所有元素收集到一個List中
Flux.just("Hello", "World").collectList().subscribe(list -> System.out.println(list)); // 輸出[Hello, World]
Flux的zip、mergeWith、concatWith區(qū)別
zip、mergeWith和concatWith都是用來將多個Flux合并為一個Flux的操作,但是它們有一些區(qū)別:
zip會將多個Flux的元素按照一對一的方式進行合并,形成一個包含元組的Flux,每個元組中包含了每個源Flux的一個元素。如果源Flux的元素個數(shù)不一致,那么zip會以最短的Flux為基準,多余的元素會被丟棄。
Flux<String> flux1 = Flux.just("A", "B", "C");
Flux<Integer> flux2 = Flux.just(1, 2, 3, 4);
Flux<Tuple2<String, Integer>> flux3 = Flux.zip(flux1, flux2);
flux3.subscribe(tuple -> System.out.println(tuple.getT1() + " " + tuple.getT2()));
// 輸出A 1 B 2 C 3
// 4不會輸出,因為最短的Flux是flux1,長度是3
mergeWith會將多個Flux的元素按照時間順序進行合并,形成一個包含所有元素的Flux。如果源Flux的元素有重疊,那么mergeWith會保留所有的元素。
Duration duration1 = Duration.ofMillis(100); Duration duration2 = Duration.ofMillis(200); Flux<String> flux1 = Flux.interval(duration1).map(i -> "A" + i); Flux<String> flux2 = Flux.interval(duration2).map(i -> "B" + i); Flux<String> flux = flux1.mergeWith(flux2); flux.subscribe(System.out::println); // 輸出A0 B0 A1 B1 A2 A3 B2 A4 B3 A5 ...
concatWith會將多個Flux的元素按照訂閱順序進行合并,形成一個包含所有元素的Flux。如果源Flux的元素有重疊,那么concatWith會保留所有的元素。concatWith會等待上一個源Flux完成后才訂閱下一個源Flux。
Duration duration1 = Duration.ofMillis(100); Duration duration2 = Duration.ofMillis(200); // 每100ms遞增1,打印5次結束 Flux<String> flux1 = Flux.interval(duration1).map(i -> "A" + i).take(5); Flux<String> flux2 = Flux.interval(duration2).map(i -> "B" + i).take(5); Flux<String> flux3 = flux1.concatWith(flux2); flux3.subscribe(System.out::println); // 避免程序立即退出 flux3.blockLast(); // 輸出 // A0 // A1 // A2 // A3 // A4 // B0 // B1 // B2 // B3 // B4
可以看到,Project Reactor和Java 8 Stream的用法看起來很像,因為它們都提供了一些函數(shù)式編程的方法,用來對數(shù)據(jù)流進行操作,例如map、filter、reduce等。但是它們的本質是不同的,主要有以下幾個區(qū)別:
- Project Reactor是基于Reactive Streams規(guī)范的一個實現(xiàn),它支持異步、非阻塞、反應式的編程模式,而Java 8 Stream是基于集合類的一個擴展,它支持同步、阻塞、命令式的編程模式。
- Project Reactor是基于Push模式的,它可以讓數(shù)據(jù)源主動推送數(shù)據(jù)給訂閱者,并且支持背壓機制,讓訂閱者可以控制數(shù)據(jù)的流速,而Java 8 Stream是基于Pull模式的,它需要訂閱者主動拉取數(shù)據(jù)源的數(shù)據(jù),并且沒有背壓機制,可能會導致內存溢出或者性能下降。
- Project Reactor可以處理無限流或者有限流,它可以通過短路操作來終止無限流,而Java 8 Stream只能處理有限流,它不能處理無限流或者異步流。
- Project Reactor可以在多線程或者單線程環(huán)境下運行,它可以通過parallel或者sequential方法來切換并行或者串行模式,而Java 8 Stream只能在單線程環(huán)境下運行,它只能通過parallelStream方法來創(chuàng)建并行流。
本文只介紹了Project Reactor的基本概念和用法,包括Flux和Mono的創(chuàng)建、轉換、過濾、合并等操作。如果你想深入學習Project Reactor,你可以參考官方文檔或者其他相關的資料,更多關于Project Reactor 響應式編程的資料請關注腳本之家其它相關文章!
相關文章
詳解Idea 2019.2 安裝lombok插件失效問題解決
這篇文章主要介紹了詳解Idea 2019.2 安裝lombok插件失效問題解決,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-10-10
SpringBoot實現(xiàn)接口校驗簽名調用的項目實踐
在以SpringBoot開發(fā)后臺API接口時,會存在哪些接口不安全的因素呢?通常如何去解決的呢?本文主要介紹了SpringBoot實現(xiàn)接口校驗簽名調用的項目實踐,感興趣的可以了解一下2023-09-09
通過Java實現(xiàn)在Word中創(chuàng)建可填充表單
這篇文章主要為大家詳細介紹了如何通過Java代碼,以編程方式在Word中創(chuàng)建可填充表單,文中的示例代碼講解詳細,感興趣的小伙伴可以了解一下2023-03-03
Hibernate一對多關聯(lián)雙向關聯(lián)代碼實現(xiàn)分享
Hibernate一對多關聯(lián)雙向關聯(lián)代碼實現(xiàn)分享,大家參考使用吧2013-12-12
java線程并發(fā)blockingqueue類使用示例
BlockingQueue是一種特殊的Queue,若BlockingQueue是空的,從BlockingQueue取東西的操作將會被阻斷進入等待狀態(tài)直到BlocingkQueue進了新貨才會被喚醒,下面是用BlockingQueue來實現(xiàn)Producer和Consumer的例子2014-01-01
Spring rest接口中的LocalDateTime日期類型轉時間戳
這篇文章主要介紹了Spring rest接口中的LocalDateTime日期類型轉時間戳的方法,Java程序中一般將日期類型定義為LocalDateTime,數(shù)據(jù)庫中保存的時間是0時區(qū)的時間2023-03-03
SpringBoot框架中Mybatis-plus的簡單使用操作匯總
這篇文章主要介紹了SpringBoot框架中Mybatis-plus的簡單使用,本文通過示例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-02-02

