SpringBoot之webflux全面解析
webflux介紹
Spring Boot 2.0
spring.io 官網(wǎng)有句醒目的話是:
BUILD ANYTHING WITH SPRING BOOT
Spring Boot (Boot 顧名思義,是引導(dǎo)的意思)框架是用于簡化 Spring 應(yīng)用從搭建到開發(fā)的過程。
應(yīng)用開箱即用,只要通過一個(gè)指令,包括命令行 java -jar 、SpringApplication 應(yīng)用啟動類 、 Spring Boot Maven 插件等,就可以啟動應(yīng)用了。
另外,Spring Boot 強(qiáng)調(diào)只需要很少的配置文件,所以在開發(fā)生產(chǎn)級 Spring 應(yīng)用中,讓開發(fā)變得更加高效和簡易。
目前,Spring Boot 版本是 2.x 版本。Spring Boot 包括 WebFlux。

傳統(tǒng)的以SpringMVC為代表的webmvc技術(shù)使用的是同步阻塞式IO模型

而Spring WebFlux是一個(gè)異步非阻塞式IO模型,可以用少量的容器線程支撐大量的并發(fā)訪問,所以Spring WebFlux可以提升吞吐量和伸縮性,但是接口的響應(yīng)時(shí)間并不會縮短,其處理結(jié)果還是得由worker線程處理完成之后在返回給請求

webflux應(yīng)用場景
適合IO密集型、磁盤IO密集、網(wǎng)絡(luò)IO密集等服務(wù)場景,比如微服務(wù)網(wǎng)關(guān),就可以使用webflux技術(shù)來顯著的提升網(wǎng)關(guān)對下游服務(wù)的吞吐量,spring cloud gateway就使用了webflux這門技術(shù)

Spring Boot 2.0 WebFlux
了解 WebFlux,首先了解下什么是 Reactive Streams。Reactive Streams 是 JVM 中面向流的庫標(biāo)準(zhǔn)和規(guī)范:
- 處理可能無限數(shù)量的元素
- 按順序處理
- 組件之間異步傳遞
- 強(qiáng)制性非阻塞背壓(Backpressure)
Backpressure(背壓)
背壓是一種常用策略,使得發(fā)布者擁有無限制的緩沖區(qū)存儲元素,用于確保發(fā)布者發(fā)布元素太快時(shí),不會去壓制訂閱者。
Reactive Streams(響應(yīng)式流)
一般由以下組成:
一般由以下組成:
publisher:發(fā)布者,發(fā)布元素到訂閱者subscriber:訂閱者,消費(fèi)元素subscription:訂閱,在發(fā)布者中,訂閱被創(chuàng)建時(shí),將與訂閱者共享processor:處理器,發(fā)布者與訂閱者之間處理數(shù)據(jù),包含了發(fā)布者與訂閱者的共同體
publisher接口規(guī)范
public interface Publisher<T> {
void subscribe(Subscriber<? super T> var1);
}subscriber接口規(guī)范
public interface Subscriber<T> {
? ? void onSubscribe(Subscription var1);?
? ? void onNext(T var1);?
? ? void onError(Throwable var1);?
? ? void onComplete();
}subscription接口規(guī)范
public interface Subscription {
? ? void request(long var1);?
? ? void cancel();
}processor接口規(guī)范
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}響應(yīng)式編程
有了 Reactive Streams 這種標(biāo)準(zhǔn)和規(guī)范,利用規(guī)范可以進(jìn)行響應(yīng)式編程。那再了解下什么是 Reactive programming 響應(yīng)式編程。響應(yīng)式編程是基于異步和事件驅(qū)動的非阻塞程序,只是垂直通過在 JVM 內(nèi)啟動少量線程擴(kuò)展,而不是水平通過集群擴(kuò)展。這就是一個(gè)編程范例,具體項(xiàng)目中如何體現(xiàn)呢?
響應(yīng)式項(xiàng)目編程實(shí)戰(zhàn)中,通過基于 Reactive Streams 規(guī)范實(shí)現(xiàn)的框架 Reactor 去實(shí)戰(zhàn)。Reactor 一般提供兩種響應(yīng)式 API :
Mono:實(shí)現(xiàn)發(fā)布者,并返回 0 或 1 個(gè)元素Flux:實(shí)現(xiàn)發(fā)布者,并返回 N 個(gè)元素
Spring Webflux
Spring Boot Webflux 就是基于 Reactor 實(shí)現(xiàn)的。Spring Boot 2.0 包括一個(gè)新的 spring-webflux 模塊。該模塊包含對響應(yīng)式 HTTP 和 WebSocket 客戶端的支持,以及對 REST,HTML 和 WebSocket 交互等程序的支持。一般來說,Spring MVC 用于同步處理,Spring Webflux 用于異步處理。
Spring Boot Webflux 有兩種編程模型實(shí)現(xiàn),一種類似 Spring MVC 注解方式,另一種是使用其功能性端點(diǎn)方式。
Spring Boot 2.0 WebFlux 特性
常用的 Spring Boot 2.0 WebFlux 生產(chǎn)的特性如下:
- 響應(yīng)式 API
- 編程模型
- 適用性
- 內(nèi)嵌容器
- Starter 組件
還有對日志、Web、消息、測試及擴(kuò)展等支持。
響應(yīng)式 API
Reactor 框架是 Spring Boot Webflux 響應(yīng)庫依賴,通過 Reactive Streams 并與其他響應(yīng)庫交互。提供了 兩種響應(yīng)式 API:Mono 和 Flux。一般是將 Publisher 作為輸入,在框架內(nèi)部轉(zhuǎn)換成 Reactor 類型并處理邏輯,然后返回 Flux 或 Mono 作為輸出。
spring webflux和spring mvc的異同點(diǎn)

一圖就很明確了,WebFlux 和 MVC 有交集,方便大家遷移。但是注意:
- MVC 能滿足場景的,就不需要更改為 WebFlux。
- 要注意容器的支持,可以看看下面內(nèi)嵌容器的支持。
- 微服務(wù)體系結(jié)構(gòu),WebFlux 和 MVC 可以混合使用。尤其開發(fā) IO 密集型服務(wù)的時(shí)候,選擇 WebFlux 去實(shí)現(xiàn)。
- spring mvc是一個(gè)命令式的編程方式采用同步阻塞方式,方便開發(fā)人員編寫代碼和調(diào)試;spring webflux調(diào)試會非常不方便
- JDBC連接池和JPA等技術(shù)還是阻塞模型,傳統(tǒng)的關(guān)系型數(shù)據(jù)庫如MySQL也不支持非阻塞的方式獲取數(shù)據(jù),目前只有非關(guān)系型數(shù)據(jù)庫如Redis、Mongodb支持非阻塞方式獲取數(shù)據(jù)
編程模型
Spring 5 web 模塊包含了 Spring WebFlux 的 HTTP 抽象。類似 Servlet API , WebFlux 提供了 WebHandler API 去定義非阻塞 API 抽象接口。可以選擇以下兩種編程模型實(shí)現(xiàn):
- 注解控制層。和 MVC 保持一致,WebFlux 也支持響應(yīng)性 @RequestBody 注解。
- 功能性端點(diǎn)?;?lambda 輕量級編程模型,用來路由和處理請求的小工具。和上面最大的區(qū)別就是,這種模型,全程控制了請求 - 響應(yīng)的生命流程
內(nèi)嵌容器
跟 Spring Boot 大框架一樣啟動應(yīng)用,但 WebFlux 默認(rèn)是通過 Netty 啟動,并且自動設(shè)置了默認(rèn)端口為 8080。另外還提供了對 Jetty、Undertow 等容器的支持。開發(fā)者自行在添加對應(yīng)的容器 Starter 組件依賴,即可配置并使用對應(yīng)內(nèi)嵌容器實(shí)例。
但是要注意,必須是 Servlet 3.1+ 容器,如 Tomcat、Jetty;或者非 Servlet 容器,如 Netty 和 Undertow。
Netty優(yōu)點(diǎn)
- API使用簡單、易上手
- 功能強(qiáng)大、支持多種主流協(xié)議
- 定制能力強(qiáng)、可擴(kuò)展性高
- 性能高、綜合性能最優(yōu)
- 成熟穩(wěn)定、久經(jīng)考驗(yàn)
- 社區(qū)活躍、學(xué)習(xí)資料多
Netty selector模型

Reactor指南
- Reactor 框架是 Pivotal 公司(開發(fā) Spring 等技術(shù)的公司)開發(fā)的
- 實(shí)現(xiàn)了 Reactive Programming 思想,符合Reactive Streams 規(guī)范(Reactive Streams 是由 Netflix、TypeSafe、Pivotal 等公司發(fā)起的)的一項(xiàng)技術(shù)
- 側(cè)重于server端的響應(yīng)式編程框架
- Reactor 框架主要有兩個(gè)主要的模塊:reactor-core 和 reactor-ipc。前者主要負(fù)責(zé) Reactive Programming 相關(guān)的核心 API 的實(shí)現(xiàn),后者負(fù)責(zé)高性能網(wǎng)絡(luò)通信的實(shí)現(xiàn),目前是基于 Netty 實(shí)現(xiàn)的。
Java原有的異步編程方式
Callback:異步方法采用一個(gè)callback作為參數(shù),當(dāng)結(jié)果出來后回調(diào)這個(gè)callback,例如swings的EventListenerFuture:異步方法返回一個(gè)Future<T>,此時(shí)結(jié)果并不是立刻可以拿到,需要處理結(jié)束之后才可以使用
Future局限
- 多個(gè)Future組合不易
- 調(diào)用Future#get時(shí)仍然會阻塞
- 缺乏對多個(gè)值以及進(jìn)一步的出錯(cuò)處理
Reactor的Publisher
Mono實(shí)現(xiàn)了 org.reactivestreams.Publisher 接口,代表0到1個(gè)元素的響應(yīng)式序列。Flux同樣實(shí)現(xiàn)了 org.reactivestreams.Publisher 接口,代表0到N個(gè)元素的結(jié)果。
Flux介紹

- Flux<T>是一個(gè)標(biāo)準(zhǔn)Publisher<T>,表示0到N個(gè)發(fā)射項(xiàng)的異步序列,可選地以完成信號或錯(cuò)誤終止。與Reactive Streams規(guī)范中一樣,這三種類型的信號轉(zhuǎn)換為對下游訂閱者的onNext、onComplete或onError方法的調(diào)用。
- 在這種大范圍的可能信號中,F(xiàn)lux是通用的reactive 類型。注意,所有事件,甚至終止事件,都是可選的:沒有onNext事件,但是onComplete事件表示一個(gè)空的有限序列,但是移除onComplete并且您有一個(gè)無限的空序列(除了關(guān)于取消的測試之外,沒有特別有用)。同樣,無限序列不一定是空的。例如,F(xiàn)lux.interval(Duration) 產(chǎn)生一個(gè)Flux<Long>,它是無限的,從時(shí)鐘發(fā)出規(guī)則的數(shù)據(jù)。
Mono介紹

- Mono<T>是一個(gè)專門的Publisher<T>,它最多發(fā)出一個(gè)項(xiàng),然后可選地以onComplete信號或onError信號結(jié)束。
- 它只提供了可用于Flux的操作符的子集,并且一些操作符(特別是那些將Mono與另一個(gè)發(fā)布者組合的操作符)切換到Flux。
- 例如,Mono#concatWith(Publisher)返回一個(gè)Flux ,而Mono#then(Mono)則返回另一個(gè)Mono。
- 注意,Mono可以用于表示只有完成概念(類似于Runnable)的無值異步進(jìn)程。若要創(chuàng)建一個(gè),請使用Mono<Void>。
publisher訂閱

reactor實(shí)踐
首先maven工廠引入pom
<dependency> ? ? <groupId>org.springframework.boot</groupId> ? ? <artifactId>spring-boot-starter-webflux</artifactId> </dependency>
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {
@Test
public void testReactor(){
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6);
Mono<Integer> mono = Mono.just(1);
Integer[] arr = {1,2,3,4,5,6};
Flux<Integer> flux1 = Flux.fromArray(arr);
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
Flux<Integer> flux2 = Flux.fromIterable(list);
Flux<Integer> flux3 = Flux.from(flux);
Flux<Integer> flux4 = Flux.fromStream(Stream.of(1, 2, 3, 4, 5, 6));
flux.subscribe();
flux1.subscribe(System.out::println);
flux2.subscribe(System.out::println,System.err::println);
flux3.subscribe(System.out::println,System.err::println,() -> System.out.println("complete"));
flux4.subscribe(System.out::println,System.err::println,
() -> System.out.println("complete"),
subscription -> subscription.request(3));
flux4.subscribe(new DemoSubscriber());
}
class DemoSubscriber extends BaseSubscriber<Integer>{
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("Subscribe");
subscription.request(1);
}
@Override
protected void hookOnNext(Integer value) {
if(value == 4){
//背壓,通知數(shù)據(jù)源,不要發(fā)送數(shù)據(jù)了
cancel();
}
System.out.println(value);
request(1);
}
}
}Reactor操作符
map - 元素映射為新元素
- map操作可以將數(shù)據(jù)元素進(jìn)行轉(zhuǎn)換/映射,得到一個(gè)新元素。

flatMap - 元素映射為流
- flatMap操作可以將每個(gè)數(shù)據(jù)元素轉(zhuǎn)換/映射為一個(gè)流,然后將這些流合并為一個(gè)大的數(shù)據(jù)流。

filter - 過濾
- filter操作可以對數(shù)據(jù)元素進(jìn)行篩選。

zip - 一對一合并
看到zip這個(gè)詞可能會聯(lián)想到拉鏈,它能夠?qū)⒍鄠€(gè)流一對一的合并起來。zip有多個(gè)方法變體,我們介紹一個(gè)最常見的二合一的。

更多
Reactor中提供了非常豐富的操作符,除了以上幾個(gè)常見的,還有:
- 用于編程方式自定義生成數(shù)據(jù)流的create和generate等及其變體方法;
- 用于“無副作用的peek”場景的doOnNext、doOnError、doOncomplete、doOnSubscribe、doOnCancel等及其變體方法;
- 用于數(shù)據(jù)流轉(zhuǎn)換的when、and/or、merge、concat、collect、count、repeat等及其變體方法;
- 用于過濾/揀選的take、first、last、sample、skip、limitRequest等及其變體方法;
- 用于錯(cuò)誤處理的timeout、onErrorReturn、onErrorResume、doFinally、retryWhen等及其變體方法;
- 用于分批的window、buffer、group等及其變體方法;
- 用于線程調(diào)度的publishOn和subscribeOn方法。
使用這些操作符,你幾乎可以搭建出能夠進(jìn)行任何業(yè)務(wù)需求的數(shù)據(jù)處理管道/流水線。
抱歉以上這些暫時(shí)不能一一介紹,更多詳情請參考JavaDoc
reactor和java8 stream區(qū)別
形似而神不似
reactor:push模式,服務(wù)端推送數(shù)據(jù)給客戶端java8 stream:pull模式,客戶端主動向服務(wù)端請求數(shù)據(jù)
Reactor線程模型
Reactor創(chuàng)建線程的方式
Schedulers.immediate():當(dāng)前線程Schedulers.single():可重用的單線程,注意,這個(gè)方法對所有調(diào)用者都提供同一個(gè)線程來使用, 直到該調(diào)度器被廢棄。如果你想使用獨(dú)占的線程,請使用Schedulers.newSingle();Schedulers.elastic():彈性線程池,它根據(jù)需要創(chuàng)建一個(gè)線程池,重用空閑線程。線程池如果空閑時(shí)間過長 (默認(rèn)為 60s)就會被廢棄。對于 I/O 阻塞的場景比較適用。Schedulers.elastic()能夠方便地給一個(gè)阻塞 的任務(wù)分配它自己的線程,從而不會妨礙其他任務(wù)和資源;Schedulers.parallel():固定大小線程池,所創(chuàng)建線程池的大小與CPU個(gè)數(shù)等同Schedulers.fromExecutorService(ExecutorService):自定義線程池,基于自定義的ExecutorService創(chuàng)建 Scheduler(雖然不太建議,不過你也可以使用Executor來創(chuàng)建)
線程模型

線程切換實(shí)踐
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {
@Test
public void testReactor() throws InterruptedException {
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6);
flux.map(i -> {
System.out.println(Thread.currentThread().getName()+"-map1");
return i * 3;
}).publishOn(Schedulers.elastic()).map(
i -> {
System.out.println(Thread.currentThread().getName()+"-map2");
return i / 3;
}
).subscribeOn(Schedulers.parallel())
.subscribe(i -> System.out.println(Thread.currentThread().getName()+"-" + i));
Thread.sleep(10000);
}
}線程切換總結(jié)
publishOn:它將上游信號傳給下游,同時(shí)改變后續(xù)的操作符的執(zhí)行所在線程,直到下一個(gè)publishOn出現(xiàn)在這個(gè)鏈上subscribeOn:作用于向上的訂閱鏈,無論處于操作鏈的什么位置,它都會影響到源頭的線程執(zhí)行環(huán)境,但不會影響到后續(xù)的publishOn
webflux實(shí)踐
兼容spring mvc的寫法
@RestController
public class DemoController {
@GetMapping("/demo")
public Mono<String> demo(){
return Mono.just("demo");
}
}spring webflux函數(shù)式寫法
@Component
public class DemoHandler {
public Mono<ServerResponse> hello(ServerRequest request){
return ok().contentType(MediaType.TEXT_PLAIN)
.body(Mono.just("hello"),String.class);
}
public Mono<ServerResponse> world(ServerRequest request){
return ok().contentType(MediaType.TEXT_PLAIN)
.body(Mono.just("world"),String.class);
}
public Mono<ServerResponse> times(ServerRequest request){
//每隔一秒發(fā)送當(dāng)前的時(shí)間
return ok().contentType(MediaType.TEXT_EVENT_STREAM)
.body(Flux.interval(Duration.ofSeconds(1))
.map(it -> new SimpleDateFormat("HH:mm:ss").format(new Date())),String.class);
}
}配置路由
@Configuration
public class RouterConfig {
@Autowired
private DemoHandler demoHandler;
@Bean
public RouterFunction<ServerResponse> demoRouter(){
//路由函數(shù)的編寫
return route(GET("/hello"),demoHandler::hello)
.andRoute(GET("/world"),demoHandler::world)
.andRoute(GET("/times"),demoHandler::times);
}
}連接關(guān)系型數(shù)據(jù)庫案例
@Component
public class DemoHandler {
@Autowired
private PersonService personService;
public Mono<ServerResponse> queryPerson(ServerRequest request){
Integer id = Integer.valueOf(request.pathVariable("id"));
return ok().contentType(MediaType.APPLICATION_JSON_UTF8)
.body(Mono.just(personService.getPersonById(id)), Person.class);
}
}配置路由
@Configuration
public class RouterConfig {
@Autowired
private DemoHandler demoHandler;
@Bean
public RouterFunction<ServerResponse> demoRouter(){
//路由函數(shù)的編寫
return route(GET("/hello"),demoHandler::hello)
.andRoute(GET("/world"),demoHandler::world)
.andRoute(GET("/times"),demoHandler::times)
.andRoute(GET("/queryPerson/{id}"),demoHandler::queryPerson);
}
}連接非關(guān)系型數(shù)據(jù)庫案例
引入mongodb的maven
<dependency> ? ? <groupId>org.springframework.boot</groupId> ? ? <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId> </dependency>
在application.properties中配置mongodb屬性
#mongodb spring.data.mongodb.uri=mongodb://root:yibo@localhost:27017 spring.data.mongodb.database=webflux
編寫代碼
@Document(collection = "user")
@Data
public class User {
@Id
private String id;
private String name;
private int age;
}
@Repository
public interface UserRepository extends ReactiveMongoRepository<User,String> {
}
@Component
public class DemoHandler {
@Autowired
private UserRepository userRepository;
public Mono<ServerResponse> listUser(ServerRequest request){
return ok().contentType(MediaType.APPLICATION_JSON_UTF8)
.body(userRepository.findAll(), User.class);
}
public Mono<ServerResponse> saveUser(ServerRequest request){
String name = request.pathVariable("name");
Integer age = Integer.valueOf(request.pathVariable("age"));
User user = new User();
user.setName(name);
user.setAge(age);
Mono<User> mono = Mono.just(user);
return ok().build(userRepository.insert(mono).then());
}
}編寫路由
@Configuration
public class RouterConfig {?
? ? @Autowired
? ? private DemoHandler demoHandler;
?
? ? @Bean
? ? public RouterFunction<ServerResponse> demoRouter(){
? ? ? ? //路由函數(shù)的編寫
? ? ? ? return route(GET("/hello"),demoHandler::hello)
? ? ? ? ? ? ? ? .andRoute(GET("/world"),demoHandler::world)
? ? ? ? ? ? ? ? .andRoute(GET("/times"),demoHandler::times)
? ? ? ? ? ? ? ? .andRoute(GET("/queryPerson/{id}"),demoHandler::queryPerson)
? ? ? ? ? ? ? ? .andRoute(GET("/listUser"),demoHandler::listUser)
? ? ? ? ? ? ? ? .andRoute(GET("/saveUser/{name}/{age}"),demoHandler::saveUser);
? ? }
}webflux解析
spring mvc處理流程


具體步驟:
- 第一步:發(fā)起請求到前端控制器(DispatcherServlet)
- 第二步:前端控制器請求HandlerMapping查找 Handler (可以根據(jù)xml配置、注解進(jìn)行查找)
- 匹配條件包括:請求路徑、請求方法、header信息等
- 第三步:處理器映射器HandlerMapping向前端控制器返回Handler,HandlerMapping會把請求映射為HandlerExecutionChain對象(包含一個(gè)Handler處理器(頁面控制器)對象,多個(gè)HandlerInterceptor攔截器對象),通過這種策略模式,很容易添加新的映射策略
- HandlerInterceptor是請求路徑上的攔截器,需要自己實(shí)現(xiàn)這個(gè)接口以攔截請求,做一些對handler的前置和后置處理工作。
- 第四步:前端控制器調(diào)用處理器適配器去執(zhí)行Handler
- 第五步:處理器適配器HandlerAdapter將會根據(jù)適配的結(jié)果去執(zhí)行Handler
- 第六步:Handler執(zhí)行完成給適配器返回ModelAndView
- 第七步:處理器適配器向前端控制器返回ModelAndView (ModelAndView是springmvc框架的一個(gè)底層對象,包括 Model和view)
- 第八步:前端控制器請求視圖解析器去進(jìn)行視圖解析 (根據(jù)邏輯視圖名解析成真正的視圖(jsp)),通過這種策略很容易更換其他視圖技術(shù),只需要更改視圖解析器即可
- 第九步:視圖解析器向前端控制器返回View
- 第十步:前端控制器進(jìn)行視圖渲染 (視圖渲染將模型數(shù)據(jù)(在ModelAndView對象中)填充到request域)
- 第十一步:前端控制器向用戶響應(yīng)結(jié)果
spring webflux處理請求流程

核心控制器DispatcherHandler,等同于阻塞方式的DispatcherServlet
DispatcherHandler實(shí)現(xiàn)ApplicationContextAware,那么必然會調(diào)用setApplicationContext方法
public class DispatcherHandler implements WebHandler, ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
initStrategies(applicationContext);
}
}initStrategies初始化
獲取HandlerMapping,HandlerAdapter,HandlerResultHandler的所有實(shí)例
protected void initStrategies(ApplicationContext context) {
//獲取HandlerMapping及其子類型的bean
//HandlerMapping根據(jù)請求request獲取handler執(zhí)行鏈
Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerMapping.class, true, false);
ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
//排序
AnnotationAwareOrderComparator.sort(mappings);
this.handlerMappings = Collections.unmodifiableList(mappings);
//獲取HandlerAdapter及其子類型的bean
Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerAdapter.class, true, false);
this.handlerAdapters = new ArrayList<>(adapterBeans.values());
//排序
AnnotationAwareOrderComparator.sort(this.handlerAdapters);
//獲取HandlerResultHandler及其子類型的bean
Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerResultHandler.class, true, false);
this.resultHandlers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(this.resultHandlers);
}webflux中引入了一個(gè)新的HandlerMapping,即RouterFunctionMapping
RouterFunctionMapping實(shí)現(xiàn)了InitializingBean,因此在其實(shí)例化的時(shí)候,會調(diào)用afterPropertiesSet方法
public class RouterFunctionMapping extends AbstractHandlerMapping implements InitializingBean {
@Nullable
private RouterFunction<?> routerFunction;
//讀取http傳輸數(shù)據(jù),并解碼成一個(gè)對象
private List<HttpMessageReader<?>> messageReaders = Collections.emptyList();
public RouterFunctionMapping(RouterFunction<?> routerFunction) {
this.routerFunction = routerFunction;
}
@Nullable
public RouterFunction<?> getRouterFunction() {
return this.routerFunction;
}
public void setMessageReaders(List<HttpMessageReader<?>> messageReaders) {
this.messageReaders = messageReaders;
}
@Override
public void afterPropertiesSet() throws Exception {
if (CollectionUtils.isEmpty(this.messageReaders)) {
ServerCodecConfigurer codecConfigurer = ServerCodecConfigurer.create();
this.messageReaders = codecConfigurer.getReaders();
}
if (this.routerFunction == null) {
//afterPropertiesSet方法調(diào)用的時(shí)候,routerFunction為null
initRouterFunctions();
}
}
protected void initRouterFunctions() {
//獲取routerFunctions集合
List<RouterFunction<?>> routerFunctions = routerFunctions();
//將一個(gè)請求中含有多個(gè)路由請求RouterFunction合并成一個(gè)RouterFunction
this.routerFunction = routerFunctions.stream().reduce(RouterFunction::andOther).orElse(null);
logRouterFunctions(routerFunctions);
}
private List<RouterFunction<?>> routerFunctions() {
//obtainApplicationContext()獲取ApplicationContext對象
List<RouterFunction<?>> functions = obtainApplicationContext()
//獲取指定bean的提供者,即上文配置的路由類
.getBeanProvider(RouterFunction.class)
//排序
.orderedStream()
//將流里面的都強(qiáng)轉(zhuǎn)成RouterFunction對象
.map(router -> (RouterFunction<?>)router)
.collect(Collectors.toList());
return (!CollectionUtils.isEmpty(functions) ? functions : Collections.emptyList());
}
private void logRouterFunctions(List<RouterFunction<?>> routerFunctions) {
//判斷當(dāng)前的日志級別是否是Debug
if (logger.isDebugEnabled()) {
int total = routerFunctions.size();
String message = total + " RouterFunction(s) in " + formatMappingName();
if (logger.isTraceEnabled()) {
if (total > 0) {
routerFunctions.forEach(routerFunction -> logger.trace("Mapped " + routerFunction));
}
else {
logger.trace(message);
}
}
else if (total > 0) {
logger.debug(message);
}
}
}
......
}- webflux中引入了一個(gè)新的HandlerAdapter,即HandlerFunctionAdapter
- webflux中引入了一個(gè)新的HandlerResultHandler,即ServerResponseResultHandler
ServerResponseResultHandler實(shí)現(xiàn)了InitializingBean,因此在其實(shí)例化的時(shí)候,會調(diào)用afterPropertiesSet方法
流式處理請求handler()
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
//handlerMappings在initStrategies()方法中已經(jīng)構(gòu)造好了
if (this.handlerMappings == null) {
return createNotFoundError();
}
//構(gòu)造Flux,數(shù)據(jù)源為handlerMappings集合
return Flux.fromIterable(this.handlerMappings)
//獲取Mono<Handler>對象,通過concatMap保證順序和handlerMappings順序一致
//嚴(yán)格保證順序是因?yàn)樵谝粋€(gè)系統(tǒng)中可能存在一個(gè)Url有多個(gè)能夠處理的HandlerMapping的情況
.concatMap(mapping -> mapping.getHandler(exchange))
.next()
//如果next()娶不到值則拋出錯(cuò)誤
.switchIfEmpty(createNotFoundError())
//觸發(fā)HandlerApter的handle方法
.flatMap(handler -> invokeHandler(exchange, handler))
//觸發(fā)HandlerResultHandler 的handleResult方法
.flatMap(result -> handleResult(exchange, result));
}觸發(fā)HandlerApter的handle方法
private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
return getResultHandler(result).handleResult(exchange, result)
.onErrorResume(ex -> result.applyExceptionHandler(ex).flatMap(exceptionResult ->
getResultHandler(exceptionResult).handleResult(exchange, exceptionResult)));
}
private HandlerResultHandler getResultHandler(HandlerResult handlerResult) {
if (this.resultHandlers != null) {
for (HandlerResultHandler resultHandler : this.resultHandlers) {
if (resultHandler.supports(handlerResult)) {
return resultHandler;
}
}
}
throw new IllegalStateException("No HandlerResultHandler for " + handlerResult.getReturnValue());
}總結(jié)
DispatcherHandler的流程是
1、通過 HandlerMapping(和DispathcherServlet中的HandlerMapping不同)獲取到HandlerAdapter放到ServerWebExchange的屬性中
2、獲取到HandlerAdapter后觸發(fā)handle方法,得到HandlerResult3、通過HandlerResult,觸發(fā)handleResult,針對不同的返回類找到不同的HandlerResultHandler如視圖渲染ViewResolutionResultHandler、ServerResponseResultHandler、ResponseBodyResultHandler、ResponseEntityResultHandler不同容器有不同的實(shí)現(xiàn),如Reactor,Jetty,Tomcat等。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- SpringBoot3中Spring?WebFlux?SSE服務(wù)器發(fā)送事件的實(shí)現(xiàn)步驟
- SpringBoot深入分析webmvc和webflux的區(qū)別
- springboot webflux 過濾器(使用RouterFunction實(shí)現(xiàn))
- Springboot WebFlux集成Spring Security實(shí)現(xiàn)JWT認(rèn)證的示例
- 詳解Spring Boot2 Webflux的全局異常處理
- 解決spring-boot2.0.6中webflux無法獲得請求IP的問題
- Spring?Boot?3.4.3?基于?Spring?WebFlux?實(shí)現(xiàn)?SSE?功能(代碼示例)
相關(guān)文章
Java接口統(tǒng)一樣式返回模板的實(shí)現(xiàn)
這篇文章主要介紹了Java接口統(tǒng)一樣式返回模板的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-12-12
淺析Spring容器原始Bean是如何創(chuàng)建的
這篇文章主要是想和小伙伴們一起聊聊?Spring?容器創(chuàng)建?Bean?最最核心的?createBeanInstance?方法,文中的示例代碼講解詳細(xì),需要的可以參考一下2023-08-08
Spring Boot RestTemplate提交表單數(shù)據(jù)的三種方法
本篇文章主要介紹了Spring Boot RestTemplate提交表單數(shù)據(jù)的三種方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-03-03
mybatis報(bào)Query?was?Empty異常的問題
這篇文章主要介紹了mybatis報(bào)Query?was?Empty異常的問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03
Spring Cloud Zuul路由網(wǎng)關(guān)服務(wù)過濾實(shí)現(xiàn)代碼
這篇文章主要介紹了Spring Cloud Zuul路由網(wǎng)關(guān)服務(wù)過濾實(shí)現(xiàn)代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04
詳解json string轉(zhuǎn)換為java bean及實(shí)例代碼
這篇文章主要介紹了詳解json string轉(zhuǎn)換為java bean及實(shí)例代碼的相關(guān)資料,這里提供實(shí)例代碼幫助大家理解,需要的朋友可以參考下2017-07-07
Java使用Hutool+自定義注解實(shí)現(xiàn)數(shù)據(jù)脫敏
我們在使用手機(jī)銀行的時(shí)候經(jīng)常能看到APP上會將銀行卡的卡號中間部分給隱藏掉使用 ***** 來代替,在某些網(wǎng)站上查看一些業(yè)務(wù)密碼時(shí)(例如簽到密碼等)也會使用 ***** 來隱藏掉真正的密碼,那么這種方式是如何實(shí)現(xiàn)的呢,本文將給大家介紹使用Hutool+自定義注解實(shí)現(xiàn)數(shù)據(jù)脫敏2023-09-09
Socket與ServerSocket類構(gòu)造方法與API
今天小編為大家整理了Socket與ServerSocket類構(gòu)造方法與API,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值。需要的朋友可以收藏下,方便下次瀏覽觀看2021-12-12

