一文帶你搞懂Spring響應(yīng)式編程
哈嘍,大家好,我是指北君。
相信響應(yīng)式編程經(jīng)常會(huì)在各種地方被提到。本篇就為大家從函數(shù)式編程一直到Spring WeFlux做一次簡(jiǎn)單的講解,并給出一些示例,希望大家可以更好的理解響應(yīng)式編程,可以在合適的時(shí)機(jī)運(yùn)用到實(shí)際項(xiàng)目中。
1. 前言
了解響應(yīng)式編程,首先我們需要了解函數(shù)式操作和Stream的操作,下面我們簡(jiǎn)單的復(fù)習(xí)一下嘍。
1.1 常用函數(shù)式編程
函數(shù)式接口中
我們先來(lái)回顧一下Java中的函數(shù)式接口。常見的有以下幾種
- Consumer 一個(gè)輸入,無(wú)輸出
- Supplier 無(wú)輸入,有輸出
- Function<T,R> 輸入T,輸出R
- BiFunction<T,U,R> 輸入T,U 輸出R
- Predicate 有輸入,輸出boolean類型
上面的簡(jiǎn)單函數(shù)式接口示例如下:
Consumer consumer = (i)-> System.out.println("this is " + i); consumer.accept("consumer"); Supplier supplier = () -> "this is supplier"; System.out.println(supplier.get()); Function<Integer,Integer> function = (i) -> i*i; System.out.println(function.apply(8)); BiFunction<Integer,Integer,String> biFunction = (i,j)-> i+"*"+j+"="+i*j; System.out.println(biFunction.apply(8,8)); Predicate<Integer> predicate = (i) -> i.intValue()>3; System.out.println(predicate.test(5));
其執(zhí)行結(jié)果如下:
this is consumer
this is supplier
64
8*8=64
true
1.2 Stream操作
對(duì)Stream進(jìn)行操作,主要有幾個(gè)關(guān)鍵點(diǎn):
- 生成流
- 流的中間操作其中中間操作可以有多個(gè),中間操作會(huì)返回一個(gè)新的流(如 map ,filter,sorted等),然后交給下一個(gè)流方法使用。
- 流的終結(jié)操作終結(jié)操作只有一個(gè)。終結(jié)操作執(zhí)行后,流就到了終止?fàn)顟B(tài),無(wú)法被操作 (如forEach,toArray , findFirst 等)。
創(chuàng)建流的示例:
String[] strArray = {"ss","ss","","sdffg"}; Arrays.stream(strArray).forEach(System.out::println); Arrays.asList(strArray).stream().forEach(System.out::println); Stream.of(strArray).forEach(System.out::println); Stream.iterate(1,(i) -> i+1).limit(10).forEach(System.out::println); Stream.generate(() -> new Random().nextInt(10)).limit(10).forEach(System.out::println);
簡(jiǎn)單的流處理示例:
String[] strArray1 = {"ss","ss","","sdffg","bca-de","fff"}; String collect = Stream.of(strArray1) .filter(i -> !i.isEmpty())//過(guò)濾空字符串 .sorted() //排序 .limit(1) //只取第一個(gè)元素 .map(i -> i.replace("-", ""))//替換 "-" .flatMap(i -> Stream.of(i.split("")))//將字符拆成字符數(shù)組 .sorted() //排序 .collect(Collectors.joining());//將字符拼接組合到一起 System.out.println(collect);//最后輸出abcde
2. Java響應(yīng)式編程
響應(yīng)式編程會(huì)用到一個(gè)發(fā)布者和一個(gè)訂閱者,然后通過(guò)訂閱關(guān)系完成數(shù)據(jù)流的傳輸。訂閱關(guān)系中可以處理一些背壓?jiǎn)栴},即調(diào)節(jié)消費(fèi)者與生產(chǎn)者之間的供需平衡,讓整個(gè)程序達(dá)到最大效率。
Java9中java.util.concurrent.Flow接口提供響應(yīng)式流編程類似的功能。
下面我們實(shí)現(xiàn)一個(gè)基于Java 響應(yīng)式編程的示例:
其中有三個(gè)簡(jiǎn)單步驟:
- 建立生產(chǎn)者
- 構(gòu)建消費(fèi)者
- 消費(fèi)者訂閱生產(chǎn)者
- 生產(chǎn)者生產(chǎn)內(nèi)容
SubmissionPublisher publisher = new SubmissionPublisher<>();//建立生產(chǎn)者 Flow.Subscriber subscriber = new Flow.Subscriber() {...};//建立消費(fèi)者 (其中的實(shí)現(xiàn)放到下面) publisher.subscribe(subscriber);//訂閱關(guān)系 for (int i = 0; i < 10; i++) { publisher.submit("test reactive java : " +i); //生產(chǎn)者生產(chǎn)內(nèi)容 }
消費(fèi)者全部代碼如下:
Flow.Subscriber subscriber = new Flow.Subscriber() { Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { System.out.println("Subscription establish first "); this.subscription = subscription; this.subscription.request(1); } @Override public void onNext(Object item) { subscription.request(10); System.out.println("receive : "+ item); } @Override public void onError(Throwable throwable) { System.out.println(" onError "); } @Override public void onComplete() { System.out.println(" onComplete "); } };
其中onSubscribe方法表示建立訂閱關(guān)系
onNext接受數(shù)據(jù),并請(qǐng)求生產(chǎn)者的數(shù)據(jù)。
onError,onComplete則是error或者完成之后的處理方法。
帶有中間處理器的響應(yīng)式流
Reactive Stream 通常會(huì)基于如下的模型:
下面我們實(shí)現(xiàn)一個(gè)帶有中間處理功能的響應(yīng)式模型:
下面的Processor 既有發(fā)布者,又有訂閱者:
public class ReactiveProcessor extends SubmissionPublisher implements Flow.Subscriber { private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { System.out.println( Thread.currentThread().getName() + " Reactive processor establish connection "); this.subscription = subscription; this.subscription.request(1); } @Override public void onNext(Object item) { System.out.println(Thread.currentThread().getName() + " Reactive processor receive data: "+ item); this.submit(item.toString().toUpperCase()); this.subscription.request(1); } @Override public void onError(Throwable throwable) { System.out.println("Reactive processor error "); throwable.printStackTrace(); this.subscription.cancel(); } @Override public void onComplete() { System.out.println(Thread.currentThread().getName() + " Reactive processor receive data complete "); } }
如上中間處理器訂閱發(fā)布者, 同時(shí)消費(fèi)者再訂閱中間處理器。中間處理器也可以調(diào)節(jié)發(fā)布訂閱的生產(chǎn)消費(fèi)速率。
SubmissionPublisher publisher = new SubmissionPublisher<>(); //創(chuàng)建生產(chǎn)者 ReactiveProcessor reactiveProcessor = new ReactiveProcessor(); // 創(chuàng)建中間處理器 publisher.subscribe(reactiveProcessor); //中間處理器訂閱生產(chǎn)者 Flow.Subscriber subscriber = new Flow.Subscriber() {...}; //創(chuàng)建消費(fèi)者 reactiveProcessor.subscribe(subscriber); //消費(fèi)者訂閱中間處理器 for (int i = 0; i < 10; i++) { publisher.submit("test reactive java : " +i); //生產(chǎn)者生產(chǎn)數(shù)據(jù) }
通過(guò)上述生產(chǎn)者-> 中間處理器->消費(fèi)者, 可以將生產(chǎn)者生產(chǎn)的數(shù)據(jù)全部變成大寫,然后再發(fā)送給最終的消費(fèi)者。
以上式Java中的reactive 編程示例。Java會(huì)不同線程來(lái)分別處理消費(fèi)者與生產(chǎn)者的消息處理
3. Reactor
Reactor中兩個(gè)比較關(guān)鍵的對(duì)象式Flux和Mono, 整個(gè)Spring的響應(yīng)式編程均式基于projectreactor項(xiàng)目。Reactor是響應(yīng)式編程的依賴,主要是基于JVM構(gòu)建非阻塞程序。
根據(jù)Reactor的介紹,此類響應(yīng)式編程的的三方庫(kù)(Reactor)主要是解決一些JVM經(jīng)典異步編程中的一些缺點(diǎn),并且還可以專注于一些新的特性,如下:
- 可組合性與可讀性 (Composability and readability)
- 可以使用豐富的運(yùn)算操作符將數(shù)據(jù)作為流進(jìn)行操作
- 訂閱之前,不會(huì)有任何事
- 背壓特性(Backpressure ),可以理解為消費(fèi)者可以向生產(chǎn)者發(fā)送產(chǎn)出率過(guò)高的信號(hào),從而調(diào)整生產(chǎn)速率。或者消費(fèi)者可以選擇一次性拉去一捆數(shù)據(jù)進(jìn)行消費(fèi)。
- 于并發(fā)無(wú)關(guān)的高度抽象的高級(jí)功能
其中有這么一段解釋,可以形象的說(shuō)明響應(yīng)式編程。
Reactive的程序可以想象成車間的流水線,reactor既是流水線上的傳送帶,又是處理工作站。原料從一個(gè)原始的生產(chǎn)者出發(fā),最終成為產(chǎn)品被推總給消費(fèi)者。
3.1 Flux & Mono
下面我們介紹一下Flux和Mono。
在Reactor中Flux和Mono均是Publisher,即生產(chǎn)者。兩者也有不同。Flux對(duì)象表示0到N個(gè)異步的響應(yīng)序列,而Mono只代表0個(gè)(empty)或者1個(gè)結(jié)果。
Reactor官網(wǎng)上介紹的Flux示意如下:
Mono示意如下:
3.2 Flux Mono創(chuàng)建與使用
我們也可以單獨(dú)引用其依賴。
使用maven依賴
<dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> </dependencies>
Mono創(chuàng)建
分別創(chuàng)建空Mono和一個(gè)包含一個(gè)String的Mono,并由消費(fèi)者消費(fèi)打印。
Mono.empty().subscribe(System.out::println); Mono.just("Hello Mono Java North").subscribe(System.out::print);
Flux創(chuàng)建
Flux創(chuàng)建有如下的一些方法,
- just(通過(guò)不定參數(shù)創(chuàng)建)
- range(從某個(gè)整數(shù)開始,往后的整數(shù)數(shù)量)
- fromArray,fromIterable,fromStream,從名稱上就可以看出來(lái),通過(guò)數(shù)組,迭代器,Stream流創(chuàng)建Flux
下面式一些Java代碼示例
Flux.just(1,2,3,4,5).subscribe(System.out::print); Flux.range(1,20).subscribe(System.out::print); Flux.fromArray(new String[]{"a1","a2","a3","a4","a5","a6"}).skip(2).subscribe(System.out::print); Flux.fromIterable(Arrays.asList(1,2,3,4,5,6,7)).subscribe(System.out::println); Flux.fromStream(Stream.of(Arrays.asList(1,2,3,4,5,6,7))).subscribe(System.out::print);
我們?cè)倥e一個(gè)generate的例子
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
如上代碼所示,generate需要一個(gè)Callable參數(shù),而且是supplier (即沒有輸入值,只有一個(gè)輸出)
另一個(gè)參數(shù)是BiFunction (前面我們也介紹過(guò),需要兩個(gè)輸入值,一個(gè)輸出值)。BiFunction中的其中一個(gè)輸入值是SynchronousSink,下面我們給出一個(gè)generate創(chuàng)建Flux的示例。
Flux.generate( () -> 0, //提供一個(gè)初始狀態(tài)值0 (i, sink) -> { sink.next("3*" + i + "=" + 3 * i);//使用初始值去生產(chǎn)一個(gè)3的乘法 if (i > 9) sink.complete();//設(shè)置停止條件 return i + 1;//返回一個(gè)新的狀態(tài)值,以便在下一次的生產(chǎn)中使用,除非響應(yīng)序列終止 }).subscribe(System.out::println);
下面我們?cè)诳匆粋€(gè)Flux嵌套處理示例:
需求:將字符串去空格,并去重,然后排序輸出。
String str = "qa ws ed rf tg yh uj i k ol p za sx dc vf bg hn jm k loi yt "; Flux.fromArray(str.split(" "))//通過(guò)數(shù)組創(chuàng)建Flux .flatMap(i -> Flux.fromArray(i.split(""))) .distinct() // 去重 .sort() //排序 .subscribe(System.out::print); //flatMap與Stream中的flatMap類似,接受Function作為參數(shù),輸入一個(gè)值,輸出一個(gè)值,此處輸出均為Publisher,
以上就是Flux和Mono的一些簡(jiǎn)單介紹,同時(shí)Ractor也支持JDK中的FlowPubliser 和FlowSubscriber與 Reactor中的publisher, subscriber的適配等.
4. WebFlux
SpringBoot 2之后支持的Reactive響應(yīng)式編程。
關(guān)于Reactive技術(shù)棧和經(jīng)典的Servlet技術(shù)棧對(duì)比,Spring官網(wǎng)的這張圖比較清晰。
Spring響應(yīng)式編程主要依賴于Reactor第三方庫(kù),即上面講的Flux和Mono的庫(kù)。
WebFlux主要有以下幾個(gè)要點(diǎn):
- 反應(yīng)式棧web框架
- 完全異步非阻塞
- 運(yùn)行在netty,undertow,Servlet3.1 + 容器
- 核心反應(yīng)式庫(kù) Reactor
- 返回 Flux 或Mono
- 支持注解和函數(shù)編程兩種編程模式
Spring WebFlux示例
下面我們給出幾個(gè)SpringBoot 的響應(yīng)式web示例。
可以去https://start.spring.io/ 新建webflux的項(xiàng)目也可以。
項(xiàng)目中的主要依賴就是spring-boot-starter-webflux
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency>
基于注解的WebFlux
以下是一個(gè)最簡(jiǎn)單的基于注解的WebFlux
@GetMapping("/hello/mono1") public Mono<String> mono(){ return Mono.just("Hello Mono - Java North"); } @GetMapping("/hello/flux1") public Flux<String> flux(){ return Flux.just("Hello Flux","Hello Java North"); }
基于函數(shù)式編程的WebFlux
創(chuàng)建RouterFunction,將其注入到Spring中即可。
@Bean public RouterFunction<ServerResponse> testRoutes1() { return RouterFunctions.route().GET("/flux/function", new HandlerFunction<ServerResponse>() { @Override public Mono<ServerResponse> handle(ServerRequest request) { return ServerResponse.ok().bodyValue("hello web flux , Hello Java North"); } }).build(); } //上面的方法使用函數(shù)式編程替換之后如下 @Bean public RouterFunction<ServerResponse> testRoutes() { return RouterFunctions.route().GET("/flux/function", request -> ServerResponse.ok() .bodyValue("Hello web flux , Hello Java North")).build(); }
Flux與Mono的響應(yīng)式編程延遲示例
下面我們編寫一段返回Mono的響應(yīng)式Web服務(wù)。
@GetMapping("/hello/mono") public Mono<String> stringMono(){ Mono<String> from = Mono.fromSupplier(() -> { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { throw new RuntimeException(e); } return "Hello, Spring Reactive date time:"+ LocalDateTime.now(); }); System.out.println( "thread : " + Thread.currentThread().getName()+ " === " + LocalDateTime.now() +" ==========Mono function complete=========="); return from; }
使用postman請(qǐng)求如下, 5秒鐘后返回?cái)?shù)據(jù)。后臺(tái)卻在5秒中之前已經(jīng)處理完整個(gè)方法。
后臺(tái)打印日志:
再看一組Flux
@GetMapping(value = "/hello/flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> flux1(){ Flux<String> stringFlux = Flux.fromStream(IntStream.range(1,6).mapToObj(i ->{ mySleep(1);//表示睡1秒 return "java north flux" + i + "date time: " +LocalDateTime.now(); })); System.out.println("thread : " + Thread.currentThread().getName()+ " === " + LocalDateTime.now() + " ==========Flux function complete========="); return stringFlux; }
此次使用谷歌瀏覽器請(qǐng)求此服務(wù):
可以發(fā)現(xiàn)每隔一秒就會(huì)有一條消息被生產(chǎn)出來(lái)。
后臺(tái)完成時(shí)間同樣是在一開始就完成整個(gè)方法:
通過(guò)上述對(duì)Flux 與 Mono的例子,可以好好體會(huì)一下響應(yīng)式編程。
總結(jié)
本篇回顧了函數(shù)式編程,Stream操作等,然后再舉例講了Java中的Reactive編程示例, 同時(shí)也給處理Reactor三方庫(kù)的Flux于Mono的示例。
最后使用了SpringBoot WebFlux 創(chuàng)建簡(jiǎn)單的響應(yīng)式web服務(wù)。希望能讓大家更好的理解響應(yīng)式編程。
到此這篇關(guān)于一文帶你搞懂Spring響應(yīng)式編程的文章就介紹到這了,更多相關(guān)Spring響應(yīng)式編程內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java日常練習(xí)題,每天進(jìn)步一點(diǎn)點(diǎn)(50)
下面小編就為大家?guī)?lái)一篇Java基礎(chǔ)的幾道練習(xí)題(分享)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧,希望可以幫到你2021-08-08淺析JavaMail發(fā)送郵件后再通過(guò)JavaMail接收格式問(wèn)題
這篇文章主要介紹了JavaMail發(fā)送郵件后再通過(guò)JavaMail接收格式問(wèn)題 ,本文通過(guò)代碼實(shí)例給大家詳細(xì)解說(shuō),需要的朋友可以參考下2019-06-06springboot?vue測(cè)試平臺(tái)接口定義前后端新增功能實(shí)現(xiàn)
這篇文章主要介紹了springboot?vue測(cè)試平臺(tái)接口定義前后端新增功能實(shí)現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05java動(dòng)態(tài)口令登錄實(shí)現(xiàn)過(guò)程詳解
這篇文章主要介紹了java動(dòng)態(tài)口令登錄實(shí)現(xiàn)過(guò)程詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-07-07Java+opencv3.2.0實(shí)現(xiàn)模板匹配
這篇文章主要為大家詳細(xì)介紹了Java+opencv3.2.0實(shí)現(xiàn)模板匹配的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-02-02