Core Features of Spring WebFlux


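Spring WebFlux is the reactive web framework introduced in Spring Framework 5. It is built on Project Reactor and supports a non-blocking, functional programming model. This section walks through WebFlux's core capabilities: building REST APIs, reactive database access, and real-time communication.

4.3.1 Building a Reactive REST API

Basic project setup
First create a Spring Boot WebFlux project (for example with Spring Initializr) and add the WebFlux starter:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

Reactive controllers
Annotated controllers (similar to traditional Spring MVC, but with support for reactive types):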
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/products")
public class ProductController {

    private final ProductService productService;

    // Constructor injection
    public ProductController(ProductService productService) {
        this.productService = productService;
    }

    @GetMapping
    public Flux<Product> getAllProducts() {
        return productService.findAll();
    }

    @GetMapping("/{id}")
    public Mono<Product> getProductById(@PathVariable String id) {
        return productService.findById(id);
    }

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<Product> createProduct(@RequestBody Mono<Product> productMono) {
        // Unwrap the body so that save(Product) is used the same way as in the functional handler
        return productMono.flatMap(productService::save);
    }

    @PutMapping("/{id}")
    public Mono<Product> updateProduct(
            @PathVariable String id,
            @RequestBody Mono<Product> productMono) {
        return productService.update(id, productMono);
    }

    @DeleteMapping("/{id}")
    @ResponseStatus(HttpStatus.NO_CONTENT)
    public Mono<Void> deleteProduct(@PathVariable String id) {
        return productService.delete(id);
    }
}

Functional endpoints (RouterFunction style):
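import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

@Configuration
public class ProductRouter {

    // Maps each route to a method on the handler shown next
    @Bean
    public RouterFunction<ServerResponse> route(ProductHandler handler) {
        return RouterFunctions.route()
                .GET("/fn/products", handler::getAll)
                .GET("/fn/products/{id}", handler::getById)
                .POST("/fn/products", handler::create)
                .PUT("/fn/products/{id}", handler::update)
                .DELETE("/fn/products/{id}", handler::delete)
                .build();
    }
}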

import java.net.URI;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

@Component
public class ProductHandler {

    private final ProductService productService;

    public ProductHandler(ProductService productService) {
        this.productService = productService;
    }

    public Mono<ServerResponse> getAll(ServerRequest request) {
        // NDJSON streams one JSON object per line instead of a single JSON array
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_NDJSON)
                .body(productService.findAll(), Product.class);
    }

    public Mono<ServerResponse> getById(ServerRequest request) {
        String id = request.pathVariable("id");
        return productService.findById(id)
                .flatMap(product -> ServerResponse.ok().bodyValue(product))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> create(ServerRequest request) {
        return request.bodyToMono(Product.class)
                .flatMap(productService::save)
                .flatMap(product -> ServerResponse
                        .created(URI.create("/fn/products/" + product.getId()))
                        .bodyValue(product));
    }

    public Mono<ServerResponse> update(ServerRequest request) {
        String id = request.pathVariable("id");
        return request.bodyToMono(Product.class)
                .flatMap(product -> productService.update(id, Mono.just(product)))
                .flatMap(product -> ServerResponse.ok().bodyValue(product))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> delete(ServerRequest request) {
        String id = request.pathVariable("id");
        return productService.delete(id)
                .then(ServerResponse.noContent().build());
    }
}

Advanced features

Streaming responses (Server-Sent Events):
// Controller method: emits one event per second as text/event-stream
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ProductEvent> streamProducts() {
    return Flux.interval(Duration.ofSeconds(1))
            .map(sequence -> new ProductEvent(
                    "product-" + sequence,
                    "Event at " + Instant.now()
            ));
}

Request validation and exception handling:
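import java.util.Map;
import org.springframework.boot.autoconfigure.web.WebProperties;
import org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler;
import org.springframework.boot.web.error.ErrorAttributeOptions;
import org.springframework.boot.web.reactive.error.ErrorAttributes;
import org.springframework.context.ApplicationContext;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.*;

// Registered as a plain bean (not @ControllerAdvice); @Order(-2) places it
// ahead of Spring Boot's default error handler, which uses @Order(-1).
@Component
@Order(-2)
public class GlobalErrorHandler extends AbstractErrorWebExceptionHandler {

    public GlobalErrorHandler(ErrorAttributes errorAttributes,
                              WebProperties webProperties,
                              ApplicationContext applicationContext,
                              ServerCodecConfigurer serverCodecConfigurer) {
        // WebProperties is the injectable bean; its Resources are passed to the superclass
        super(errorAttributes, webProperties.getResources(), applicationContext);
        this.setMessageWriters(serverCodecConfigurer.getWriters());
    }

    @Override
    protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
        return RouterFunctions.route(
                RequestPredicates.all(),
                request -> {
                    Map<String, Object> errorProperties =
                            getErrorAttributes(request, ErrorAttributeOptions.defaults());
                    HttpStatus status = getHttpStatus(errorProperties);
                    return ServerResponse.status(status)
                            .contentType(MediaType.APPLICATION_JSON)
                            .bodyValue(errorProperties);
                }
        );
    }

    private HttpStatus getHttpStatus(Map<String, Object> errorProperties) {
        return HttpStatus.valueOf((Integer) errorProperties.get("status"));
    }
}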
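
The getHttpStatus helper above casts the "status" attribute directly, which fails if the entry is missing or not a valid code. A slightly more defensive variant could look like the following sketch. It uses only JDK types, and ErrorStatusResolver together with its 500 fallback is a hypothetical helper introduced here for illustration, not something Spring provides.

```java
import java.util.Map;

// Hypothetical helper, not part of Spring: resolves the numeric "status" entry
// of an error-attributes map, falling back to 500 when it is absent or invalid.
final class ErrorStatusResolver {

    static final int FALLBACK_STATUS = 500;

    private ErrorStatusResolver() {
    }

    static int resolve(Map<String, Object> errorAttributes) {
        Object raw = errorAttributes.get("status");
        if (raw instanceof Number) {
            int code = ((Number) raw).intValue();
            // HTTP status codes are three-digit values from 100 to 599
            if (code >= 100 && code <= 599) {
                return code;
            }
        }
        return FALLBACK_STATUS;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of("status", 404)));    // prints 404
        System.out.println(resolve(Map.of("status", "oops"))); // prints 500
        System.out.println(resolve(Map.of()));                  // prints 500
    }
}
```

In the handler, the resolved value could be passed to ServerResponse.status(int), so an unexpected attribute map still produces a well-formed error response.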

// Custom validation
public class ProductValidator {

    public static Mono<Product> validate(Product product) {
        return Mono.just(product)
                .flatMap(p -> {
                    List<String> errors = new ArrayList<>();
                    if (p.getName() == null || p.getName().isEmpty()) {
                        errors.add("Product name is required");
                    }
                    // price is a BigDecimal, so compare with signum() rather than <=
                    if (p.getPrice() == null || p.getPrice().signum() <= 0) {
                        errors.add("Price must be positive");
                    }
                    if (!errors.isEmpty()) {
                        // ValidationException is an application-defined exception type
                        return Mono.error(new ValidationException(errors));
                    }
                    return Mono.just(p);
                });
    }
}

4.3.2 Reactive Database Access (R2DBC)

R2DBC configuration
Add the dependencies:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<!-- Driver releases from 1.0 onward are published under groupId org.postgresql -->
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-postgresql</artifactId>
    <scope>runtime</scope>
</dependency>

Configure application.yml:
spring:
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/mydb
    username: user
    password: pass
    pool:
      enabled: true
      max-size: 20

Reactive repositories
Define the entity:
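@Data
@Table("products")
public class Product {
    @Id
    private Long id;
    private String name;
    private String description;
    private BigDecimal price;
    // Read and written by the stock transfer example in the transaction section
    private Integer stock;
    private Instant createdAt;
}

Create the repository interface: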
public interface ProductRepository extends ReactiveCrudRepository<Product, Long> {

    // Derived query: the SQL is generated from the method name
    Flux<Product> findByNameContaining(String name);

    @Query("SELECT * FROM products WHERE price > :minPrice")
    Flux<Product> findByPriceGreaterThan(BigDecimal minPrice);

    // @Modifying makes the query return the number of affected rows
    @Modifying
    @Query("UPDATE products SET price = price * :factor")
    Mono<Integer> updateAllPrices(BigDecimal factor);
}

Complex queries and transactions

Custom query implementation:
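public class ProductRepositoryImpl implements CustomProductRepository {

    // Bind parameters carry values, not identifiers, so ORDER BY columns are whitelisted
    private static final Set<String> SORTABLE_COLUMNS = Set.of("name", "price", "created_at");

    private final DatabaseClient databaseClient;

    public ProductRepositoryImpl(DatabaseClient databaseClient) {
        this.databaseClient = databaseClient;
    }

    @Override
    public Flux<Product> complexSearch(ProductCriteria criteria) {
        String sortField = SORTABLE_COLUMNS.contains(criteria.getSortField())
                ? criteria.getSortField() : "name";
        String sortDirection = "DESC".equalsIgnoreCase(criteria.getSortDirection()) ? "DESC" : "ASC";
        return databaseClient.sql("""
                SELECT * FROM products
                WHERE name LIKE :name
                AND price BETWEEN :minPrice AND :maxPrice
                ORDER BY %s %s
                LIMIT :limit OFFSET :offset
                """.formatted(sortField, sortDirection))
                .bind("name", "%" + criteria.getName() + "%")
                .bind("minPrice", criteria.getMinPrice())
                .bind("maxPrice", criteria.getMaxPrice())
                .bind("limit", criteria.getPageSize())
                .bind("offset", (criteria.getPageNumber() - 1) * criteria.getPageSize())
                .map((row, metadata) -> toProduct(row))
                .all();
    }

    private Product toProduct(Row row) {
        // Row-to-object mapping; the column names are assumed to mirror the entity fields
        Product product = new Product();
        product.setId(row.get("id", Long.class));
        product.setName(row.get("name", String.class));
        product.setDescription(row.get("description", String.class));
        product.setPrice(row.get("price", BigDecimal.class));
        product.setCreatedAt(row.get("created_at", Instant.class));
        return product;
    }
}

Transaction management: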
@Service
@RequiredArgsConstructor
public class ProductService {

    private final ProductRepository productRepository;
    private final TransactionalOperator transactionalOperator;

    // IDs are Long to match ReactiveCrudRepository<Product, Long>
    public Mono<Void> transferStock(Long fromId, Long toId, int quantity) {
        return productRepository.findById(fromId)
                .flatMap(fromProduct -> {
                    if (fromProduct.getStock() < quantity) {
                        return Mono.error(new InsufficientStockException());
                    }
                    fromProduct.setStock(fromProduct.getStock() - quantity);
                    return productRepository.save(fromProduct)
                            .then(productRepository.findById(toId))
                            .flatMap(toProduct -> {
                                toProduct.setStock(toProduct.getStock() + quantity);
                                return productRepository.save(toProduct);
                            });
                })
                .then()
                // execute(...) returns a Flux; transactional(Mono) keeps the Mono<Void> type,
                // and both saves commit or roll back together
                .as(transactionalOperator::transactional);
    }
}

Performance optimization
Connection pool configuration:
spring:
  r2dbc:
    pool:
      max-size: 20
      initial-size: 5
      max-idle-time: 30m

Batch operations:
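public Mono<Long> batchInsert(List<Product> products) {
    if (products.isEmpty()) {
        return Mono.just(0L);
    }
    return databaseClient.inConnection(connection -> {
        // io.r2dbc.spi.Batch has no bind(); a parameterized Statement with add() is used instead
        Statement statement = connection.createStatement(
                "INSERT INTO products(name, price) VALUES($1, $2)");
        for (int i = 0; i < products.size(); i++) {
            if (i > 0) {
                statement.add(); // save the previous binding and start a new one
            }
            statement.bind(0, products.get(i).getName())
                    .bind(1, products.get(i).getPrice());
        }
        // getRowsUpdated() emits Long values in R2DBC SPI 1.0
        return Flux.from(statement.execute())
                .flatMap(Result::getRowsUpdated)
                .reduce(0L, Long::sum);
    });
}

4.3.3 Real-Time Communication with WebSocket

Basic WebSocket configuration
Configuration class. Note that the STOMP message broker used here comes from spring-websocket on the Servlet (Spring MVC) stack; WebFlux's own WebSocket support is built around WebSocketHandler and has no STOMP broker: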
@Configuration
@EnableWebSocketMessageBroker // this annotation, not @EnableWebFlux, enables STOMP over WebSocket
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
                .setHandshakeHandler(new DefaultHandshakeHandler())
                .setAllowedOriginPatterns("*") // restrict the allowed origins in production
                .withSockJS();                 // required because the client below connects through SockJS
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic");
        registry.setApplicationDestinationPrefixes("/app");
    }
}

Reactive WebSocket handling
Stock quote push example:
@Controller
public class StockTickerController {

    private final Flux<StockQuote> stockQuoteFlux;

    public StockTickerController(StockQuoteGenerator quoteGenerator) {
        this.stockQuoteFlux = Flux.interval(Duration.ofMillis(500))
                .map(sequence -> quoteGenerator.generate())
                .share(); // hot publisher: all subscribers share the same quotes
    }

    // Caveat (worth verifying for your Spring version): STOMP handler methods adapt only
    // single-value reactive return types, so a returned Flux may not be streamed; pushing
    // each quote with SimpMessagingTemplate.convertAndSend is the usual alternative.
    @MessageMapping("stocks.subscribe")
    @SendTo("/topic/stocks")
    public Flux<StockQuote> subscribe() {
        return stockQuoteFlux;
    }

    @MessageMapping("stocks.filter")
    public Flux<StockQuote> filter(@Payload String symbol) {
        return stockQuoteFlux.filter(quote ->
                quote.getSymbol().equals(symbol));
    }
}

Client connection example:
const socket = new SockJS('/ws');
const stompClient = Stomp.over(socket);
stompClient.connect({}, () => {
    // Receive every quote published to the topic
    stompClient.subscribe('/topic/stocks', (message) => {
        const quote = JSON.parse(message.body);
        updateStockTable(quote);
    });
    stompClient.send("/app/stocks.filter", {}, "AAPL");
});

Advanced features

RSocket integration (a more powerful reactive protocol):
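@Controller
@MessageMapping("stock.service")
public class RSocketStockController {

    // The original snippet used stockService without declaring it; StockService is not shown
    private final StockService stockService;

    public RSocketStockController(StockService stockService) {
        this.stockService = stockService;
    }

    // Request-response
    @MessageMapping("current")
    public Mono<StockQuote> current(String symbol) {
        return stockService.getCurrent(symbol);
    }

    // Request-stream
    @MessageMapping("stream")
    public Flux<StockQuote> stream(String symbol) {
        return stockService.getStream(symbol);
    }

    // Channel (bidirectional stream)
    @MessageMapping("channel")
    public Flux<StockQuote> channel(Flux<String> symbols) {
        return symbols.flatMap(stockService::getStream);
    }
}

Backpressure control: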
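@MessageMapping("large.data.stream")
public Flux<DataChunk> largeDataStream() {
    // Without an explicit strategy the stream fails with an error on overflow;
    // reactor.core.publisher.BufferOverflowStrategy.DROP_LATEST discards the new chunk instead.
    return dataService.streamLargeData()
            .onBackpressureBuffer(50, // buffer size
                    chunk -> log.warn("Dropping chunk due to backpressure"),
                    BufferOverflowStrategy.DROP_LATEST);
}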
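
Reactor's onBackpressureBuffer accepts a BufferOverflowStrategy such as DROP_LATEST or DROP_OLDEST, and the difference between the two is easy to mix up. The sketch below models only the behaviour those names describe, using plain JDK collections. BoundedBuffer and its Overflow enum are hypothetical names for this illustration; this is not how Reactor implements the operator.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration of bounded-buffer overflow policies; not Reactor's
// implementation, only the behaviour that the strategy names describe.
final class BoundedBuffer<T> {

    enum Overflow { DROP_LATEST, DROP_OLDEST }

    private final int capacity;
    private final Overflow overflow;
    private final Deque<T> items = new ArrayDeque<>();
    private final List<T> dropped = new ArrayList<>();

    BoundedBuffer(int capacity, Overflow overflow) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.overflow = overflow;
    }

    // Called by a fast producer; never blocks.
    void offer(T item) {
        if (items.size() < capacity) {
            items.addLast(item);
        } else if (overflow == Overflow.DROP_LATEST) {
            dropped.add(item);                 // discard the newcomer
        } else {
            dropped.add(items.removeFirst());  // discard the oldest buffered item
            items.addLast(item);
        }
    }

    // Called by a slow consumer; returns null when the buffer is empty.
    T poll() {
        return items.pollFirst();
    }

    List<T> buffered() {
        return new ArrayList<>(items);
    }

    List<T> dropped() {
        return new ArrayList<>(dropped);
    }

    public static void main(String[] args) {
        BoundedBuffer<Integer> latest = new BoundedBuffer<>(3, Overflow.DROP_LATEST);
        BoundedBuffer<Integer> oldest = new BoundedBuffer<>(3, Overflow.DROP_OLDEST);
        for (int i = 1; i <= 5; i++) {
            latest.offer(i);
            oldest.offer(i);
        }
        System.out.println(latest.buffered() + " dropped " + latest.dropped()); // [1, 2, 3] dropped [4, 5]
        System.out.println(oldest.buffered() + " dropped " + oldest.dropped()); // [3, 4, 5] dropped [1, 2]
    }
}
```

DROP_LATEST suits consumers that must see data in the order it was first accepted, while DROP_OLDEST suits cases such as live quotes, where the newest values matter most.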
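
Security configuration
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.Customizer;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.core.userdetails.MapReactiveUserDetailsService;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.web.server.SecurityWebFilterChain;

@Configuration
@EnableWebFluxSecurity
public class SecurityConfig {

    // Lambda DSL; the chained .and() style is deprecated in recent Spring Security releases
    @Bean
    public SecurityWebFilterChain securityFilterChain(ServerHttpSecurity http) {
        return http
                .authorizeExchange(exchanges -> exchanges
                        .pathMatchers("/ws/**").authenticated()
                        .anyExchange().permitAll())
                .httpBasic(Customizer.withDefaults())
                .csrf(ServerHttpSecurity.CsrfSpec::disable)
                .build();
    }

    @Bean
    public MapReactiveUserDetailsService userDetailsService() {
        // {noop} stores the password in plain text; acceptable for a demo only
        UserDetails user = User.withUsername("user")
                .password("{noop}password")
                .roles("USER")
                .build();
        return new MapReactiveUserDetailsService(user);
    }
}

Performance Monitoring and Best Practices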

Monitoring endpoint configuration
management:
  endpoints:
    web:
      exposure:
        include: health, metrics, prometheus
  metrics:
    tags:
      application: ${spring.application.name}

Monitoring reactive applications
@Bean
public MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {
    return registry -> registry.config()
            .commonTags("application", "reactive-demo");
}

// Custom metrics
@Bean
public WebFilter metricsWebFilter(MeterRegistry registry) {
    return (exchange, chain) -> {
        // Raw paths can produce many distinct tag values; prefer route templates where possible
        String path = exchange.getRequest().getPath().value();
        Timer.Sample sample = Timer.start(registry);
        return chain.filter(exchange)
                // doFinally replaces doOnSuccessOrError, which newer Reactor versions no longer provide;
                // it runs on completion, error, and cancellation
                .doFinally(signalType -> sample.stop(registry.timer("http.requests",
                        "uri", path,
                        "status", String.valueOf(exchange.getResponse().getStatusCode()),
                        "method", String.valueOf(exchange.getRequest().getMethod()))));
    };
}

Summary of best practices
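Understand the threading model:
- WebFlux uses Netty event-loop threads by default
- Blocking operations must be moved off the event loop onto an elastic thread pool, using publishOn or, for a blocking source, subscribeOn(Schedulers.boundedElastic())
Choose a backpressure strategy:
- UI clients: use onBackpressureDrop or onBackpressureLatest
- Service-to-service communication: use onBackpressureBuffer with a sensible buffer size
Error handling principles:
- Handle errors as early as possible
- Add error handling to every Flux/Mono chain
- Distinguish business exceptions from system exceptions
Testing strategy:
- Use StepVerifier to test reactive streams
- Use WebTestClient to test controllers
- Use virtual time to test long-running operations
Performance tuning:
- Size connection pools sensibly
- Monitor key metrics (latency, throughput, resource utilization)
- Keep logging non-blocking (for example with Logback's AsyncAppender)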
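
The threading rule in the list above (keep blocking work off the event-loop threads) can be illustrated without any framework. The following is a JDK-only analogy rather than WebFlux code: BlockingOffload, blockingLookup, and the pool size of 4 are hypothetical choices made for this sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only analogy: blocking work runs on a dedicated pool so the
// calling thread (standing in for an event-loop thread) is never parked.
final class BlockingOffload {

    private static final AtomicInteger COUNTER = new AtomicInteger();

    // Daemon threads with a recognisable name prefix
    private static final ThreadFactory FACTORY = runnable -> {
        Thread thread = new Thread(runnable, "blocking-pool-" + COUNTER.incrementAndGet());
        thread.setDaemon(true);
        return thread;
    };

    static final ExecutorService BLOCKING_POOL = Executors.newFixedThreadPool(4, FACTORY);

    // Stand-in for a blocking call such as JDBC or file I/O
    static String blockingLookup(String key) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return key + " handled on " + Thread.currentThread().getName();
    }

    // Returns immediately; the blocking call happens on BLOCKING_POOL
    static CompletableFuture<String> lookupAsync(String key) {
        return CompletableFuture.supplyAsync(() -> blockingLookup(key), BLOCKING_POOL);
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = lookupAsync("product-1");
        System.out.println("caller thread is free: " + Thread.currentThread().getName());
        System.out.println(future.join());
    }
}
```

The Reactor counterpart of lookupAsync would be Mono.fromCallable(() -> blockingLookup(key)).subscribeOn(Schedulers.boundedElastic()), which likewise runs the blocking call on a pool intended for that purpose.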
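
With the practices above you can build high-performance, scalable reactive web applications that take full advantage of WebFlux's non-blocking model and cope with the demands of high-concurrency workloads.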

