欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring?WebFlux?核心作用

 更新時間:2025年08月11日 09:28:35   作者:百錦再@新空間  
Spring?WebFlux是Spring?Framework?5的響應(yīng)式框架,結(jié)合性能優(yōu)化與最佳實踐,構(gòu)建高效、可擴(kuò)展的Web應(yīng)用,本節(jié)將深入探討?WebFlux?的核心功能,包括?REST?API?構(gòu)建、響應(yīng)式數(shù)據(jù)庫訪問和實時通信,感興趣的朋友一起看看吧

Spring WebFlux 是 Spring Framework 5 引入的響應(yīng)式 Web 框架,基于 Project Reactor 實現(xiàn),支持非阻塞、函數(shù)式編程模型。本節(jié)將深入探討 WebFlux 的核心功能,包括 REST API 構(gòu)建、響應(yīng)式數(shù)據(jù)庫訪問和實時通信。

4.3.1 構(gòu)建 Reactive REST API

基礎(chǔ)項目搭建

首先創(chuàng)建 Spring Boot WebFlux 項目(基于 Spring Initializr):

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

響應(yīng)式控制器

注解式控制器(與傳統(tǒng)Spring MVC類似但支持響應(yīng)式類型):

@RestController
@RequestMapping("/products")
public class ProductController {
    private final ProductService productService;
    // 構(gòu)造函數(shù)注入
    public ProductController(ProductService productService) {
        this.productService = productService;
    }
    @GetMapping
    public Flux<Product> getAllProducts() {
        return productService.findAll();
    }
    @GetMapping("/{id}")
    public Mono<Product> getProductById(@PathVariable String id) {
        return productService.findById(id);
    }
    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<Product> createProduct(@RequestBody Mono<Product> productMono) {
        return productService.save(productMono);
    }
    @PutMapping("/{id}")
    public Mono<Product> updateProduct(
            @PathVariable String id, 
            @RequestBody Mono<Product> productMono) {
        return productService.update(id, productMono);
    }
    @DeleteMapping("/{id}")
    @ResponseStatus(HttpStatus.NO_CONTENT)
    public Mono<Void> deleteProduct(@PathVariable String id) {
        return productService.delete(id);
    }
}

函數(shù)式端點(diǎn)(RouterFunction方式):

@Configuration
public class ProductRouter {
    @Bean
    public RouterFunction<ServerResponse> route(ProductHandler handler) {
        return RouterFunctions.route()
            .GET("/fn/products", handler::getAll)
            .GET("/fn/products/{id}", handler::getById)
            .POST("/fn/products", handler::create)
            .PUT("/fn/products/{id}", handler::update)
            .DELETE("/fn/products/{id}", handler::delete)
            .build();
    }
}
@Component
public class ProductHandler {
    private final ProductService productService;
    public ProductHandler(ProductService productService) {
        this.productService = productService;
    }
    public Mono<ServerResponse> getAll(ServerRequest request) {
        return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_NDJSON)
            .body(productService.findAll(), Product.class);
    }
    public Mono<ServerResponse> getById(ServerRequest request) {
        String id = request.pathVariable("id");
        return productService.findById(id)
            .flatMap(product -> ServerResponse.ok().bodyValue(product))
            .switchIfEmpty(ServerResponse.notFound().build());
    }
    public Mono<ServerResponse> create(ServerRequest request) {
        return request.bodyToMono(Product.class)
            .flatMap(productService::save)
            .flatMap(product -> ServerResponse
                .created(URI.create("/fn/products/" + product.getId()))
                .bodyValue(product));
    }
    public Mono<ServerResponse> update(ServerRequest request) {
        String id = request.pathVariable("id");
        return request.bodyToMono(Product.class)
            .flatMap(product -> productService.update(id, Mono.just(product)))
            .flatMap(product -> ServerResponse.ok().bodyValue(product))
            .switchIfEmpty(ServerResponse.notFound().build());
    }
    public Mono<ServerResponse> delete(ServerRequest request) {
        String id = request.pathVariable("id");
        return productService.delete(id)
            .then(ServerResponse.noContent().build());
    }
}

高級特性

流式響應(yīng)(Server-Sent Events):

@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ProductEvent> streamProducts() {
    return Flux.interval(Duration.ofSeconds(1))
        .map(sequence -> new ProductEvent(
            "product-" + sequence, 
            "Event at " + Instant.now()
        ));
}

請求驗證與異常處理

@ControllerAdvice
public class GlobalErrorHandler extends AbstractErrorWebExceptionHandler {
    public GlobalErrorHandler(ErrorAttributes errorAttributes, 
                            WebProperties.Resources resources,
                            ApplicationContext applicationContext,
                            ServerCodecConfigurer serverCodecConfigurer) {
        super(errorAttributes, resources, applicationContext);
        this.setMessageWriters(serverCodecConfigurer.getWriters());
    }
    @Override
    protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
        return RouterFunctions.route(
            RequestPredicates.all(), 
            request -> {
                Map<String, Object> errorProperties = getErrorAttributes(request, ErrorAttributeOptions.defaults());
                HttpStatus status = getHttpStatus(errorProperties);
                return ServerResponse.status(status)
                    .contentType(MediaType.APPLICATION_JSON)
                    .bodyValue(errorProperties);
            }
        );
    }
    private HttpStatus getHttpStatus(Map<String, Object> errorProperties) {
        return HttpStatus.valueOf((Integer)errorProperties.get("status"));
    }
}
// 自定義驗證
public class ProductValidator {
    public static Mono<Product> validate(Product product) {
        return Mono.just(product)
            .flatMap(p -> {
                List<String> errors = new ArrayList<>();
                if (p.getName() == null || p.getName().isEmpty()) {
                    errors.add("Product name is required");
                }
                if (p.getPrice() <= 0) {
                    errors.add("Price must be positive");
                }
                if (!errors.isEmpty()) {
                    return Mono.error(new ValidationException(errors));
                }
                return Mono.just(p);
            });
    }
}

4.3.2 響應(yīng)式數(shù)據(jù)庫訪問(R2DBC)

R2DBC 配置

添加依賴:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-postgresql</artifactId>
    <scope>runtime</scope>
</dependency>

配置 application.yml:

spring:
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/mydb
    username: user
    password: pass
    pool:
      enabled: true
      max-size: 20

響應(yīng)式Repository

定義實體:

@Data
@Table("products")
public class Product {
    @Id
    private Long id;
    private String name;
    private String description;
    private BigDecimal price;
    private Instant createdAt;
}

創(chuàng)建Repository接口:

public interface ProductRepository extends ReactiveCrudRepository<Product, Long> {
    Flux<Product> findByNameContaining(String name);
    @Query("SELECT * FROM products WHERE price > :minPrice")
    Flux<Product> findByPriceGreaterThan(BigDecimal minPrice);
    @Modifying
    @Query("UPDATE products SET price = price * :factor")
    Mono<Integer> updateAllPrices(BigDecimal factor);
}

復(fù)雜查詢與事務(wù)

自定義查詢實現(xiàn)

public class ProductRepositoryImpl implements CustomProductRepository {
    private final DatabaseClient databaseClient;
    public ProductRepositoryImpl(DatabaseClient databaseClient) {
        this.databaseClient = databaseClient;
    }
    @Override
    public Flux<Product> complexSearch(ProductCriteria criteria) {
        return databaseClient.sql("""
                SELECT * FROM products 
                WHERE name LIKE :name 
                AND price BETWEEN :minPrice AND :maxPrice
                ORDER BY :sortField :sortDirection
                LIMIT :limit OFFSET :offset
                """)
            .bind("name", "%" + criteria.getName() + "%")
            .bind("minPrice", criteria.getMinPrice())
            .bind("maxPrice", criteria.getMaxPrice())
            .bind("sortField", criteria.getSortField())
            .bind("sortDirection", criteria.getSortDirection())
            .bind("limit", criteria.getPageSize())
            .bind("offset", (criteria.getPageNumber() - 1) * criteria.getPageSize())
            .map((row, metadata) -> toProduct(row))
            .all();
    }
    private Product toProduct(Row row) {
        // 行到對象的轉(zhuǎn)換邏輯
    }
}

事務(wù)管理

@Service
@RequiredArgsConstructor
public class ProductService {
    private final ProductRepository productRepository;
    private final TransactionalOperator transactionalOperator;
    public Mono<Void> transferStock(String fromId, String toId, int quantity) {
        return transactionalOperator.execute(status -> 
            productRepository.findById(fromId)
                .flatMap(fromProduct -> {
                    if (fromProduct.getStock() < quantity) {
                        return Mono.error(new InsufficientStockException());
                    }
                    fromProduct.setStock(fromProduct.getStock() - quantity);
                    return productRepository.save(fromProduct)
                        .then(productRepository.findById(toId))
                        .flatMap(toProduct -> {
                            toProduct.setStock(toProduct.getStock() + quantity);
                            return productRepository.save(toProduct);
                        });
                })
        );
    }
}

性能優(yōu)化

連接池配置

spring:
  r2dbc:
    pool:
      max-size: 20
      initial-size: 5
      max-idle-time: 30m

批處理操作

public Mono<Integer> batchInsert(List<Product> products) {
    return databaseClient.inConnectionMany(connection -> {
        Batch batch = connection.createBatch();
        products.forEach(product -> 
            batch.add("INSERT INTO products(name, price) VALUES($1, $2)")
                .bind(0, product.getName())
                .bind(1, product.getPrice())
        );
        return Flux.from(batch.execute())
            .reduce(0, (count, result) -> count + result.getRowsUpdated());
    });
}

4.3.3 WebSocket 實時通信

基礎(chǔ)WebSocket配置

配置類:

@Configuration
@EnableWebFlux
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
            .setHandshakeHandler(new DefaultHandshakeHandler())
            .setAllowedOrigins("*");
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic");
        registry.setApplicationDestinationPrefixes("/app");
    }
}

響應(yīng)式WebSocket處理

股票行情推送示例

@Controller
public class StockTickerController {
    private final Flux<StockQuote> stockQuoteFlux;
    public StockTickerController(StockQuoteGenerator quoteGenerator) {
        this.stockQuoteFlux = Flux.interval(Duration.ofMillis(500))
            .map(sequence -> quoteGenerator.generate())
            .share(); // 熱發(fā)布,多個訂閱者共享數(shù)據(jù)
    }
    @MessageMapping("stocks.subscribe")
    @SendTo("/topic/stocks")
    public Flux<StockQuote> subscribe() {
        return stockQuoteFlux;
    }
    @MessageMapping("stocks.filter")
    public Flux<StockQuote> filter(@Payload String symbol) {
        return stockQuoteFlux.filter(quote -> 
            quote.getSymbol().equals(symbol));
    }
}

客戶端連接示例

const socket = new SockJS('/ws');
const stompClient = Stomp.over(socket);
stompClient.connect({}, () => {
    stompClient.subscribe('/topic/stocks', (message) => {
        const quote = JSON.parse(message.body);
        updateStockTable(quote);
    });
    stompClient.send("/app/stocks.filter", {}, "AAPL");
});

高級特性

RSocket集成(更強(qiáng)大的響應(yīng)式協(xié)議):

@Controller
@MessageMapping("stock.service")
public class RSocketStockController {
    @MessageMapping("current")
    public Mono<StockQuote> current(String symbol) {
        return stockService.getCurrent(symbol);
    }
    @MessageMapping("stream")
    public Flux<StockQuote> stream(String symbol) {
        return stockService.getStream(symbol);
    }
    @MessageMapping("channel")
    public Flux<StockQuote> channel(Flux<String> symbols) {
        return symbols.flatMap(stockService::getStream);
    }
}

背壓控制

@MessageMapping("large.data.stream")
public Flux<DataChunk> largeDataStream() {
    return dataService.streamLargeData()
        .onBackpressureBuffer(50, // 緩沖區(qū)大小
            chunk -> log.warn("Dropping chunk due to backpressure"));
}

安全配置

@Configuration
@EnableWebFluxSecurity
public class SecurityConfig {
    @Bean
    public SecurityWebFilterChain securityFilterChain(ServerHttpSecurity http) {
        return http
            .authorizeExchange()
                .pathMatchers("/ws/**").authenticated()
                .anyExchange().permitAll()
            .and()
            .httpBasic()
            .and()
            .csrf().disable()
            .build();
    }
    @Bean
    public MapReactiveUserDetailsService userDetailsService() {
        UserDetails user = User.withUsername("user")
            .password("{noop}password")
            .roles("USER")
            .build();
        return new MapReactiveUserDetailsService(user);
    }
}

性能監(jiān)控與最佳實踐

監(jiān)控端點(diǎn)配置

management:
  endpoints:
    web:
      exposure:
        include: health, metrics, prometheus
  metrics:
    tags:
      application: ${spring.application.name}

響應(yīng)式應(yīng)用監(jiān)控

@Bean
public MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {
    return registry -> registry.config()
        .commonTags("application", "reactive-demo");
}
// 自定義指標(biāo)
@Bean
public WebFilter metricsWebFilter(MeterRegistry registry) {
    return (exchange, chain) -> {
        String path = exchange.getRequest().getPath().toString();
        Timer.Sample sample = Timer.start(registry);
        return chain.filter(exchange)
            .doOnSuccessOrError((done, ex) -> {
                sample.stop(registry.timer("http.requests", 
                    "uri", path,
                    "status", exchange.getResponse().getStatusCode().toString(),
                    "method", exchange.getRequest().getMethodValue()));
            });
    };
}

最佳實踐總結(jié)

  1. 線程模型理解

    • WebFlux 默認(rèn)使用 Netty 事件循環(huán)線程
    • 阻塞操作必須使用 publishOn 切換到彈性線程池
  2. 背壓策略選擇

    • UI 客戶端:使用 onBackpressureDroponBackpressureLatest
    • 服務(wù)間通信:使用 onBackpressureBuffer 配合合理緩沖區(qū)大小
  3. 錯誤處理原則

    • 盡早處理錯誤
    • 為每個 Flux/Mono 鏈添加錯誤處理
    • 區(qū)分業(yè)務(wù)異常和系統(tǒng)異常
  4. 測試策略

    • 使用 StepVerifier 測試響應(yīng)式流
    • 使用 WebTestClient 測試控制器
    • 虛擬時間測試長時間操作
  5. 性能調(diào)優(yōu)

    • 合理配置連接池
    • 監(jiān)控關(guān)鍵指標(biāo)(延遲、吞吐量、資源使用率)
    • 使用響應(yīng)式日志框架(如 Logback 異步Appender)

通過以上全面實踐,您將能夠構(gòu)建高性能、可擴(kuò)展的響應(yīng)式 Web 應(yīng)用,充分利用 WebFlux 的非阻塞特性,處理高并發(fā)場景下的各種挑戰(zhàn)。

到此這篇關(guān)于Spring WebFlux 深度實踐指南的文章就介紹到這了,更多相關(guān)Spring WebFlux內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • swing中Tree與滾動條用法實例分析

    swing中Tree與滾動條用法實例分析

    這篇文章主要介紹了swing中Tree與滾動條用法,以實例形式分析了java基于swing實現(xiàn)圖形界面的使用技巧,需要的朋友可以參考下
    2015-09-09
  • RocketMQ消息發(fā)送與消息類別詳解

    RocketMQ消息發(fā)送與消息類別詳解

    這篇文章主要介紹了RocketMQ消息發(fā)送與消息類別詳解,事務(wù)消息的生產(chǎn)者執(zhí)行本地事務(wù),并根據(jù)事務(wù)執(zhí)行的結(jié)果選擇是否提交或回滾事務(wù),
    如果事務(wù)執(zhí)行成功并選擇提交事務(wù),則產(chǎn)生注冊成功消息,進(jìn)入下一步,需要的朋友可以參考下
    2023-09-09
  • log4j使用詳細(xì)解析

    log4j使用詳細(xì)解析

    Log4j 除了可以記錄程序運(yùn)行日志信息外還有一重要的功能就是用來顯示調(diào)試信息。下面通過本文給大家介紹log4j使用詳細(xì)解析,感興趣的朋友一起看看吧
    2017-10-10
  • MyBatis Plus實現(xiàn)一對多的查詢場景的三種方法

    MyBatis Plus實現(xiàn)一對多的查詢場景的三種方法

    MyBatis Plus提供了多種簡便的方式來進(jìn)行一對多子查詢,本文主要介紹了MyBatis Plus實現(xiàn)一對多的查詢場景的三種方法,具有一定的參考價值,感興趣的可以了解一下
    2024-07-07
  • Java中結(jié)束循環(huán)的方法

    Java中結(jié)束循環(huán)的方法

    這篇文章主要介紹了Java中結(jié)束循環(huán)的方法,文中有段代碼在return,結(jié)束了整個main方法,即使輸出hello world的語句位于循環(huán)體外,也不會被執(zhí)行,對java結(jié)束循環(huán)方法感興趣的朋友跟隨小編一起看看吧
    2023-06-06
  • Mybatis?ResultMap和分頁操作示例詳解

    Mybatis?ResultMap和分頁操作示例詳解

    這篇文章主要為大家介紹了Mybatis?ResultMap和分頁操作示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-10-10
  • 解決mybatis使用char類型字段查詢oracle數(shù)據(jù)庫時結(jié)果返回null問題

    解決mybatis使用char類型字段查詢oracle數(shù)據(jù)庫時結(jié)果返回null問題

    這篇文章主要介紹了mybatis使用char類型字段查詢oracle數(shù)據(jù)庫時結(jié)果返回null問題的解決方法,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價值,需要的朋友可以參考下
    2018-06-06
  • SpringBoot3.x打包Docker容器的實現(xiàn)

    SpringBoot3.x打包Docker容器的實現(xiàn)

    這篇文章主要介紹了SpringBoot3.x打包Docker容器的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-11-11
  • 淺談Java字符串比較的三種方法

    淺談Java字符串比較的三種方法

    這篇文章主要介紹了淺談Java字符串比較的三種方法,字符串比較是常見的操作,包括比較相等、比較大小、比較前綴和后綴串等,需要的朋友可以參考下
    2023-04-04
  • Java利用POI實現(xiàn)導(dǎo)入導(dǎo)出Excel表格示例代碼

    Java利用POI實現(xiàn)導(dǎo)入導(dǎo)出Excel表格示例代碼

    最近工作中遇到一個需求,是需要導(dǎo)出數(shù)據(jù)到Excel表格里,所以寫個Demo測試一下,還是比較簡單的,現(xiàn)在分享給大家,有需要的朋友們可以參考借鑒,下面來一起看看吧。
    2016-10-10

最新評論