React編程模型之Project Reactor實際應用實例
3.3 Project Reactor(Spring Reactor)
Project Reactor是Spring生態(tài)系統(tǒng)中的響應式編程庫,它為構(gòu)建非阻塞、異步和事件驅(qū)動的應用程序提供了強大的工具集。作為Spring WebFlux的默認響應式庫,Reactor實現(xiàn)了Reactive Streams規(guī)范,使開發(fā)者能夠以聲明式的方式處理異步數(shù)據(jù)流。
3.3.1 Mono(0-1個數(shù)據(jù)流)
Mono是Project Reactor中表示最多包含一個元素的異步序列的核心類型。它代表了一種可能在未來某個時間點產(chǎn)生單個值(或空值)的異步計算。
核心特性
- 單值或空序列:Mono要么發(fā)出一個元素,要么不發(fā)出任何元素(僅發(fā)出完成信號)
- 延遲執(zhí)行:Mono的操作通常是惰性的,只有在訂閱時才會執(zhí)行
- 異步處理:Mono支持非阻塞的異步處理模式
- 錯誤處理:提供了豐富的錯誤處理機制
創(chuàng)建Mono的方式
// 1. 從值創(chuàng)建 Mono<String> mono1 = Mono.just("Hello"); Mono<String> mono2 = Mono.justOrEmpty(null); // 允許空值 // 2. 從Supplier創(chuàng)建 Mono<String> mono3 = Mono.fromSupplier(() -> "Hello from Supplier"); // 3. 從Callable創(chuàng)建 Mono<String> mono4 = Mono.fromCallable(() -> "Hello from Callable"); // 4. 從Future創(chuàng)建 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello from Future"); Mono<String> mono5 = Mono.fromFuture(future); // 5. 空Mono Mono<Void> mono6 = Mono.empty(); // 6. 錯誤Mono Mono<String> mono7 = Mono.error(new RuntimeException("Something went wrong"));
常用操作符
轉(zhuǎn)換操作符:
map
:同步轉(zhuǎn)換值flatMap
:異步轉(zhuǎn)換值(返回另一個Mono)flatMapMany
:將Mono轉(zhuǎn)換為Flux
Mono<String> original = Mono.just("Hello"); Mono<Integer> mapped = original.map(String::length); Mono<String> flatMapped = original.flatMap(s -> Mono.just(s + " World")); Flux<String> flatMapMany = original.flatMapMany(s -> Flux.just(s.split("")));
過濾操作符:
filter
:基于條件過濾defaultIfEmpty
:如果為空提供默認值
Mono<String> filtered = Mono.just("Hello") .filter(s -> s.length() > 3); Mono<String> withDefault = Mono.<String>empty() .defaultIfEmpty("Default Value");
錯誤處理操作符:
onErrorReturn
:出錯時返回默認值onErrorResume
:出錯時切換到備用的MonoonErrorMap
:轉(zhuǎn)換錯誤類型retry
:重試操作
Mono<String> withErrorHandling = Mono.error(new RuntimeException()) .onErrorReturn("Fallback Value"); Mono<String> withResume = Mono.error(new RuntimeException()) .onErrorResume(e -> Mono.just("Recovered from " + e.getMessage()));
組合操作符:
zipWith
:與另一個Mono組合then
:忽略當前結(jié)果,執(zhí)行另一個操作
Mono<String> first = Mono.just("Hello"); Mono<String> second = Mono.just("World"); Mono<String> zipped = first.zipWith(second, (f, s) -> f + " " + s); Mono<Void> thenOperation = Mono.just("Hello").then(Mono.empty());
時間相關操作符:
delayElement
:延遲發(fā)出元素timeout
:設置超時時間
Mono<String> delayed = Mono.just("Hello") .delayElement(Duration.ofSeconds(1)); Mono<String> withTimeout = Mono.just("Hello") .delayElement(Duration.ofSeconds(2)) .timeout(Duration.ofSeconds(1));
訂閱Mono
Mono是惰性的,只有訂閱時才會執(zhí)行:
// 1. 簡單訂閱 mono.subscribe(); // 2. 帶消費者的訂閱 mono.subscribe( value -> System.out.println("Received: " + value), // onNext error -> System.err.println("Error: " + error), // onError () -> System.out.println("Completed") // onComplete ); // 3. 帶Subscription控制的訂閱 mono.subscribe(new BaseSubscriber<String>() { @Override protected void hookOnSubscribe(Subscription subscription) { System.out.println("Subscribed"); request(1); // 請求第一個元素 } @Override protected void hookOnNext(String value) { System.out.println("Received: " + value); request(1); // 請求下一個元素(對于Mono通常不需要) } });
實際應用場景
- HTTP請求響應:在WebFlux中,控制器方法可以返回Mono來表示異步的單個響應
- 數(shù)據(jù)庫操作:Spring Data Reactive Repositories返回Mono用于單個實體的CRUD操作
- 緩存查詢:從緩存中獲取單個值
- 異步任務:執(zhí)行返回單個結(jié)果的異步計算
@RestController public class UserController { private final UserRepository userRepository; public UserController(UserRepository userRepository) { this.userRepository = userRepository; } @GetMapping("/users/{id}") public Mono<User> getUser(@PathVariable String id) { return userRepository.findById(id) .switchIfEmpty(Mono.error(new UserNotFoundException(id))); } }
3.3.2 Flux(0-N個數(shù)據(jù)流)
Flux是Project Reactor中表示0到N個元素的異步序列的核心類型。它代表了一個可能在未來某個時間點產(chǎn)生多個值的異步數(shù)據(jù)流。
核心特性
- 多值序列:Flux可以發(fā)出0到N個元素,然后可選地以一個完成信號或錯誤信號結(jié)束
- 背壓支持:允許消費者控制生產(chǎn)者的速度,防止內(nèi)存溢出
- 延遲執(zhí)行:與Mono類似,F(xiàn)lux的操作也是惰性的
- 豐富的操作符:提供了大量操作符來處理、轉(zhuǎn)換和組合數(shù)據(jù)流 創(chuàng)建Flux的方式
// 1. 從多個值創(chuàng)建 Flux<String> flux1 = Flux.just("A", "B", "C"); // 2. 從數(shù)組或集合創(chuàng)建 Flux<String> flux2 = Flux.fromArray(new String[]{"A", "B", "C"}); Flux<String> flux3 = Flux.fromIterable(Arrays.asList("A", "B", "C")); // 3. 從范圍創(chuàng)建 Flux<Integer> flux4 = Flux.range(1, 5); // 1, 2, 3, 4, 5 // 4. 從流生成器創(chuàng)建 Flux<Long> flux5 = Flux.generate( () -> 0L, // 初始狀態(tài) (state, sink) -> { sink.next(state); if (state == 10) sink.complete(); return state + 1; } ); // 5. 從間隔創(chuàng)建(周期性發(fā)出值) Flux<Long> flux6 = Flux.interval(Duration.ofMillis(100)); // 0, 1, 2... 每100ms // 6. 空Flux Flux<String> flux7 = Flux.empty(); // 7. 錯誤Flux Flux<String> flux8 = Flux.error(new RuntimeException("Flux error"));
常用操作符
轉(zhuǎn)換操作符:
map
:同步轉(zhuǎn)換每個元素flatMap
:異步轉(zhuǎn)換每個元素(返回Flux)concatMap
:保持順序的flatMapflatMapSequential
:合并結(jié)果但保持源順序
Flux<String> original = Flux.just("apple", "banana", "cherry"); Flux<Integer> mapped = original.map(String::length); Flux<String> flatMapped = original.flatMap(s -> Flux.fromArray(s.split("")));
過濾操作符:
filter
:基于條件過濾元素take
:取前N個元素skip
:跳過前N個元素distinct
:去重
Flux<Integer> numbers = Flux.range(1, 10); Flux<Integer> evens = numbers.filter(n -> n % 2 == 0); Flux<Integer> first5 = numbers.take(5);
組合操作符:
mergeWith
:合并多個Flux,不保證順序concatWith
:順序連接FluxzipWith
:與另一個Flux組合combineLatest
:當任一Flux發(fā)出值時組合最新值
Flux<String> fluxA = Flux.just("A", "B", "C"); Flux<String> fluxB = Flux.just("1", "2", "3"); Flux<String> merged = fluxA.mergeWith(fluxB); // A, 1, B, 2, C, 3 Flux<String> concatenated = fluxA.concatWith(fluxB); // A, B, C, 1, 2, 3 Flux<String> zipped = fluxA.zipWith(fluxB, (a, b) -> a + b); // A1, B2, C3
錯誤處理操作符:
onErrorReturn
:出錯時返回默認值onErrorResume
:出錯時切換到備用的FluxonErrorContinue
:跳過錯誤元素并繼續(xù)處理retry
:重試操作
Flux<Integer> withErrorHandling = Flux.just(1, 2, 0, 4) .map(i -> 10 / i) .onErrorResume(e -> Flux.just(-1)); Flux<Integer> withContinue = Flux.just(1, 2, 0, 4) .map(i -> { try { return 10 / i; } catch (Exception e) { throw Exceptions.propagate(e); } }) .onErrorContinue((e, o) -> System.out.println("Error for " + o + ": " + e.getMessage()));
背壓操作符:
onBackpressureBuffer
:緩沖元素onBackpressureDrop
:丟棄無法處理的元素onBackpressureLatest
:只保留最新元素
Flux.range(1, 1000) .onBackpressureBuffer(50) // 緩沖區(qū)大小為50 .subscribe(new BaseSubscriber<Integer>() { @Override protected void hookOnSubscribe(Subscription subscription) { request(10); // 初始請求10個元素 } @Override protected void hookOnNext(Integer value) { System.out.println("Received: " + value); // 根據(jù)需要請求更多元素 if (value % 10 == 0) { request(10); } } });
時間相關操作符:
delayElements
:延遲發(fā)出每個元素timeout
:設置超時時間sample
:定期采樣
Flux.range(1, 5) .delayElements(Duration.ofMillis(500)) .subscribe(System.out::println);
訂閱Flux
與Mono類似,F(xiàn)lux也是惰性的,需要訂閱才能執(zhí)行:
// 1. 簡單訂閱 flux.subscribe(); // 2. 帶消費者的訂閱 flux.subscribe( value -> System.out.println("Received: " + value), // onNext error -> System.err.println("Error: " + error), // onError () -> System.out.println("Completed"), // onComplete subscription -> subscription.request(3) // 初始請求3個元素 ); // 3. 帶Subscription控制的訂閱 flux.subscribe(new BaseSubscriber<String>() { @Override protected void hookOnSubscribe(Subscription subscription) { System.out.println("Subscribed"); request(1); // 請求第一個元素 } @Override protected void hookOnNext(String value) { System.out.println("Received: " + value); // 處理完當前元素后請求下一個 request(1); } });
實際應用場景
- 流式API:提供持續(xù)更新的數(shù)據(jù)流(如股票價格、傳感器數(shù)據(jù))
- 批量數(shù)據(jù)處理:處理大量數(shù)據(jù)時避免內(nèi)存溢出
- 事件處理:處理來自消息隊列或事件總線的事件流
- 文件處理:逐行讀取大文件
@RestController public class EventController { private final EventPublisher eventPublisher; public EventController(EventPublisher eventPublisher) { this.eventPublisher = eventPublisher; } @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<Event> getEvents() { return eventPublisher .publish() .map(event -> { // 轉(zhuǎn)換或豐富事件數(shù)據(jù) return event; }) .onBackpressureBuffer(100) .log(); } }
3.3.3 Schedulers(調(diào)度器)
在響應式編程中,Schedulers負責管理線程和執(zhí)行上下文。Project Reactor提供了多種預定義的Scheduler實現(xiàn),用于控制異步操作的執(zhí)行位置。
核心概念
- 線程模型:響應式編程通常使用少量線程處理大量并發(fā)
- 非阻塞I/O:避免線程阻塞,提高資源利用率
- 執(zhí)行上下文:決定操作在哪個線程或線程池上執(zhí)行
預定義的Scheduler類型
Schedulers.immediate():
- 在當前線程立即執(zhí)行
- 通常用于測試或不需要異步的場景
Schedulers.single():
- 使用單個可重用的線程
- 適用于低延遲的輕量級任務
- 所有調(diào)用者共享同一個線程
Schedulers.elastic()(已棄用,推薦使用boundedElastic):
- 無限擴展的線程池
- 適合阻塞I/O操作
- 每個新任務可能創(chuàng)建新線程
Schedulers.boundedElastic():
- 有界的彈性線程池
- 默認最多創(chuàng)建10 * CPU核心數(shù)的線程
- 適合阻塞I/O操作
- 比elastic更可控,避免資源耗盡
Schedulers.parallel():
- 固定大小的并行線程池
- 默認大小等于CPU核心數(shù)
- 適合計算密集型任務
Schedulers.fromExecutorService():
- 從現(xiàn)有的ExecutorService創(chuàng)建
- 允許與現(xiàn)有線程池集成
使用Scheduler
發(fā)布到Scheduler:
publishOn
:影響后續(xù)操作符的執(zhí)行上下文subscribeOn
:影響整個鏈的訂閱上下文(通常用在鏈的開頭)
Flux.range(1, 5) .map(i -> { System.out.println("Map on " + Thread.currentThread().getName()); return i * 2; }) .publishOn(Schedulers.boundedElastic()) .filter(i -> { System.out.println("Filter on " + Thread.currentThread().getName()); return i % 3 == 0; }) .subscribeOn(Schedulers.parallel()) .subscribe();
指定操作符的Scheduler:
許多操作符接受可選的Scheduler參數(shù)
Flux.interval(Duration.ofMillis(100), Schedulers.single()) .subscribe(System.out::println);
調(diào)度策略選擇指南
計算密集型操作:
- 使用
parallel()
調(diào)度器 - 避免線程切換開銷
- 線程數(shù)通常等于CPU核心數(shù)
阻塞I/O操作:
- 使用
boundedElastic()
調(diào)度器 - 防止阻塞事件循環(huán)線程
- 允許更多的線程處理并發(fā)I/O
非阻塞異步操作:
- 通常不需要顯式調(diào)度
- 由底層異步庫管理線程
UI交互:
- 使用專用的UI線程調(diào)度器
- 通過
Schedulers.fromExecutor()
與UI框架集成
最佳實踐
避免在響應式鏈中阻塞:
- 如果必須阻塞,使用
publishOn
切換到適當?shù)恼{(diào)度器
合理選擇調(diào)度器:
- 計算密集型:parallel
- I/O密集型:boundedElastic
- 事件循環(huán):通常不需要顯式調(diào)度
注意上下文傳播:
- Reactor Context可以攜帶跨線程的信息
- 使用
contextWrite
和deferContextual
管理上下文
資源清理:
對于自定義調(diào)度器或長時間運行的應用程序,注意關閉調(diào)度器
Scheduler scheduler = Schedulers.newBoundedElastic(10, 100, "custom"); try { Flux.just(1, 2, 3) .publishOn(scheduler) .subscribe(System.out::println); } finally { scheduler.dispose(); }
實際應用示例
@RestController public class DataController { private final DataService dataService; public DataController(DataService dataService) { this.dataService = dataService; } @GetMapping("/data/{id}") public Mono<Data> getData(@PathVariable String id) { // 使用boundedElastic處理潛在的阻塞操作 return Mono.fromCallable(() -> dataService.blockingGetData(id)) .subscribeOn(Schedulers.boundedElastic()) .timeout(Duration.ofSeconds(2)) .onErrorResume(e -> Mono.just(Data.fallbackData())); } @GetMapping("/stream") public Flux<Data> streamData() { // 使用parallel處理計算密集型流 return dataService.dataStream() .publishOn(Schedulers.parallel()) .map(data -> { // 計算密集型轉(zhuǎn)換 return data.withProcessedPayload(processPayload(data.getPayload())); }) .log(); } private String processPayload(String payload) { // 模擬計算密集型處理 return payload.toUpperCase(); } }
總結(jié)
Project Reactor為Java響應式編程提供了強大的工具集:
- Mono:處理0-1個結(jié)果的異步操作,適合單個值或空結(jié)果的場景
- Flux:處理0-N個結(jié)果的異步序列,適合流式數(shù)據(jù)處理
- Schedulers:管理執(zhí)行上下文,優(yōu)化線程使用和資源分配
通過合理組合這些組件,開發(fā)者可以構(gòu)建高效、可擴展的響應式應用程序,充分利用現(xiàn)代硬件資源,同時保持代碼的清晰和可維護性。響應式編程模型特別適合高并發(fā)、低延遲的應用場景,如微服務架構(gòu)、實時數(shù)據(jù)處理和事件驅(qū)動系統(tǒng)。
到此這篇關于React編程模型:Project Reactor深度解析的文章就介紹到這了,更多相關React Project Reactor內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
一文詳解手動實現(xiàn)Recoil狀態(tài)管理基本原理
這篇文章主要為大家介紹了一文詳解手動實現(xiàn)Recoil狀態(tài)管理基本原理實例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-05-05React?state結(jié)構(gòu)設計原則示例詳解
這篇文章主要為大家介紹了React?state結(jié)構(gòu)設計原則示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-06-06react使用antd的上傳組件實現(xiàn)文件表單一起提交功能(完整代碼)
最近在做一個后臺管理項目,涉及到react相關知識,項目需求需要在表單中帶附件提交,怎么實現(xiàn)這個功能呢?下面小編給大家?guī)砹藃eact使用antd的上傳組件實現(xiàn)文件表單一起提交功能,一起看看吧2021-06-06