欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Project?Reactor?響應(yīng)式范式編程

 更新時(shí)間:2023年04月03日 11:21:33   作者:一只小小的Bug  
這篇文章主要為大家介紹了Project?Reactor?響應(yīng)式范式編程,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

什么是響應(yīng)式編程?

響應(yīng)式編程是一種編程范式,它關(guān)注數(shù)據(jù)的變化和傳播,而不是控制流。響應(yīng)式編程可以提高程序的性能、彈性和可伸縮性,使程序能夠及時(shí)響應(yīng)用戶的需求和環(huán)境的變化。在本文中,我們將介紹Java中的響應(yīng)式編程的基本概念、原理和實(shí)踐。

響應(yīng)式編程的核心思想是將數(shù)據(jù)和行為抽象為流(Stream),流可以表示任何異步的事件或值,比如用戶輸入、網(wǎng)絡(luò)請(qǐng)求、數(shù)據(jù)庫查詢等。流可以被觀察(Observable),也就是說,可以有一個(gè)或多個(gè)觀察者(Observer)訂閱流,并在流發(fā)生變化時(shí)接收通知。流還可以被操作(Operator),也就是說,可以對(duì)流進(jìn)行各種轉(zhuǎn)換、過濾、組合等操作,從而生成新的流。

響應(yīng)式編程的優(yōu)勢(shì):

  • 可以將復(fù)雜的異步邏輯簡(jiǎn)化為聲明式的數(shù)據(jù)流操作,避免了回調(diào)地獄(Callback Hell)、阻塞線程和競(jìng)態(tài)條件等問題。
  • 可以提高程序的性能和資源利用率,通過減少線程的上下文切換和阻塞,以及利用反應(yīng)流的背壓(Backpressure)機(jī)制來控制數(shù)據(jù)流的速度,也就是說,可以讓下游的觀察者控制上游的數(shù)據(jù)源的發(fā)送速率,從而防止數(shù)據(jù)溢出或浪費(fèi)。
  • 可以提高程序的表達(dá)力和靈活性,通過使用函數(shù)式編程的風(fēng)格和操作符來組合和轉(zhuǎn)換數(shù)據(jù)流,以及利用反應(yīng)式編程框架和庫來簡(jiǎn)化異步和事件驅(qū)動(dòng)編程的復(fù)雜度

響應(yīng)式編程的缺點(diǎn):

  • 降低程序的可讀性和維護(hù)性,通過使用嵌套、回調(diào)、訂閱等方式來處理異步事件,以及使用反應(yīng)流的操作符來處理數(shù)據(jù)流,可能導(dǎo)致代碼難以理解和調(diào)試。

Java中的響應(yīng)式編程

Java中有多種框架和庫可以實(shí)現(xiàn)響應(yīng)式編程,比如RxJava、Spring Reactor、Vert.x等。這些框架和庫都遵循了Reactive Streams規(guī)范,這是一套定義了非阻塞背壓的異步流處理標(biāo)準(zhǔn)的接口。

Reactive Streams規(guī)范主要包括四個(gè)接口:

  • Publisher:發(fā)布者,表示一個(gè)數(shù)據(jù)源,可以發(fā)出零個(gè)或多個(gè)數(shù)據(jù),并通知訂閱者完成或出錯(cuò)。
  • Subscriber:訂閱者,表示一個(gè)數(shù)據(jù)消費(fèi)者,可以訂閱一個(gè)發(fā)布者,并在收到數(shù)據(jù)、完成或出錯(cuò)時(shí)做出相應(yīng)的動(dòng)作。
  • Subscription:訂閱,表示發(fā)布者和訂閱者之間的關(guān)系,可以用來請(qǐng)求或取消數(shù)據(jù)。
  • Processor:處理器,表示一個(gè)既是發(fā)布者又是訂閱者的中間組件,可以對(duì)數(shù)據(jù)進(jìn)行處理或轉(zhuǎn)換。

Project Reactor

Project Reactor是一個(gè)完全非阻塞的包含背壓支持的響應(yīng)式編程基石。它是Spring生態(tài)系統(tǒng)中Spring Reactive的基礎(chǔ),被用于如Spring WebFlux, Spring Data和Spring Cloud Gateway等項(xiàng)目中。

基本概念

Project Reactor的核心思想是將數(shù)據(jù)和事件看作是流(stream),流可以被創(chuàng)建,轉(zhuǎn)換,過濾,合并,分組,緩沖,錯(cuò)誤處理等等。流是惰性的,只有當(dāng)有訂閱者(subscriber)訂閱時(shí)才會(huì)開始發(fā)射數(shù)據(jù)或事件。流可以是有限的,也可以是無限的,可以是同步的,也可以是異步的,可以是單線程的,也可以是多線程的。流還可以支持背壓(backpressure),即訂閱者可以控制流的速度,避免被過多的數(shù)據(jù)或事件淹沒。

核心組件

Project Reactor提供了兩個(gè)主要的接口來表示流:

  • Flux: 表示一個(gè)包含0到N個(gè)元素的流
  • Mono: 表示一個(gè)包含0到1個(gè)元素的流。

它們都是Publisher<T>的實(shí)現(xiàn),可以發(fā)出0-N個(gè)元素的異步序列,并根據(jù)訂閱者的需求推送元素。

Flux表示的是包含0到N個(gè)元素的異步序列,可以被onComplete信號(hào)或者onError信號(hào)所終止。

Mono表示的是包含0或1個(gè)元素的異步序列,也可以被onComplete信號(hào)或者onError信號(hào)所終止。

Flux和Mono之間可以進(jìn)行轉(zhuǎn)換。

代碼示例

Mono

// 創(chuàng)建一個(gè)Mono對(duì)象,包含一個(gè)字符串元素
Mono<String> mono = Mono.just("Hello World");

// 訂閱這個(gè)Mono對(duì)象,并打印元素值
mono.subscribe(System.out::println);

使用Mono.just方法創(chuàng)建了一個(gè)包含一個(gè)字符串元素的Mono對(duì)象,然后使用subscribe方法訂閱了這個(gè)對(duì)象,并提供了一個(gè)回調(diào)函數(shù)來打印元素值。當(dāng)Mono對(duì)象發(fā)出元素值時(shí),回調(diào)函數(shù)就會(huì)被調(diào)用。

Mono to Flux

把Mono轉(zhuǎn)換成Flux的一種方法是使用flux()方法,它會(huì)返回一個(gè)包含Mono發(fā)出的元素的Flux,或者如果Mono為空,則返回一個(gè)空的Flux。例如:

// 創(chuàng)建一個(gè)Mono對(duì)象,包含一個(gè)整數(shù)元素
Mono<Integer> mono = Mono.just(1);

// 使用flux()方法把Mono轉(zhuǎn)換成Flux
Flux<Integer> flux = mono.flux();

// 訂閱這個(gè)Flux對(duì)象,并打印元素值
flux.subscribe(System.out::println); // 輸出1

另一種方法是使用concatWith()方法,它會(huì)將Mono與另一個(gè)Publisher連接起來,形成一個(gè)Flux。例如:

// 創(chuàng)建一個(gè)Mono對(duì)象,包含一個(gè)整數(shù)元素
Mono<Integer> mono = Mono.just(1);

// 創(chuàng)建一個(gè)Flux對(duì)象,包含兩個(gè)整數(shù)元素
Flux<Integer> flux = Flux.just(2, 3);

// 使用concatWith()方法把Mono和Flux連接起來
Flux<Integer> result = mono.concatWith(flux);

// 訂閱這個(gè)Flux對(duì)象,并打印元素值
result.subscribe(System.out::println); // 輸出1, 2, 3

Mono常用的操作

// 從一個(gè)固定的值創(chuàng)建一個(gè)Mono
Mono.just("Hello").subscribe(System.out::println); // 輸出Hello

// 從一個(gè)Callable對(duì)象創(chuàng)建一個(gè)Mono
Callable<String> callable = () -> "World";
Mono.fromCallable(callable).subscribe(System.out::println); // 輸出World

// 從一個(gè)Supplier對(duì)象創(chuàng)建一個(gè)Mono
Supplier<String> supplier = () -> "Supplier!";
Mono.fromSupplier(supplier).subscribe(System.out::println); // 輸出Supplier!

// 對(duì)Mono發(fā)出的元素進(jìn)行映射操作
Mono.just("Hello").map(s -> s + " World").subscribe(System.out::println); // 輸出Hello World

// 對(duì)Mono發(fā)出的元素進(jìn)行扁平化操作
Mono.just("Hello")
    .flatMap(s -> Mono.just(s + " World"))
    .subscribe(System.out::println); // 輸出Hello World

// 對(duì)Mono發(fā)出的元素進(jìn)行過濾操作
Mono.just(1).filter(i -> i > 0).subscribe(System.out::println); // 輸出1

// 將多個(gè)Mono合并為一個(gè)Mono
Mono.zip(Mono.just("Hello"), Mono.just("World"))
    .subscribe(tuple -> System.out.println(tuple.getT1() + " " + tuple.getT2())); // 輸出Hello World

// 將多個(gè)Mono合并為一個(gè)Flux
Mono.just("Hello")
    .mergeWith(Mono.just("World"))
    .subscribe(System.out::println); // 輸出Hello World

// 在這個(gè)Mono完成后,繼續(xù)處理另一個(gè)發(fā)布者
Mono.just("Hello").then(Mono.just("World")).subscribe(System.out::println); // 輸出World

// 在這個(gè)Mono發(fā)出元素時(shí),執(zhí)行一個(gè)副作用操作
Mono.just("Hello")
    .doOnNext(s -> System.out.println("Before: " + s))
    .map(s -> s + " World")
    .doOnNext(s -> System.out.println("After: " + s))
    .subscribe(); 
// 輸出
// Before: Hello 
// After: Hello World

Flux

// 創(chuàng)建一個(gè)Flux對(duì)象,包含三個(gè)整數(shù)元素
Flux<Integer> flux = Flux.just(1, 2, 3);

// 訂閱這個(gè)Flux對(duì)象,并打印元素值
flux.subscribe(System.out::println);

使用Flux.just方法創(chuàng)建了一個(gè)包含三個(gè)整數(shù)元素的Flux對(duì)象,然后使用subscribe方法訂閱了這個(gè)對(duì)象,并提供了一個(gè)回調(diào)函數(shù)來打印元素值。當(dāng)Flux對(duì)象發(fā)出元素值時(shí),回調(diào)函數(shù)就會(huì)被調(diào)用。

Flux to Mono

把Flux轉(zhuǎn)換成Mono的一種方法是使用next()方法,它會(huì)返回Flux發(fā)出的第一個(gè)元素,或者如果Flux為空,則返回一個(gè)空的Mono。

例如:

// 創(chuàng)建一個(gè)Flux對(duì)象,包含三個(gè)整數(shù)元素
Flux<Integer> flux = Flux.just(1, 2, 3);

// 使用next()方法把Flux轉(zhuǎn)換成Mono
Mono<Integer> mono = flux.next();

// 訂閱這個(gè)Mono對(duì)象,并打印元素值
mono.subscribe(System.out::println); // 輸出1

另一種方法是使用collectList()方法,它會(huì)把Flux發(fā)出的所有元素收集到一個(gè)列表中,并返回一個(gè)包含這個(gè)列表的Mono。

例如:

// 創(chuàng)建一個(gè)Flux對(duì)象,包含三個(gè)整數(shù)元素
Flux<Integer> flux = Flux.just(1, 2, 3);

// 使用collectList()方法把Flux轉(zhuǎn)換成Mono
Mono<List<Integer>> mono = flux.collectList();

// 訂閱這個(gè)Mono對(duì)象,并打印元素值
mono.subscribe(System.out::println); // 輸出[1, 2, 3]

Flux常用的操作

// 從多個(gè)固定的值創(chuàng)建一個(gè)Flux
Flux.just("Hello", "World").subscribe(System.out::println); // 輸出Hello World

// 從一個(gè)數(shù)組對(duì)象創(chuàng)建一個(gè)Flux
String[] array = {"Hello", "World"};
Flux.fromArray(array).subscribe(System.out::println); // 輸出Hello World

// 從一個(gè)Iterable對(duì)象創(chuàng)建一個(gè)Flux
List<String> list = Arrays.asList("Hello", "World");
Flux.fromIterable(list).subscribe(System.out::println); // 輸出Hello World

// 從一個(gè)Stream對(duì)象創(chuàng)建一個(gè)Flux
Stream<String> stream = Stream.of("Hello", "World");
Flux.fromStream(stream).subscribe(System.out::println); // 輸出Hello World

// 創(chuàng)建一個(gè)包含指定范圍內(nèi)整數(shù)的Flux
Flux.range(1, 5).subscribe(System.out::println); // 輸出1 2 3 4 5

// 創(chuàng)建一個(gè)按照指定時(shí)間間隔從0整數(shù)遞增的Flux
Duration duration = Duration.ofSeconds(1);
Flux<Long> interval = Flux.interval(duration);
interval.subscribe(System.out::println);
// 使用blockLast阻塞主線程,防止程序立即退出
interval.blockLast();
// 輸出結(jié)果每秒打印一次
// 0
// 1
// 2
// 3
// 4
// ...

// 對(duì)Flux發(fā)出的每個(gè)元素進(jìn)行映射操作
Flux.just("Hello", "World").map(s -> s + "!")
    .subscribe(System.out::println); // 輸出Hello! World!

// 對(duì)Flux發(fā)出的每個(gè)元素進(jìn)行扁平化操作
Flux.just("Hello", "World")
    .flatMap(s -> Flux.just(s + "!"))
    .subscribe(System.out::println); //輸出Hello! World!

// 對(duì)Flux發(fā)出的每個(gè)元素進(jìn)行過濾操作
Flux.range(1, 5).filter(i -> i % 2 == 0).subscribe(System.out::println); 
// 輸出2 4

// 將多個(gè)Flux合并為一個(gè)Flux
Flux.zip(Flux.just("Hello"), Flux.just("World"))
    .subscribe(tuple -> System.out.println(tuple.getT1() + " " + tuple.getT2())); 
// 輸出Hello World

// 將多個(gè)Flux合并為一個(gè)Flux
Flux.just("Hello").mergeWith(Flux.just("World")).subscribe(System.out::println); 
//  輸出Hello World

// 將多個(gè)Flux合并為一個(gè)Flux
Flux.just("Hello").concatWith(Flux.just("World")).subscribe(System.out::println); 
// 輸出Hello World

// 將所有元素收集到一個(gè)List中
Flux.just("Hello", "World").collectList().subscribe(list -> System.out.println(list)); // 輸出[Hello, World]

Flux的zip、mergeWith、concatWith區(qū)別

zip、mergeWith和concatWith都是用來將多個(gè)Flux合并為一個(gè)Flux的操作,但是它們有一些區(qū)別:

zip會(huì)將多個(gè)Flux的元素按照一對(duì)一的方式進(jìn)行合并,形成一個(gè)包含元組的Flux,每個(gè)元組中包含了每個(gè)源Flux的一個(gè)元素。如果源Flux的元素個(gè)數(shù)不一致,那么zip會(huì)以最短的Flux為基準(zhǔn),多余的元素會(huì)被丟棄。

  Flux<String> flux1 = Flux.just("A", "B", "C");
  Flux<Integer> flux2 = Flux.just(1, 2, 3, 4);
  Flux<Tuple2<String, Integer>> flux3 = Flux.zip(flux1, flux2);
  flux3.subscribe(tuple -> System.out.println(tuple.getT1() + " " + tuple.getT2())); 
  // 輸出A 1 B 2 C 3
  // 4不會(huì)輸出,因?yàn)樽疃痰腇lux是flux1,長(zhǎng)度是3

mergeWith會(huì)將多個(gè)Flux的元素按照時(shí)間順序進(jìn)行合并,形成一個(gè)包含所有元素的Flux。如果源Flux的元素有重疊,那么mergeWith會(huì)保留所有的元素。

  Duration duration1 = Duration.ofMillis(100);
  Duration duration2 = Duration.ofMillis(200);
  Flux<String> flux1 = Flux.interval(duration1).map(i -> "A" + i);
  Flux<String> flux2 = Flux.interval(duration2).map(i -> "B" + i);
  Flux<String> flux = flux1.mergeWith(flux2);
  flux.subscribe(System.out::println); 
  // 輸出A0 B0 A1 B1 A2 A3 B2 A4 B3 A5 ...

concatWith會(huì)將多個(gè)Flux的元素按照訂閱順序進(jìn)行合并,形成一個(gè)包含所有元素的Flux。如果源Flux的元素有重疊,那么concatWith會(huì)保留所有的元素。concatWith會(huì)等待上一個(gè)源Flux完成后才訂閱下一個(gè)源Flux。

  Duration duration1 = Duration.ofMillis(100);
  Duration duration2 = Duration.ofMillis(200);
  // 每100ms遞增1,打印5次結(jié)束
  Flux<String> flux1 = Flux.interval(duration1).map(i -> "A" + i).take(5);
  Flux<String> flux2 = Flux.interval(duration2).map(i -> "B" + i).take(5);
  Flux<String> flux3 = flux1.concatWith(flux2);
  flux3.subscribe(System.out::println);
  // 避免程序立即退出
  flux3.blockLast();
  // 輸出 
  // A0
  // A1
  // A2
  // A3
  // A4
  // B0
  // B1
  // B2
  // B3
  // B4

可以看到,Project Reactor和Java 8 Stream的用法看起來很像,因?yàn)樗鼈兌继峁┝艘恍┖瘮?shù)式編程的方法,用來對(duì)數(shù)據(jù)流進(jìn)行操作,例如map、filter、reduce等。但是它們的本質(zhì)是不同的,主要有以下幾個(gè)區(qū)別:

  • Project Reactor是基于Reactive Streams規(guī)范的一個(gè)實(shí)現(xiàn),它支持異步、非阻塞、反應(yīng)式的編程模式,而Java 8 Stream是基于集合類的一個(gè)擴(kuò)展,它支持同步、阻塞、命令式的編程模式。
  • Project Reactor是基于Push模式的,它可以讓數(shù)據(jù)源主動(dòng)推送數(shù)據(jù)給訂閱者,并且支持背壓機(jī)制,讓訂閱者可以控制數(shù)據(jù)的流速,而Java 8 Stream是基于Pull模式的,它需要訂閱者主動(dòng)拉取數(shù)據(jù)源的數(shù)據(jù),并且沒有背壓機(jī)制,可能會(huì)導(dǎo)致內(nèi)存溢出或者性能下降。
  • Project Reactor可以處理無限流或者有限流,它可以通過短路操作來終止無限流,而Java 8 Stream只能處理有限流,它不能處理無限流或者異步流。
  • Project Reactor可以在多線程或者單線程環(huán)境下運(yùn)行,它可以通過parallel或者sequential方法來切換并行或者串行模式,而Java 8 Stream只能在單線程環(huán)境下運(yùn)行,它只能通過parallelStream方法來創(chuàng)建并行流。

本文只介紹了Project Reactor的基本概念和用法,包括Flux和Mono的創(chuàng)建、轉(zhuǎn)換、過濾、合并等操作。如果你想深入學(xué)習(xí)Project Reactor,你可以參考官方文檔或者其他相關(guān)的資料,更多關(guān)于Project Reactor 響應(yīng)式編程的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論