React編程模型之Project Reactor實際應用實例


3.3 Project Reactor(Spring Reactor)

Project Reactor是Spring生態(tài)系統(tǒng)中的響應式編程庫,它為構建非阻塞、異步和事件驅動的應用程序提供了強大的工具集。作為Spring WebFlux的默認響應式庫,Reactor實現(xiàn)了Reactive Streams規(guī)范,使開發(fā)者能夠以聲明式的方式處理異步數(shù)據(jù)流。
3.3.1 Mono(0-1個數(shù)據(jù)流)
Mono是Project Reactor中表示最多包含一個元素的異步序列的核心類型。它代表了一種可能在未來某個時間點產(chǎn)生單個值(或空值)的異步計算。
核心特性
- 單值或空序列:Mono要么發(fā)出一個元素,要么不發(fā)出任何元素(僅發(fā)出完成信號)
- 延遲執(zhí)行:Mono的操作通常是惰性的,只有在訂閱時才會執(zhí)行
- 異步處理:Mono支持非阻塞的異步處理模式
- 錯誤處理:提供了豐富的錯誤處理機制
創(chuàng)建Mono的方式

// 1. 從值創(chuàng)建
Mono<String> mono1 = Mono.just("Hello");
Mono<String> mono2 = Mono.justOrEmpty(null); // 允許空值
// 2. 從Supplier創(chuàng)建
Mono<String> mono3 = Mono.fromSupplier(() -> "Hello from Supplier");
// 3. 從Callable創(chuàng)建
Mono<String> mono4 = Mono.fromCallable(() -> "Hello from Callable");
// 4. 從Future創(chuàng)建
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello from Future");
Mono<String> mono5 = Mono.fromFuture(future);
// 5. 空Mono
Mono<Void> mono6 = Mono.empty();
// 6. 錯誤Mono
Mono<String> mono7 = Mono.error(new RuntimeException("Something went wrong"));常用操作符

轉換操作符:
map:同步轉換值flatMap:異步轉換值(返回另一個Mono)flatMapMany:將Mono轉換為Flux
Mono<String> original = Mono.just("Hello");
Mono<Integer> mapped = original.map(String::length);
Mono<String> flatMapped = original.flatMap(s -> Mono.just(s + " World"));
Flux<String> flatMapMany = original.flatMapMany(s -> Flux.just(s.split("")));過濾操作符:
filter:基于條件過濾defaultIfEmpty:如果為空提供默認值
Mono<String> filtered = Mono.just("Hello")
.filter(s -> s.length() > 3);
Mono<String> withDefault = Mono.<String>empty()
.defaultIfEmpty("Default Value");錯誤處理操作符:
onErrorReturn:出錯時返回默認值onErrorResume:出錯時切換到備用的MonoonErrorMap:轉換錯誤類型retry:重試操作
Mono<String> withErrorHandling = Mono.error(new RuntimeException())
.onErrorReturn("Fallback Value");
Mono<String> withResume = Mono.error(new RuntimeException())
.onErrorResume(e -> Mono.just("Recovered from " + e.getMessage()));組合操作符:
zipWith:與另一個Mono組合then:忽略當前結果,執(zhí)行另一個操作
Mono<String> first = Mono.just("Hello");
Mono<String> second = Mono.just("World");
Mono<String> zipped = first.zipWith(second, (f, s) -> f + " " + s);
Mono<Void> thenOperation = Mono.just("Hello").then(Mono.empty());時間相關操作符:
delayElement:延遲發(fā)出元素timeout:設置超時時間
Mono<String> delayed = Mono.just("Hello")
.delayElement(Duration.ofSeconds(1));
Mono<String> withTimeout = Mono.just("Hello")
.delayElement(Duration.ofSeconds(2))
.timeout(Duration.ofSeconds(1));訂閱Mono

Mono是惰性的,只有訂閱時才會執(zhí)行:
// 1. 簡單訂閱
mono.subscribe();
// 2. 帶消費者的訂閱
mono.subscribe(
value -> System.out.println("Received: " + value), // onNext
error -> System.err.println("Error: " + error), // onError
() -> System.out.println("Completed") // onComplete
);
// 3. 帶Subscription控制的訂閱
mono.subscribe(new BaseSubscriber<String>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("Subscribed");
request(1); // 請求第一個元素
}
@Override
protected void hookOnNext(String value) {
System.out.println("Received: " + value);
request(1); // 請求下一個元素(對于Mono通常不需要)
}
});實際應用場景

- HTTP請求響應:在WebFlux中,控制器方法可以返回Mono來表示異步的單個響應
- 數(shù)據(jù)庫操作:Spring Data Reactive Repositories返回Mono用于單個實體的CRUD操作
- 緩存查詢:從緩存中獲取單個值
- 異步任務:執(zhí)行返回單個結果的異步計算
@RestController
public class UserController {
private final UserRepository userRepository;
public UserController(UserRepository userRepository) {
this.userRepository = userRepository;
}
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable String id) {
return userRepository.findById(id)
.switchIfEmpty(Mono.error(new UserNotFoundException(id)));
}
}3.3.2 Flux(0-N個數(shù)據(jù)流)

Flux是Project Reactor中表示0到N個元素的異步序列的核心類型。它代表了一個可能在未來某個時間點產(chǎn)生多個值的異步數(shù)據(jù)流。
核心特性
- 多值序列:Flux可以發(fā)出0到N個元素,然后可選地以一個完成信號或錯誤信號結束
- 背壓支持:允許消費者控制生產(chǎn)者的速度,防止內存溢出
- 延遲執(zhí)行:與Mono類似,F(xiàn)lux的操作也是惰性的
- 豐富的操作符:提供了大量操作符來處理、轉換和組合數(shù)據(jù)流 創(chuàng)建Flux的方式
// 1. 從多個值創(chuàng)建
Flux<String> flux1 = Flux.just("A", "B", "C");
// 2. 從數(shù)組或集合創(chuàng)建
Flux<String> flux2 = Flux.fromArray(new String[]{"A", "B", "C"});
Flux<String> flux3 = Flux.fromIterable(Arrays.asList("A", "B", "C"));
// 3. 從范圍創(chuàng)建
Flux<Integer> flux4 = Flux.range(1, 5); // 1, 2, 3, 4, 5
// 4. 從流生成器創(chuàng)建
Flux<Long> flux5 = Flux.generate(
() -> 0L, // 初始狀態(tài)
(state, sink) -> {
sink.next(state);
if (state == 10) sink.complete();
return state + 1;
}
);
// 5. 從間隔創(chuàng)建(周期性發(fā)出值)
Flux<Long> flux6 = Flux.interval(Duration.ofMillis(100)); // 0, 1, 2... 每100ms
// 6. 空Flux
Flux<String> flux7 = Flux.empty();
// 7. 錯誤Flux
Flux<String> flux8 = Flux.error(new RuntimeException("Flux error"));常用操作符

轉換操作符:
map:同步轉換每個元素flatMap:異步轉換每個元素(返回Flux)concatMap:保持順序的flatMapflatMapSequential:合并結果但保持源順序
Flux<String> original = Flux.just("apple", "banana", "cherry");
Flux<Integer> mapped = original.map(String::length);
Flux<String> flatMapped = original.flatMap(s -> Flux.fromArray(s.split("")));過濾操作符:
filter:基于條件過濾元素take:取前N個元素skip:跳過前N個元素distinct:去重
Flux<Integer> numbers = Flux.range(1, 10); Flux<Integer> evens = numbers.filter(n -> n % 2 == 0); Flux<Integer> first5 = numbers.take(5);
組合操作符:
mergeWith:合并多個Flux,不保證順序concatWith:順序連接FluxzipWith:與另一個Flux組合combineLatest:當任一Flux發(fā)出值時組合最新值
Flux<String> fluxA = Flux.just("A", "B", "C");
Flux<String> fluxB = Flux.just("1", "2", "3");
Flux<String> merged = fluxA.mergeWith(fluxB); // A, 1, B, 2, C, 3
Flux<String> concatenated = fluxA.concatWith(fluxB); // A, B, C, 1, 2, 3
Flux<String> zipped = fluxA.zipWith(fluxB, (a, b) -> a + b); // A1, B2, C3錯誤處理操作符:
onErrorReturn:出錯時返回默認值onErrorResume:出錯時切換到備用的FluxonErrorContinue:跳過錯誤元素并繼續(xù)處理retry:重試操作
Flux<Integer> withErrorHandling = Flux.just(1, 2, 0, 4)
.map(i -> 10 / i)
.onErrorResume(e -> Flux.just(-1));
Flux<Integer> withContinue = Flux.just(1, 2, 0, 4)
.map(i -> {
try {
return 10 / i;
} catch (Exception e) {
throw Exceptions.propagate(e);
}
})
.onErrorContinue((e, o) -> System.out.println("Error for " + o + ": " + e.getMessage()));背壓操作符:
onBackpressureBuffer:緩沖元素onBackpressureDrop:丟棄無法處理的元素onBackpressureLatest:只保留最新元素
Flux.range(1, 1000)
.onBackpressureBuffer(50) // 緩沖區(qū)大小為50
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(10); // 初始請求10個元素
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("Received: " + value);
// 根據(jù)需要請求更多元素
if (value % 10 == 0) {
request(10);
}
}
});時間相關操作符:
delayElements:延遲發(fā)出每個元素timeout:設置超時時間sample:定期采樣
Flux.range(1, 5)
.delayElements(Duration.ofMillis(500))
.subscribe(System.out::println);訂閱Flux

與Mono類似,F(xiàn)lux也是惰性的,需要訂閱才能執(zhí)行:
// 1. 簡單訂閱
flux.subscribe();
// 2. 帶消費者的訂閱
flux.subscribe(
value -> System.out.println("Received: " + value), // onNext
error -> System.err.println("Error: " + error), // onError
() -> System.out.println("Completed"), // onComplete
subscription -> subscription.request(3) // 初始請求3個元素
);
// 3. 帶Subscription控制的訂閱
flux.subscribe(new BaseSubscriber<String>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("Subscribed");
request(1); // 請求第一個元素
}
@Override
protected void hookOnNext(String value) {
System.out.println("Received: " + value);
// 處理完當前元素后請求下一個
request(1);
}
});實際應用場景

- 流式API:提供持續(xù)更新的數(shù)據(jù)流(如股票價格、傳感器數(shù)據(jù))
- 批量數(shù)據(jù)處理:處理大量數(shù)據(jù)時避免內存溢出
- 事件處理:處理來自消息隊列或事件總線的事件流
- 文件處理:逐行讀取大文件
@RestController
public class EventController {
private final EventPublisher eventPublisher;
public EventController(EventPublisher eventPublisher) {
this.eventPublisher = eventPublisher;
}
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Event> getEvents() {
return eventPublisher
.publish()
.map(event -> {
// 轉換或豐富事件數(shù)據(jù)
return event;
})
.onBackpressureBuffer(100)
.log();
}
}3.3.3 Schedulers(調度器)
在響應式編程中,Schedulers負責管理線程和執(zhí)行上下文。Project Reactor提供了多種預定義的Scheduler實現(xiàn),用于控制異步操作的執(zhí)行位置。
核心概念

- 線程模型:響應式編程通常使用少量線程處理大量并發(fā)
- 非阻塞I/O:避免線程阻塞,提高資源利用率
- 執(zhí)行上下文:決定操作在哪個線程或線程池上執(zhí)行
預定義的Scheduler類型
Schedulers.immediate():
- 在當前線程立即執(zhí)行
- 通常用于測試或不需要異步的場景
Schedulers.single():
- 使用單個可重用的線程
- 適用于低延遲的輕量級任務
- 所有調用者共享同一個線程
Schedulers.elastic()(已棄用,推薦使用boundedElastic):
- 無限擴展的線程池
- 適合阻塞I/O操作
- 每個新任務可能創(chuàng)建新線程
Schedulers.boundedElastic():
- 有界的彈性線程池
- 默認最多創(chuàng)建10 * CPU核心數(shù)的線程
- 適合阻塞I/O操作
- 比elastic更可控,避免資源耗盡
Schedulers.parallel():
- 固定大小的并行線程池
- 默認大小等于CPU核心數(shù)
- 適合計算密集型任務
Schedulers.fromExecutorService():
- 從現(xiàn)有的ExecutorService創(chuàng)建
- 允許與現(xiàn)有線程池集成
使用Scheduler
發(fā)布到Scheduler:
publishOn:影響后續(xù)操作符的執(zhí)行上下文subscribeOn:影響整個鏈的訂閱上下文(通常用在鏈的開頭)
Flux.range(1, 5)
.map(i -> {
System.out.println("Map on " + Thread.currentThread().getName());
return i * 2;
})
.publishOn(Schedulers.boundedElastic())
.filter(i -> {
System.out.println("Filter on " + Thread.currentThread().getName());
return i % 3 == 0;
})
.subscribeOn(Schedulers.parallel())
.subscribe();指定操作符的Scheduler:
許多操作符接受可選的Scheduler參數(shù)
Flux.interval(Duration.ofMillis(100), Schedulers.single())
.subscribe(System.out::println);調度策略選擇指南
計算密集型操作:
- 使用
parallel()調度器 - 避免線程切換開銷
- 線程數(shù)通常等于CPU核心數(shù)
阻塞I/O操作:
- 使用
boundedElastic()調度器 - 防止阻塞事件循環(huán)線程
- 允許更多的線程處理并發(fā)I/O
非阻塞異步操作:
- 通常不需要顯式調度
- 由底層異步庫管理線程
UI交互:
- 使用專用的UI線程調度器
- 通過
Schedulers.fromExecutor()與UI框架集成
最佳實踐

避免在響應式鏈中阻塞:
- 如果必須阻塞,使用
publishOn切換到適當?shù)恼{度器
合理選擇調度器:
- 計算密集型:parallel
- I/O密集型:boundedElastic
- 事件循環(huán):通常不需要顯式調度
注意上下文傳播:
- Reactor Context可以攜帶跨線程的信息
- 使用
contextWrite和deferContextual管理上下文
資源清理:
對于自定義調度器或長時間運行的應用程序,注意關閉調度器
Scheduler scheduler = Schedulers.newBoundedElastic(10, 100, "custom");
try {
Flux.just(1, 2, 3)
.publishOn(scheduler)
.subscribe(System.out::println);
} finally {
scheduler.dispose();
}實際應用示例
@RestController
public class DataController {
private final DataService dataService;
public DataController(DataService dataService) {
this.dataService = dataService;
}
@GetMapping("/data/{id}")
public Mono<Data> getData(@PathVariable String id) {
// 使用boundedElastic處理潛在的阻塞操作
return Mono.fromCallable(() -> dataService.blockingGetData(id))
.subscribeOn(Schedulers.boundedElastic())
.timeout(Duration.ofSeconds(2))
.onErrorResume(e -> Mono.just(Data.fallbackData()));
}
@GetMapping("/stream")
public Flux<Data> streamData() {
// 使用parallel處理計算密集型流
return dataService.dataStream()
.publishOn(Schedulers.parallel())
.map(data -> {
// 計算密集型轉換
return data.withProcessedPayload(processPayload(data.getPayload()));
})
.log();
}
private String processPayload(String payload) {
// 模擬計算密集型處理
return payload.toUpperCase();
}
}總結

Project Reactor為Java響應式編程提供了強大的工具集:
- Mono:處理0-1個結果的異步操作,適合單個值或空結果的場景
- Flux:處理0-N個結果的異步序列,適合流式數(shù)據(jù)處理
- Schedulers:管理執(zhí)行上下文,優(yōu)化線程使用和資源分配
通過合理組合這些組件,開發(fā)者可以構建高效、可擴展的響應式應用程序,充分利用現(xiàn)代硬件資源,同時保持代碼的清晰和可維護性。響應式編程模型特別適合高并發(fā)、低延遲的應用場景,如微服務架構、實時數(shù)據(jù)處理和事件驅動系統(tǒng)。
到此這篇關于React編程模型:Project Reactor深度解析的文章就介紹到這了,更多相關React Project Reactor內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
一文詳解手動實現(xiàn)Recoil狀態(tài)管理基本原理
這篇文章主要為大家介紹了一文詳解手動實現(xiàn)Recoil狀態(tài)管理基本原理實例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-05-05
react使用antd的上傳組件實現(xiàn)文件表單一起提交功能(完整代碼)
最近在做一個后臺管理項目,涉及到react相關知識,項目需求需要在表單中帶附件提交,怎么實現(xiàn)這個功能呢?下面小編給大家?guī)砹藃eact使用antd的上傳組件實現(xiàn)文件表單一起提交功能,一起看看吧2021-06-06

