spring?webflux響應式編程使用詳解
一 、響應式編程
響應式編程(reactive programming)是一種基于數(shù)據(jù)流(data stream)和變化傳遞(propagation of change)的聲明式(declarative)的編程范式
(1)數(shù)據(jù)流
將要處理的數(shù)據(jù)抽象成流
通用的流式處理:
生產(chǎn)者產(chǎn)生數(shù)據(jù),對數(shù)據(jù)進行中間處理,消費者拿到數(shù)據(jù)消費。
(2)變化傳遞
● 在命令式編程(我們的日常編程模式)下,式子a=b+c,這就意味著a的值是由b和c計算出來的。如果b或者c后續(xù)有變化,不會影響到a的值。
● 在響應式編程下,式子a:=b+c,這就意味著a的值是由b和c計算出來的。但如果b或者c的值后續(xù)有變化,會影響到a的值。
(3)聲明式
通過API來處理數(shù)據(jù)流中的數(shù)據(jù)。
適用場景:異步非阻塞。響應式系統(tǒng)具有某些特性,使其成為低延遲、高吞吐量工作負載的理想選擇。
比如:
一個日志監(jiān)控系統(tǒng),我們的前端頁面將不再需要通過“命令式”的輪詢的方式不斷向服務器請求數(shù)據(jù)然后進行更新,而是在建立好通道之后,數(shù)據(jù)流從系統(tǒng)源源不斷流向頁面,從而展現(xiàn)實時的指標變化曲線;
一個社交平臺,朋友的動態(tài)、點贊和留言不是手動刷出來的,而是當后臺數(shù)據(jù)變化的時候自動體現(xiàn)到界面上的。
二 、響應式流
響應式流(Reactive Streams)通過定義一組實體,接口和互操作方法,給出了實現(xiàn)異步非阻塞背壓的標準。第三方遵循這個標準來實現(xiàn)具體的解決方案,常見的有Reactor,RxJava,Akka Streams,Ratpack等。
常見的響應流庫簡介
(1)JDK9響應式流:
Publisher:發(fā)布者,數(shù)據(jù)來源,又稱生產(chǎn)者。
Subscriber:訂閱者,數(shù)據(jù)目的地,又稱消費者。
Processor:發(fā)布者和訂閱者之間處理數(shù)據(jù)。
(2)Reactor響應式流庫
發(fā)布者:Mono(返回0或1個元素)、Flux(返回0~n個元素)。
訂閱者:Spring框架來完成。
背壓:
Backpressure
在異步場景中,被觀察者發(fā)送事件的速度遠遠快于觀察者的處理速度情況下,一種告知上游的被觀察者降低發(fā)送速度的策略。
生產(chǎn)者和消費者模式下,如果生產(chǎn)者生產(chǎn)的消費多了,消費者消費不過來,就容易被壓垮了。
借助背壓,消費者可以告訴生產(chǎn)者自己需要多少量的數(shù)據(jù)。
響應式流=異步非阻塞+流量控制。
三、Spring WebFlux
Spring提供了兩條并行的技術棧,一條是基于Servlet API的Spring MVC和Spring Data,另一個條是完整的響應式棧,包括Spring WebFlux和Spring Data’s reactive repositories。

SpringBoot提供了自動配置的Spring Webflux,簡化了響應式web應用的開發(fā)。
1、整合Webflux
pom引入依賴:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
啟動類添加@EnableWebFlux。
兩種方式:
(1)基于注解
@RestController
@RequestMapping("/user")
public class MyController {
@GetMapping("/list")
public Flux<User> getList(){
return Flux.fromIterable(list.stream().filter(k->{
return k.getId()>3;
}).collect(Collectors.toList()));
}
@GetMapping("/{id}")
public Mono<User> getUser(@PathVariable Long userId) {
return this.userRepository.findById(userId);
}
}
(2)函數(shù)式
使用handler代替controller,做流的處理。
引入路由,類似SpringMVC中的@RequestMapping。
@Component
public class UserHandler {
@Resource
UserServiceImpl userService;
public Mono<ServerResponse> getList(ServerRequest serverRequest){
Flux<User> userFlux=userService.getList();
return ServerResponse.ok().body(userFlux, User.class);
}
}
@Configuration(proxyBeanMethods = false)
public class UserRouting {
@Bean
public RouterFunction<ServerResponse> monoRouterFunction(UserHandler userHandler){
return RouterFunctions
.route(GET("/user/list"),userHandler::getList)
.andRoute(GET("/hello").and(accept(TEXT_PLAIN)),userHandler::hello)
.andRoute(GET("/hello/stream").and(accept(APPLICATION_STREAM_JSON)),userHandler::helloStream);
}
}
啟動后,可以看到使用的是內(nèi)置的netty。
訪問http://localhost:20001/user/list 即可。
@GetMapping("list/getStringSleep")
public Mono<String> getStringSleep() {
log.info("invoke start");
Mono<String> s=Mono.fromSupplier(()->createStr());
log.info("invoke end");
return s;
}
private String createStr() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("sleep");
} catch (InterruptedException e) {
}
return "some string";
}
先執(zhí)行invoke start invoke end 再執(zhí)行異步流處理中的sleep日志。
2022-02-18 13:47:26.364 INFO 61992 --- [ctor-http-nio-2] com.example.controller.MyController : invoke start
2022-02-18 13:47:26.366 INFO 61992 --- [ctor-http-nio-2] com.example.controller.MyController : invoke end
2022-02-18 13:47:26.369 DEBUG 61992 --- [ctor-http-nio-2] o.s.w.r.r.m.a.ResponseBodyResultHandler : [fceca854-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:53707] Using 'text/html' given [text/html, application/xhtml+xml, image/avif, image/webp, image/apng, application/xml;q=0.9, application/signed-exchange;v=b3;q=0.9, */*;q=0.8] and supported [text/plain;charset=UTF-8, text/event-stream, text/plain;charset=UTF-8, */*]
2022-02-18 13:47:26.369 DEBUG 61992 --- [ctor-http-nio-2] o.s.w.r.r.m.a.ResponseBodyResultHandler : [fceca854-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:53707] 0..1 [java.lang.String]
sleep
2022-02-18 13:47:31.375 TRACE 61992 --- [ctor-http-nio-2] o.s.core.codec.CharSequenceEncoder : [fceca854-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:53707] Writing "some string"
2022-02-18 13:47:31.376 DEBUG 61992 --- [ctor-http-nio-2] r.n.http.server.HttpServerOperations : [fceca854-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:53707] Decreasing pending responses, now 0
2022-02-18 13:47:31.377 DEBUG 61992 --- [ctor-http-nio-2] r.n.http.server.HttpServerOperations : [fceca854-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:53707] Last HTTP packet was sent, terminating the channel
2022-02-18 13:47:31.377 TRACE 61992 --- [ctor-http-nio-2] o.s.w.s.adapter.HttpWebHandlerAdapter : [fceca854-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:53707] Completed 200 OK, headers={masked}
2022-02-18 13:47:31.377 TRACE 61992 --- [ctor-http-nio-2] o.s.h.s.r.ReactorHttpHandlerAdapter : [fceca854-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:53707] Handling completed
2022-02-18 13:47:31.377 DEBUG 61992 --- [ctor-http-nio-2] r.n.http.server.HttpServerOperations : [fceca854-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:53707] Last HTTP response frame
2022-02-18 13:47:31.378 TRACE 61992 --- [ctor-http-nio-2] reactor.netty.channel.ChannelOperations : [fceca854, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:53707] Disposing ChannelOperation from a channel
2、事件推送
Server-Sent Event(SSEs)
@GetMapping(value = "push",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> push() {
log.info("push start");
Flux<String> result= Flux.fromStream(IntStream.range(1,5).mapToObj(i->{
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
return "push data--" + i;
}));
log.info("push end");
return result;
}
在瀏覽器中執(zhí)行http://localhost:8080/demo/push,在服務器終端可以看到日志,已經(jīng)執(zhí)行完成。但此時頁面上的推送還在進行中。

從瀏覽器的監(jiān)控可以看到,瀏覽器接收的是一個事件流 EventStream。

3、實現(xiàn)背壓
WebFlux中可以用limitRate()設置背壓的限制參數(shù)。
測試代碼:
其中l(wèi)ist為內(nèi)存中構造的數(shù)據(jù)集合,此處忽略。
//不使用背壓
@GetMapping(value = "list")
public Flux<User> getList() {
log.info("invoke start");
Flux<User> user=Flux.fromStream(list.stream());
log.info("invoke end");
return user;
}
//使用背壓
@GetMapping(value = "list/setLimit",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> getListLimit() {
log.info("invoke start");
Flux<User> user=Flux.fromStream(list.stream()).limitRate(2);
log.info("invoke end");
return user;
}
對比使用背壓和不適用背壓:
● 接口的總耗時

● 詳細耗時
使用背壓:

不使用背壓:

瀏覽器接收的數(shù)據(jù)格式略有差異:
不使用背壓:
[{“id”:0,“name”:“user0”},{“id”:1,“name”:“user1”},{“id”:2,“name”:“user2”},{“id”:3,“name”:“user3”},{“id”:4,“name”:“user4”},{“id”:5,“name”:“user5”}]
使用背壓:
data:{“id”:0,“name”:“user0”}
data:{“id”:1,“name”:“user1”}
data:{“id”:2,“name”:“user2”}
data:{“id”:3,“name”:“user3”}
data:{“id”:4,“name”:“user4”}
data:{“id”:5,“name”:“user5”}
四、配置數(shù)據(jù)源(以mysql為例)
我們最常使用的 JDBC 其實是同步的,而我們使用 WebFlux 是為了通過異步的方式來提高服務端的響應效率,WebFlux 雖然實現(xiàn)了異步,但是由于 JDBC 還是同步的,而大部分應用都是離不開數(shù)據(jù)庫的,所以其實效率本質上還是沒有提升。
常見的異步JDBC有:ADAB、R2DBC。
下面以R2DBC為例,介紹webflux配置mysql。
1、CRUD
● pom引入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
</dependency>
● yaml配置數(shù)據(jù)源地址
當然,使用java config的方式配置也可以。
spring:
r2dbc:
url: r2dbcs:mysql://localhost:3306/database?sslMode=DISABLED
username:
password:
注意,這里url中的數(shù)據(jù)庫連接協(xié)議不在是jdbc,而是r2dbcs
● 操作數(shù)據(jù)
這里有兩種方式,都可以實現(xiàn)。
//方法一:通過ReactiveCrudRepository
//類似DAO
public interface UserRepository extends ReactiveCrudRepository<User,Long> {
}
//在Controller中注入,并使用。
@Resource
UserRepository userRepository;
@GetMapping("/list")
public Flux<User> getListFromDB(){
return userRepository.findAll();
}
//方法二:使用 DatabaseClient
@Resource
DatabaseClient databaseClient;
@GetMapping("/{id}")
public Mono<Map<String,Object>> getDetail(@PathVariable("id") Integer id){
return databaseClient.sql("select * from user where id="+id).fetch().one();
}
2、R2DBC
Reactive Relational Database Connectivity,是一個使用響應式驅動集成關系型數(shù)據(jù)庫的孵化器。Spring Data R2DBC運用熟悉的Spring抽象和repository支持R2DBC。
(1)如何體現(xiàn)異步?
測試代碼:
@GetMapping("/{id}")
public Mono<Map<String,Object>> getDetail(@PathVariable("id") Integer id){
log.info("invoke start");
Mono<Map<String,Object>> map= databaseClient.sql("select * from user where id="+id).fetch().one();
log.info("invoke end");
return map;
}
瀏覽器請求 http://localhost:20001/demo/1,從下面的日志可以看出,日志中invoke start、invoke end相繼執(zhí)行,后續(xù)才建立連接,進行數(shù)據(jù)庫查詢操作。
2022-02-11 10:39:36.108 INFO 15483 --- [ctor-http-nio-2] com.example.controller.MyController : invoke start
2022-02-11 10:39:36.108 INFO 15483 --- [ctor-http-nio-2] com.example.controller.MyController : invoke end
2022-02-11 10:39:36.109 DEBUG 15483 --- [ctor-http-nio-2] io.r2dbc.pool.ConnectionPool : Obtaining new connection from the driver
2022-02-11 10:39:36.110 DEBUG 15483 --- [ctor-http-nio-2] o.s.r2dbc.core.DefaultDatabaseClient : Executing SQL statement [select * from user where id=1]
2022-02-11 10:39:36.110 TRACE 15483 --- [ctor-http-nio-2] o.s.r2dbc.core.DefaultDatabaseClient : Expanded SQL [select * from user where id=1]
2022-02-11 10:39:36.111 DEBUG 15483 --- [ctor-http-nio-2] dev.miku.r2dbc.mysql.MySqlConnection : Create a simple statement provided by text query
2022-02-11 10:39:36.111 DEBUG 15483 --- [ctor-http-nio-2] d.m.r.mysql.client.ReactorNettyClient : Request: SimpleQueryMessage{sql=REDACTED}
2022-02-11 10:39:36.112 TRACE 15483 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient : [id: 0x42250dba, L:/127.0.0.1:49383 - R:localhost/127.0.0.1:3306] READ: 121B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 00 00 01 02 28 00 00 02 03 64 65 66 06 66 61 |.....(....def.fa|
|00000010| 6d 69 6c 79 04 75 73 65 72 04 75 73 65 72 02 69 |mily.user.user.i|
|00000020| 64 02 69 64 0c 3f 00 0b 00 00 00 03 00 00 00 00 |d.id.?..........|
|00000030| 00 2c 00 00 03 03 64 65 66 06 66 61 6d 69 6c 79 |.,....def.family|
|00000040| 04 75 73 65 72 04 75 73 65 72 04 6e 61 6d 65 04 |.user.user.name.|
|00000050| 6e 61 6d 65 0c 2d 00 90 01 00 00 fd 00 00 00 00 |name.-..........|
|00000060| 00 09 00 00 04 01 31 06 e5 bc a0 e4 b8 89 07 00 |......1.........|
|00000070| 00 05 fe 00 00 22 00 00 00 |....."... |
+--------+-------------------------------------------------+----------------+
2022-02-11 10:39:36.113 DEBUG 15483 --- [actor-tcp-nio-2] d.m.r.mysql.client.MessageDuplexCodec : Decode context change to DecodeContext-Result
2022-02-11 10:39:36.113 DEBUG 15483 --- [actor-tcp-nio-2] d.m.r.m.m.server.MetadataDecodeContext : Respond a metadata bundle by filled-up
2022-02-11 10:39:36.113 DEBUG 15483 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient : Response: SyntheticMetadataMessage{completed=false, messages=[DefinitionMetadataMessage{database='family', table='user' (origin:'user'), column='id' (origin:'id'), collationId=63, size=11, type=3, definitions=0, decimals=0}, DefinitionMetadataMessage{database='family', table='user' (origin:'user'), column='name' (origin:'name'), collationId=45, size=400, type=253, definitions=0, decimals=0}], eof=null}
2022-02-11 10:39:36.113 DEBUG 15483 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient : Response: RowMessage(encoded)
2022-02-11 10:39:36.113 DEBUG 15483 --- [actor-tcp-nio-2] d.m.r.mysql.client.MessageDuplexCodec : Decode context change to DecodeContext-Command
2022-02-11 10:39:36.114 DEBUG 15483 --- [actor-tcp-nio-2] io.r2dbc.pool.PooledConnection : Releasing connection
2022-02-11 10:39:36.114 TRACE 15483 --- [actor-tcp-nio-2] o.s.http.codec.json.Jackson2JsonEncoder : [8cf12dd9-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:49377] Encoding [{id=1, name=張三}]
2022-02-11 10:39:36.115 TRACE 15483 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient : [id: 0x42250dba, L:/127.0.0.1:49383 - R:localhost/127.0.0.1:3306] READ COMPLETE
2022-02-11 10:39:36.115 DEBUG 15483 --- [ctor-http-nio-2] r.n.http.server.HttpServerOperations : [8cf12dd9-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:49377] Decreasing pending responses, now 0
2022-02-11 10:39:36.116 TRACE 15483 --- [ctor-http-nio-2] o.s.w.s.adapter.HttpWebHandlerAdapter : [8cf12dd9-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:49377] Completed 200 OK, headers={masked}
2022-02-11 10:39:36.116 TRACE 15483 --- [ctor-http-nio-2] o.s.h.s.r.ReactorHttpHandlerAdapter : [8cf12dd9-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:49377] Handling completed
2022-02-11 10:39:36.116 DEBUG 15483 --- [ctor-http-nio-2] r.n.http.server.HttpServerOperations : [8cf12dd9-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:49377] Last HTTP response frame
2022-02-11 10:39:36.116 DEBUG 15483 --- [ctor-http-nio-2] r.n.http.server.HttpServerOperations : [8cf12dd9-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:49377] Last HTTP packet was sent, terminating the channel
2022-02-11 10:39:36.116 TRACE 15483 --- [ctor-http-nio-2] reactor.netty.channel.ChannelOperations : [8cf12dd9, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:49377] Disposing ChannelOperation from a channel
(2)如何使用事務?
非響應式事務:
代碼在同一線程上執(zhí)行,默認同步的方式實現(xiàn)。采用ThreadLocal的方式,保存連接和會話信息。
響應式事務:
代碼在不同線程上執(zhí)行,使用Context來代替ThreadLocal。
類似一個不可變的Map<Object, Object>,采用CopyOnWrite策略,綁定在每一個訂閱者上。
兩種使用方式:
聲明式事務
Spring 5.2后,已經(jīng)支持響應式事務,可以直接使用@Transactional注解。
@Resource
private DatabaseClient db;
@Override
@Transactional
public Mono<Void> insertRows() {
return db.sql("INSERT INTO user (id,name) VALUES(110, 'aaa')")
.fetch().rowsUpdated()
.then(db.sql("INSERT INTO user (id,name) VALUES(111, 'aaa')")
.then());
}
主要執(zhí)行日志:
2022-02-14 10:59:06.077 DEBUG 22687 --- [ctor-http-nio-2] o.s.r.c.R2dbcTransactionManager : Creating new transaction with name [com.example.service.UserServiceImpl.insertRows]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-02-14 10:59:06.077 DEBUG 22687 --- [ctor-http-nio-2] io.r2dbc.pool.ConnectionPool : Obtaining new connection from the driver
2022-02-14 10:59:06.078 DEBUG 22687 --- [ctor-http-nio-2] o.s.r.c.R2dbcTransactionManager : Acquired Connection [MonoRetry] for R2DBC transaction
2022-02-14 10:59:06.078 DEBUG 22687 --- [ctor-http-nio-2] o.s.r.c.R2dbcTransactionManager : Switching R2DBC Connection [PooledConnection[dev.miku.r2dbc.mysql.MySqlConnection@4f99ad97]] to manual commit
2022-02-14 10:59:06.078 DEBUG 22687 --- [ctor-http-nio-2] d.m.r.mysql.client.ReactorNettyClient : Request: SimpleQueryMessage{sql=REDACTED}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 53 45 54 20 61 75 74 6f 63 6f 6d 6d 69 74 3d |.SET autocommit=|
|00000010| 30 |0 |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 42 45 47 49 4e |.BEGIN |
+--------+-------------------------------------------------+----------------+
2022-02-14 10:59:06.080 TRACE 22687 --- [actor-tcp-nio-2] o.s.t.i.TransactionInterceptor : Getting transaction for [com.example.service.UserServiceImpl.insertRows]
2022-02-14 10:59:06.081 DEBUG 22687 --- [actor-tcp-nio-2] o.s.r2dbc.core.DefaultDatabaseClient : Executing SQL statement [INSERT INTO user (id,name) VALUES(110, 'aaa')]
2022-02-14 10:59:06.081 TRACE 22687 --- [actor-tcp-nio-2] o.s.r2dbc.core.DefaultDatabaseClient : Expanded SQL [INSERT INTO user (id,name) VALUES(110, 'aaa')]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 49 4e 53 45 52 54 20 49 4e 54 4f 20 75 73 65 |.INSERT INTO use|
|00000010| 72 20 28 69 64 2c 6e 61 6d 65 29 20 56 41 4c 55 |r (id,name) VALU|
|00000020| 45 53 28 31 31 30 2c 20 27 61 61 61 27 29 |ES(110, 'aaa') |
+--------+-------------------------------------------------+----------------+
2022-02-14 10:59:06.082 DEBUG 22687 --- [actor-tcp-nio-2] o.s.r2dbc.core.DefaultDatabaseClient : Executing SQL statement [INSERT INTO user (id,name) VALUES(111, 'aaa')]
2022-02-14 10:59:06.082 TRACE 22687 --- [actor-tcp-nio-2] o.s.r2dbc.core.DefaultDatabaseClient : Expanded SQL [INSERT INTO user (id,name) VALUES(111, 'aaa')]
2022-02-14 10:59:06.082 DEBUG 22687 --- [actor-tcp-nio-2] dev.miku.r2dbc.mysql.MySqlConnection : Create a simple statement provided by text query
2022-02-14 10:59:06.082 DEBUG 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient : Request: SimpleQueryMessage{sql=REDACTED}
2022-02-14 10:59:06.083 TRACE 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient : [id: 0xc62edac7, L:/127.0.0.1:61272 - R:localhost/127.0.0.1:3306] WRITE: 46B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 49 4e 53 45 52 54 20 49 4e 54 4f 20 75 73 65 |.INSERT INTO use|
|00000010| 72 20 28 69 64 2c 6e 61 6d 65 29 20 56 41 4c 55 |r (id,name) VALU|
|00000020| 45 53 28 31 31 31 2c 20 27 61 61 61 27 29 |ES(111, 'aaa') |
+--------+-------------------------------------------------+----------------+
2022-02-14 10:59:06.083 DEBUG 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.MessageDuplexCodec : Decode context change to DecodeContext-Command
2022-02-14 10:59:06.083 TRACE 22687 --- [actor-tcp-nio-2] o.s.t.i.TransactionInterceptor : Completing transaction for [com.example.service.UserServiceImpl.insertRows]
2022-02-14 10:59:06.083 DEBUG 22687 --- [actor-tcp-nio-2] o.s.r.c.R2dbcTransactionManager : Initiating transaction commit
2022-02-14 10:59:06.083 DEBUG 22687 --- [actor-tcp-nio-2] o.s.r.c.R2dbcTransactionManager : Committing R2DBC transaction on Connection [PooledConnection[dev.miku.r2dbc.mysql.MySqlConnection@4f99ad97]]
2022-02-14 10:59:06.083 DEBUG 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient : Request: SimpleQueryMessage{sql=REDACTED}
2022-02-14 10:59:06.084 TRACE 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient : [id: 0xc62edac7, L:/127.0.0.1:61272 - R:localhost/127.0.0.1:3306] WRITE: 4B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 07 00 00 00 |.... |
+--------+-------------------------------------------------+----------------+
2022-02-14 10:59:06.084 TRACE 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient : [id: 0xc62edac7, L:/127.0.0.1:61272 - R:localhost/127.0.0.1:3306] WRITE: 7B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 43 4f 4d 4d 49 54 |.COMMIT |
+--------+-------------------------------------------------+----------------+
2022-02-14 10:59:06.098 TRACE 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient : [id: 0xc62edac7, L:/127.0.0.1:61272 - R:localhost/127.0.0.1:3306] WRITE: 17B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 53 45 54 20 61 75 74 6f 63 6f 6d 6d 69 74 3d |.SET autocommit=|
|00000010| 31 |1 |
2022-02-14 10:59:06.099 DEBUG 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.MessageDuplexCodec : Decode context change to DecodeContext-Command
2022-02-14 10:59:06.099 DEBUG 22687 --- [actor-tcp-nio-2] o.s.r.c.R2dbcTransactionManager : Releasing R2DBC Connection [PooledConnection[dev.miku.r2dbc.mysql.MySqlConnection@4f99ad97]] after transaction
2022-02-14 10:59:06.099 DEBUG 22687 --- [actor-tcp-nio-2] io.r2dbc.pool.PooledConnection : Releasing connection
若執(zhí)行異常,需要回滾時,日志如下:
2022-02-14 11:10:31.410 DEBUG 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient : Response: ErrorMessage{errorCode=1062, sqlState='23000', errorMessage='Duplicate entry '110' for key 'PRIMARY''}
2022-02-14 11:10:31.419 TRACE 22687 --- [actor-tcp-nio-2] o.s.t.i.TransactionInterceptor : Completing transaction for [com.example.service.UserServiceImpl.insertRows] after exception: org.springframework.dao.DataIntegrityViolationException: execute; SQL [INSERT INTO user (id,name) VALUES(110, 'aaa')]; Duplicate entry '110' for key 'PRIMARY'; nested exception is io.r2dbc.spi.R2dbcDataIntegrityViolationException: [1062] [23000] Duplicate entry '110' for key 'PRIMARY'
2022-02-14 11:10:31.421 DEBUG 22687 --- [actor-tcp-nio-2] o.s.r.c.R2dbcTransactionManager : Initiating transaction rollback
2022-02-14 11:10:31.421 DEBUG 22687 --- [actor-tcp-nio-2] o.s.r.c.R2dbcTransactionManager : Rolling back R2DBC transaction on Connection [PooledConnection[dev.miku.r2dbc.mysql.MySqlConnection@47e30501]]
2022-02-14 11:10:31.421 DEBUG 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient : Request: SimpleQueryMessage{sql=REDACTED}
2022-02-14 11:10:31.422 TRACE 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient : [id: 0x55800452, L:/127.0.0.1:61274 - R:localhost/127.0.0.1:3306] WRITE: 4B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 09 00 00 00 |.... |
+--------+-------------------------------------------------+----------------+
2022-02-14 11:10:31.422 TRACE 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient : [id: 0x55800452, L:/127.0.0.1:61274 - R:localhost/127.0.0.1:3306] WRITE: 9B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 52 4f 4c 4c 42 41 43 4b |.ROLLBACK |
+--------+-------------------------------------------------+----------------+
具體源碼查看,在TransactionAspectSupport類中。
其中,根據(jù)ReactiveTransactionManager接口類型,決定走響應式事務或非響應式事務。下面在編程式事務中提到的R2dbcTransactionManager就實現(xiàn)了ReactiveTransactionManager接口。
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final TransactionAspectSupport.InvocationCallback invocation) throws Throwable {
TransactionAttributeSource tas = this.getTransactionAttributeSource();
TransactionAttribute txAttr = tas != null ? tas.getTransactionAttribute(method, targetClass) : null;
TransactionManager tm = this.determineTransactionManager(txAttr);
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
boolean isSuspendingFunction = KotlinDetector.isSuspendingFunction(method);
boolean hasSuspendingFlowReturnType = isSuspendingFunction && "kotlinx.coroutines.flow.Flow".equals((new MethodParameter(method, -1)).getParameterType().getName());
if (isSuspendingFunction && !(invocation instanceof TransactionAspectSupport.CoroutinesInvocationCallback)) {
throw new IllegalStateException("Coroutines invocation not supported: " + method);
} else {
TransactionAspectSupport.CoroutinesInvocationCallback corInv = isSuspendingFunction ? (TransactionAspectSupport.CoroutinesInvocationCallback)invocation : null;
TransactionAspectSupport.ReactiveTransactionSupport txSupport = (TransactionAspectSupport.ReactiveTransactionSupport)this.transactionSupportCache.computeIfAbsent(method, (key) -> {
Class<?> reactiveType = isSuspendingFunction ? (hasSuspendingFlowReturnType ? Flux.class : Mono.class) : method.getReturnType();
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(reactiveType);
if (adapter == null) {
throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " + method.getReturnType());
} else {
return new TransactionAspectSupport.ReactiveTransactionSupport(adapter);
}
});
TransactionAspectSupport.InvocationCallback callback = invocation;
if (corInv != null) {
callback = () -> {
return CoroutinesUtils.invokeSuspendingFunction(method, corInv.getTarget(), corInv.getArguments());
};
}
Object result = txSupport.invokeWithinTransaction(method, targetClass, callback, txAttr, (ReactiveTransactionManager)tm);
if (corInv != null) {
Publisher<?> pr = (Publisher)result;
return hasSuspendingFlowReturnType ? TransactionAspectSupport.KotlinDelegate.asFlow(pr) : TransactionAspectSupport.KotlinDelegate.awaitSingleOrNull(pr, corInv.getContinuation());
} else {
return result;
}
}
} else {
//略
}
}
編程式事務
與聲明式事務相比,編程式事務使用的較少,因為配置相對繁瑣。但可以幫助理解執(zhí)行邏輯。
@Resource
ConnectionFactory connectionFactory;
@Override
public Mono<Void> insertRows() {
ReactiveTransactionManager tm=new R2dbcTransactionManager(connectionFactory);
DatabaseClient db=DatabaseClient.create(connectionFactory);
TransactionalOperator transactionalOperator=TransactionalOperator.create(tm);
return db.sql("INSERT INTO user (id,name) VALUES(119, 'bbb')")
.fetch().rowsUpdated()
.then(db.sql("INSERT INTO user (id,name) VALUES(111, 'aaa')")
.then())
.as(transactionalOperator::transactional);
}
R2dbcTransactionManager:響應式事務管理器。
DatabaseClient:客戶端使用R2DBC驅動程序提供對SQL數(shù)據(jù)庫的訪問。
TransactionalOperator:事務運算符將所有上游R2DBC發(fā)布者與事務上下文相關聯(lián)。
(3)線程池配置
為了提升數(shù)據(jù)庫的執(zhí)行效率,減少建立連接的開銷,一般數(shù)據(jù)庫連接都會有連接池的概念,同樣的r2dbc也有一個叫做r2dbc-pool的連接池。
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-pool</artifactId>
<version>${version}</version>
</dependency>
五、遠程調用
Spring webflux提供WebClient進行遠程調用,這里使用webclient做示例,okhttp可以后面作補充。
簡單服務
WebClient webClient= WebClient.builder().build();
@GetMapping("/remote/webClient")
public Flux<User> queryByWebclient(){
log.info("invoke start");
Flux<User> list=webClient.get().uri(url).retrieve().bodyToFlux(User.class);
log.info("invoke end");
return list;
}
主要執(zhí)行流程:
main thread 打印 invoke start invoke end
創(chuàng)建線程池,進行遠程調用,初始化pipeline。
連接調用url,建立channel。
接收響應,釋放channel。
2022-02-15 13:58:32.231 DEBUG 27846 --- [ctor-http-nio-5] s.w.r.r.m.a.RequestMappingHandlerMapping : [7a8c56a7-1, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:61046] Mapped to com.example.controller.MyController#queryByWebclient()
2022-02-15 13:58:32.231 INFO 27846 --- [ctor-http-nio-5] com.example.controller.MyController : invoke start
2022-02-15 13:58:32.232 INFO 27846 --- [ctor-http-nio-5] com.example.controller.MyController : invoke end
2022-02-15 13:58:32.234 DEBUG 27846 --- [ctor-http-nio-5] o.s.w.r.r.m.a.ResponseBodyResultHandler : [7a8c56a7-1, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:61046] Using 'application/json' given [text/html, application/xhtml+xml, image/avif, image/webp, image/apng, application/xml;q=0.9, application/signed-exchange;v=b3;q=0.9, */*;q=0.8] and supported [application/json, application/*+json, application/x-ndjson, text/event-stream]
2022-02-15 13:58:32.234 DEBUG 27846 --- [ctor-http-nio-5] o.s.w.r.r.m.a.ResponseBodyResultHandler : [7a8c56a7-1, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:61046] 0..N [com.example.model.User]
2022-02-15 13:58:32.236 TRACE 27846 --- [ctor-http-nio-5] o.s.w.r.f.client.ExchangeFunctions : [6c2270b1] HTTP GET http://localhost:8082/web/test?str=webclient, headers={}
2022-02-15 13:58:32.247 DEBUG 27846 --- [ctor-http-nio-5] r.n.resources.PooledConnectionProvider : [ae492444] Created a new pooled channel, now: 0 active connections, 0 inactive connections and 0 pending acquire requests.
2022-02-15 13:58:32.247 DEBUG 27846 --- [ctor-http-nio-5] reactor.netty.transport.TransportConfig : [ae492444] Initialized pipeline DefaultChannelPipeline{(reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.left.httpDecompressor = io.netty.handler.codec.http.HttpContentDecompressor), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2022-02-15 13:58:32.252 DEBUG 27846 --- [ctor-http-nio-5] r.netty.transport.TransportConnector : [ae492444] Connecting to [localhost/127.0.0.1:8082].
2022-02-15 13:58:32.252 DEBUG 27846 --- [ctor-http-nio-5] r.n.r.DefaultPooledConnectionProvider : [ae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] Registering 。 release on close event for channel
2022-02-15 13:58:32.252 DEBUG 27846 --- [ctor-http-nio-5] r.n.resources.PooledConnectionProvider : [ae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] Channel connected, now: 1 active connections, 0 inactive connections and 0 pending acquire requests.
2022-02-15 13:58:32.252 DEBUG 27846 --- [ctor-http-nio-5] r.n.r.DefaultPooledConnectionProvider : [ae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] onStateChange(PooledConnection{channel=[id: 0xae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082]}, [connected])
2022-02-15 13:58:32.252 DEBUG 27846 --- [ctor-http-nio-5] r.n.r.DefaultPooledConnectionProvider : [ae492444-1, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] onStateChange(GET{uri=/, connection=PooledConnection{channel=[id: 0xae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082]}}, [configured])
2022-02-15 13:58:32.253 DEBUG 27846 --- [ctor-http-nio-5] r.netty.http.client.HttpClientConnect : [ae492444-1, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] Handler is being applied: {uri=http://localhost:8082/web/test?str=webclient, method=GET}
2022-02-15 13:58:32.253 DEBUG 27846 --- [ctor-http-nio-5] r.n.r.DefaultPooledConnectionProvider : [ae492444-1, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] onStateChange(GET{uri=/web/test?str=webclient, connection=PooledConnection{channel=[id: 0xae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082]}}, [request_prepared])
2022-02-15 13:58:32.254 DEBUG 27846 --- [ctor-http-nio-5] r.n.r.DefaultPooledConnectionProvider : [ae492444-1, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] onStateChange(GET{uri=/web/test?str=webclient, connection=PooledConnection{channel=[id: 0xae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082]}}, [request_sent])
2022-02-15 13:58:33.384 DEBUG 27846 --- [ctor-http-nio-5] r.n.http.client.HttpClientOperations : [ae492444-1, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] Received response (auto-read:false) : [Date=Tue, 15 Feb 2022 05:58:33 GMT, content-length=0]
2022-02-15 13:58:33.384 DEBUG 27846 --- [ctor-http-nio-5] r.n.r.DefaultPooledConnectionProvider : [ae492444-1, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] onStateChange(GET{uri=/web/test?str=webclient, connection=PooledConnection{channel=[id: 0xae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082]}}, [response_received])
2022-02-15 13:58:33.384 TRACE 27846 --- [ctor-http-nio-5] o.s.w.r.f.client.ExchangeFunctions : [6c2270b1] [ae492444-1, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] Response 200 OK, headers={masked}
2022-02-15 13:58:33.385 DEBUG 27846 --- [ctor-http-nio-5] reactor.netty.channel.FluxReceive : [ae492444-1, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] FluxReceive{pending=0, cancelled=false, inboundDone=false, inboundError=null}: subscribing inbound receiver
2022-02-15 13:58:33.386 DEBUG 27846 --- [ctor-http-nio-5] r.n.http.client.HttpClientOperations : [ae492444-1, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] Received last HTTP packet
2022-02-15 13:58:33.387 TRACE 27846 --- [ctor-http-nio-5] reactor.netty.channel.ChannelOperations : [ae492444, L:/127.0.0.1:61048 -
2022-02-15 13:58:33.432 TRACE 27846 --- [ctor-http-nio-5] o.s.http.codec.json.Jackson2JsonEncoder : [7a8c56a7-1, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:61046] Encoding [[]]
2022-02-15 13:58:33.433 DEBUG 27846 --- [ctor-http-nio-5] r.n.r.DefaultPooledConnectionProvider : [ae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] onStateChange(GET{uri=/web/test?str=webclient, connection=PooledConnection{channel=[id: 0xae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082]}}, [response_completed])
2022-02-15 13:58:33.434 DEBUG 27846 --- [ctor-http-nio-5] r.n.r.DefaultPooledConnectionProvider : [ae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] onStateChange(GET{uri=/web/test?str=webclient, connection=PooledConnection{channel=[id: 0xae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082]}}, [disconnecting])
2022-02-15 13:58:33.434 DEBUG 27846 --- [ctor-http-nio-5] r.n.r.DefaultPooledConnectionProvider : [ae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] Releasing channel
2022-02-15 13:58:33.434 DEBUG 27846 --- [ctor-http-nio-5] r.n.resources.PooledConnectionProvider : [ae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] Channel cleaned, now: 0 active connections, 1 inactive connections and 0 pending acquire requests.
2022-02-15 13:58:33.434 TRACE 27846 --- [ctor-http-nio-5] o.s.w.s.adapter.HttpWebHandlerAdapter : [7a8c56a7-1, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:61046] Completed 200 OK, headers={masked}
2022-02-15 13:58:33.434 TRACE 27846 --- [ctor-http-nio-5] o.s.h.s.r.ReactorHttpHandlerAdapter : [7a8c56a7-1, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:61046] Handling completed
2022-02-15 13:58:33.434 DEBUG 27846 --- [ctor-http-nio-5] r.n.http.server.HttpServerOperations : [7a8c56a7-1, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:61046] Last HTTP response frame
2022-02-15 13:58:33.434 DEBUG 27846 --- [ctor-http-nio-5] r.n.http.server.HttpServerOperations : [7a8c56a7-1, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:61046] Decreasing pending responses, now 0
2022-02-15 13:58:33.434 DEBUG 27846 --- [ctor-http-nio-5] r.n.http.server.HttpServerOperations : [7a8c56a7-1, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:61046] Last HTTP packet was sent, terminating the channel
2022-02-15 13:58:33.435 TRACE 27846 --- [ctor-http-nio-5] reactor.netty.channel.ChannelOperations : [7a8c56a7, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:61046] Disposing ChannelOperation from a channel
首次進行遠程調用,會創(chuàng)建一個http的客戶端線程池,再次發(fā)起遠程調用時,會直接使用現(xiàn)有線程池。
leasingStrategy:FIFO 先進先出。
evictionInterval:實例銷毀的間隔 0s。
maxConnections:最大連接數(shù),500。
maxIdleTime: 最大空閑時間
maxLifeTime: 最大存活時間
metricsEnabled: 是否啟用指標
pendingAcquireMaxCount:
pendingAcquireTimeout:
Creating a new [http] client pool [PoolFactory{evictionInterval=PT0S, leasingStrategy=fifo, maxConnections=500, maxIdleTime=-1, maxLifeTime=-1, metricsEnabled=false, pendingAcquireMaxCount=1000, pendingAcquireTimeout=45000}] for [localhost:8082]
Created a new pooled channel, now: 0 active connections, 0 inactive connections and 0 pending acquire requests.
依賴服務
如果兩個遠程調用方法之間有彼此依賴關系,應該如何使用?比如rpcB需要使用rpcA返回的結果值做處理。
示例:
@GetMapping("/remote/webClient")
public Flux<Integer> queryByWebclient(){
log.info("invoke start");
long start =System.nanoTime();
Flux<Integer> userFlux = userService.getListDependency(0 ).flatMap(k -> userService.getListDependency(k)
);
userFlux.subscribe(k-> System.out.println(" controller time"+(System.nanoTime()-start)));
log.info("invoke end");
return userFlux;
}
//遠程調用的service
public Flux<Integer> getListDependency(Integer str) {
Flux<Integer> list=webClient.get().uri(url+str).retrieve().bodyToFlux(Integer.class);
list.subscribe(k-> System.out.print("service time"+new Date()));
return list;
}
六、與Spring MVC的比較
示例
SpringMVC方式
@GetMapping("/remote/webClient")
public Flux<Integer> queryByWebclient(){
log.info("invoke start");
long start =System.nanoTime();
Flux<Integer> userFlux = userService.getListDependency(0 ).flatMap(k -> userService.getListDependency(k)
);
userFlux.subscribe(k-> System.out.println(" controller time"+(System.nanoTime()-start)));
log.info("invoke end");
return userFlux;
}
//遠程調用的service
public Flux<Integer> getListDependency(Integer str) {
Flux<Integer> list=webClient.get().uri(url+str).retrieve().bodyToFlux(Integer.class);
list.subscribe(k-> System.out.print("service time"+new Date()));
return list;
}
日志執(zhí)行結果:

WebFlux的響應式處理:
@GetMapping("/list/reactor")
public Flux<User> getListReactor(){
log.info("enter controller");
Flux<User> listFromDB = userService.getListByReactor();
log.info("end controller");
return listFromDB;
}
@Override
public Flux<User> getListByReactor() {
log.info("enter service");
List<User> userList = new ArrayList<>();
Set<Map.Entry<String,User>> entries =userMap.entrySet();
entries.stream().forEach(entry->userList.add(entry.getValue()));
Flux<User> userFlux = Flux.fromStream(() -> createStr(userList));
log.info("end service");
return userFlux;
}
private Stream<User> createStr(List<User> userList) {
try {
System.out.println("start sleep");
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
return userList.stream();
}
執(zhí)行結果:

小結:
1、WebFlux是異步處理的,SpringMVC是同步的,但也可以實現(xiàn)異步非阻塞。
2、Servlet3.1已經(jīng)支持異步非阻塞了,但無論是回調或是使用CompletableFuture,代碼實現(xiàn)起來會比較復雜。WebFlux提供的API會更加方便。Spring WebFlux在應對高并發(fā)的請求時,借助于異步IO,能夠以少量而穩(wěn)定的線程處理更高吞吐量的請求。
七、擴展
應用:
Gateway是基于Webflux實現(xiàn)的,它通過擴展HandlerMapping與WebHandler來處理用戶的請求,先通過Predicate定位到Router然后在經(jīng)過FilterChain的過濾處理,最后定位到下層服務。
以上就是spring webflux響應式編程使用詳解的詳細內(nèi)容,更多關于spring webflux使用的資料請關注腳本之家其它相關文章!
相關文章
ByteArrayOutputStream簡介和使用_動力節(jié)點Java學院整理
ByteArrayOutputStream 是字節(jié)數(shù)組輸出流。它繼承于OutputStream。這篇文章主要介紹了ByteArrayOutputStream簡介和使用,需要的朋友可以參考下2017-05-05
Jenkins系統(tǒng)如何進行數(shù)據(jù)備份
隨著我們的長期使用,Jenkins系統(tǒng)中的內(nèi)容會越來越多,特別是一些配置相關的東西,不能有任何丟失。這個時候我們就需要定期備份我們的Jenkins系統(tǒng),避免一些誤操作不小心刪除了某些重要文件,本文就將介紹下Jenkins系統(tǒng)如何進行數(shù)據(jù)備份2021-06-06
java?MongoDB實現(xiàn)列表分頁查詢的示例代碼
本文主要介紹了java?MongoDB實現(xiàn)列表分頁查詢的示例代碼,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-07-07
springboot2.0?@Slf4j?log?彩色日志配置輸出到文件
這篇文章主要介紹了springboot2.0 @Slf4j log日志配置輸出到文件(彩色日志),解決方式是使用了springboot原生自帶的一個log框架,結合實例代碼給大家講解的非常詳細,需要的朋友可以參考下2023-08-08

