欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

React編程模型之Project Reactor實際應用實例

 更新時間:2025年05月08日 11:37:59   作者:百錦再@新空間  
這篇文章主要介紹了React編程模型之Project Reactor實際應用實例,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧

3.3 Project Reactor(Spring Reactor)

Project Reactor是Spring生態(tài)系統(tǒng)中的響應式編程庫,它為構(gòu)建非阻塞、異步和事件驅(qū)動的應用程序提供了強大的工具集。作為Spring WebFlux的默認響應式庫,Reactor實現(xiàn)了Reactive Streams規(guī)范,使開發(fā)者能夠以聲明式的方式處理異步數(shù)據(jù)流。

3.3.1 Mono(0-1個數(shù)據(jù)流)

Mono是Project Reactor中表示最多包含一個元素的異步序列的核心類型。它代表了一種可能在未來某個時間點產(chǎn)生單個值(或空值)的異步計算。

核心特性

  • 單值或空序列:Mono要么發(fā)出一個元素,要么不發(fā)出任何元素(僅發(fā)出完成信號)
  • 延遲執(zhí)行:Mono的操作通常是惰性的,只有在訂閱時才會執(zhí)行
  • 異步處理:Mono支持非阻塞的異步處理模式
  • 錯誤處理:提供了豐富的錯誤處理機制

創(chuàng)建Mono的方式

// 1. 從值創(chuàng)建
Mono<String> mono1 = Mono.just("Hello");
Mono<String> mono2 = Mono.justOrEmpty(null); // 允許空值
// 2. 從Supplier創(chuàng)建
Mono<String> mono3 = Mono.fromSupplier(() -> "Hello from Supplier");
// 3. 從Callable創(chuàng)建
Mono<String> mono4 = Mono.fromCallable(() -> "Hello from Callable");
// 4. 從Future創(chuàng)建
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello from Future");
Mono<String> mono5 = Mono.fromFuture(future);
// 5. 空Mono
Mono<Void> mono6 = Mono.empty();
// 6. 錯誤Mono
Mono<String> mono7 = Mono.error(new RuntimeException("Something went wrong"));

常用操作符

轉(zhuǎn)換操作符

  • map:同步轉(zhuǎn)換值
  • flatMap:異步轉(zhuǎn)換值(返回另一個Mono)
  • flatMapMany:將Mono轉(zhuǎn)換為Flux
Mono<String> original = Mono.just("Hello");
Mono<Integer> mapped = original.map(String::length);
Mono<String> flatMapped = original.flatMap(s -> Mono.just(s + " World"));
Flux<String> flatMapMany = original.flatMapMany(s -> Flux.just(s.split("")));

過濾操作符

  • filter:基于條件過濾
  • defaultIfEmpty:如果為空提供默認值
Mono<String> filtered = Mono.just("Hello")
    .filter(s -> s.length() > 3);
Mono<String> withDefault = Mono.<String>empty()
    .defaultIfEmpty("Default Value");

錯誤處理操作符

  • onErrorReturn:出錯時返回默認值
  • onErrorResume:出錯時切換到備用的Mono
  • onErrorMap:轉(zhuǎn)換錯誤類型
  • retry:重試操作
Mono<String> withErrorHandling = Mono.error(new RuntimeException())
    .onErrorReturn("Fallback Value");
Mono<String> withResume = Mono.error(new RuntimeException())
    .onErrorResume(e -> Mono.just("Recovered from " + e.getMessage()));

組合操作符

  • zipWith:與另一個Mono組合
  • then:忽略當前結(jié)果,執(zhí)行另一個操作
Mono<String> first = Mono.just("Hello");
Mono<String> second = Mono.just("World");
Mono<String> zipped = first.zipWith(second, (f, s) -> f + " " + s);
Mono<Void> thenOperation = Mono.just("Hello").then(Mono.empty());

時間相關操作符

  • delayElement:延遲發(fā)出元素
  • timeout:設置超時時間
Mono<String> delayed = Mono.just("Hello")
    .delayElement(Duration.ofSeconds(1));
Mono<String> withTimeout = Mono.just("Hello")
    .delayElement(Duration.ofSeconds(2))
    .timeout(Duration.ofSeconds(1));

訂閱Mono

Mono是惰性的,只有訂閱時才會執(zhí)行:

// 1. 簡單訂閱
mono.subscribe();
// 2. 帶消費者的訂閱
mono.subscribe(
    value -> System.out.println("Received: " + value), // onNext
    error -> System.err.println("Error: " + error),   // onError
    () -> System.out.println("Completed")             // onComplete
);
// 3. 帶Subscription控制的訂閱
mono.subscribe(new BaseSubscriber<String>() {
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        System.out.println("Subscribed");
        request(1); // 請求第一個元素
    }
    @Override
    protected void hookOnNext(String value) {
        System.out.println("Received: " + value);
        request(1); // 請求下一個元素(對于Mono通常不需要)
    }
});

實際應用場景

  • HTTP請求響應:在WebFlux中,控制器方法可以返回Mono來表示異步的單個響應
  • 數(shù)據(jù)庫操作:Spring Data Reactive Repositories返回Mono用于單個實體的CRUD操作
  • 緩存查詢:從緩存中獲取單個值
  • 異步任務:執(zhí)行返回單個結(jié)果的異步計算
@RestController
public class UserController {
    private final UserRepository userRepository;
    public UserController(UserRepository userRepository) {
        this.userRepository = userRepository;
    }
    @GetMapping("/users/{id}")
    public Mono<User> getUser(@PathVariable String id) {
        return userRepository.findById(id)
            .switchIfEmpty(Mono.error(new UserNotFoundException(id)));
    }
}

3.3.2 Flux(0-N個數(shù)據(jù)流)

Flux是Project Reactor中表示0到N個元素的異步序列的核心類型。它代表了一個可能在未來某個時間點產(chǎn)生多個值的異步數(shù)據(jù)流。

核心特性

  • 多值序列:Flux可以發(fā)出0到N個元素,然后可選地以一個完成信號或錯誤信號結(jié)束
  • 背壓支持:允許消費者控制生產(chǎn)者的速度,防止內(nèi)存溢出
  • 延遲執(zhí)行:與Mono類似,F(xiàn)lux的操作也是惰性的
  • 豐富的操作符:提供了大量操作符來處理、轉(zhuǎn)換和組合數(shù)據(jù)流 創(chuàng)建Flux的方式
// 1. 從多個值創(chuàng)建
Flux<String> flux1 = Flux.just("A", "B", "C");
// 2. 從數(shù)組或集合創(chuàng)建
Flux<String> flux2 = Flux.fromArray(new String[]{"A", "B", "C"});
Flux<String> flux3 = Flux.fromIterable(Arrays.asList("A", "B", "C"));
// 3. 從范圍創(chuàng)建
Flux<Integer> flux4 = Flux.range(1, 5); // 1, 2, 3, 4, 5
// 4. 從流生成器創(chuàng)建
Flux<Long> flux5 = Flux.generate(
    () -> 0L, // 初始狀態(tài)
    (state, sink) -> {
        sink.next(state);
        if (state == 10) sink.complete();
        return state + 1;
    }
);
// 5. 從間隔創(chuàng)建(周期性發(fā)出值)
Flux<Long> flux6 = Flux.interval(Duration.ofMillis(100)); // 0, 1, 2... 每100ms
// 6. 空Flux
Flux<String> flux7 = Flux.empty();
// 7. 錯誤Flux
Flux<String> flux8 = Flux.error(new RuntimeException("Flux error"));

常用操作符

轉(zhuǎn)換操作符

  • map:同步轉(zhuǎn)換每個元素
  • flatMap:異步轉(zhuǎn)換每個元素(返回Flux)
  • concatMap:保持順序的flatMap
  • flatMapSequential:合并結(jié)果但保持源順序
Flux<String> original = Flux.just("apple", "banana", "cherry");
Flux<Integer> mapped = original.map(String::length);
Flux<String> flatMapped = original.flatMap(s -> Flux.fromArray(s.split("")));

過濾操作符

  • filter:基于條件過濾元素
  • take:取前N個元素
  • skip:跳過前N個元素
  • distinct:去重
Flux<Integer> numbers = Flux.range(1, 10);
Flux<Integer> evens = numbers.filter(n -> n % 2 == 0);
Flux<Integer> first5 = numbers.take(5);

組合操作符

  • mergeWith:合并多個Flux,不保證順序
  • concatWith:順序連接Flux
  • zipWith:與另一個Flux組合
  • combineLatest:當任一Flux發(fā)出值時組合最新值
Flux<String> fluxA = Flux.just("A", "B", "C");
Flux<String> fluxB = Flux.just("1", "2", "3");
Flux<String> merged = fluxA.mergeWith(fluxB); // A, 1, B, 2, C, 3
Flux<String> concatenated = fluxA.concatWith(fluxB); // A, B, C, 1, 2, 3
Flux<String> zipped = fluxA.zipWith(fluxB, (a, b) -> a + b); // A1, B2, C3

錯誤處理操作符

  • onErrorReturn:出錯時返回默認值
  • onErrorResume:出錯時切換到備用的Flux
  • onErrorContinue:跳過錯誤元素并繼續(xù)處理
  • retry:重試操作
Flux<Integer> withErrorHandling = Flux.just(1, 2, 0, 4)
    .map(i -> 10 / i)
    .onErrorResume(e -> Flux.just(-1));
Flux<Integer> withContinue = Flux.just(1, 2, 0, 4)
    .map(i -> {
        try {
            return 10 / i;
        } catch (Exception e) {
            throw Exceptions.propagate(e);
        }
    })
    .onErrorContinue((e, o) -> System.out.println("Error for " + o + ": " + e.getMessage()));

背壓操作符

  • onBackpressureBuffer:緩沖元素
  • onBackpressureDrop:丟棄無法處理的元素
  • onBackpressureLatest:只保留最新元素
Flux.range(1, 1000)
    .onBackpressureBuffer(50) // 緩沖區(qū)大小為50
    .subscribe(new BaseSubscriber<Integer>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(10); // 初始請求10個元素
        }
        @Override
        protected void hookOnNext(Integer value) {
            System.out.println("Received: " + value);
            // 根據(jù)需要請求更多元素
            if (value % 10 == 0) {
                request(10);
            }
        }
    });

時間相關操作符

  • delayElements:延遲發(fā)出每個元素
  • timeout:設置超時時間
  • sample:定期采樣
Flux.range(1, 5)
    .delayElements(Duration.ofMillis(500))
    .subscribe(System.out::println);

訂閱Flux

與Mono類似,F(xiàn)lux也是惰性的,需要訂閱才能執(zhí)行:

// 1. 簡單訂閱
flux.subscribe();
// 2. 帶消費者的訂閱
flux.subscribe(
    value -> System.out.println("Received: " + value), // onNext
    error -> System.err.println("Error: " + error),   // onError
    () -> System.out.println("Completed"),           // onComplete
    subscription -> subscription.request(3)          // 初始請求3個元素
);
// 3. 帶Subscription控制的訂閱
flux.subscribe(new BaseSubscriber<String>() {
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        System.out.println("Subscribed");
        request(1); // 請求第一個元素
    }
    @Override
    protected void hookOnNext(String value) {
        System.out.println("Received: " + value);
        // 處理完當前元素后請求下一個
        request(1);
    }
});

實際應用場景

  • 流式API:提供持續(xù)更新的數(shù)據(jù)流(如股票價格、傳感器數(shù)據(jù))
  • 批量數(shù)據(jù)處理:處理大量數(shù)據(jù)時避免內(nèi)存溢出
  • 事件處理:處理來自消息隊列或事件總線的事件流
  • 文件處理:逐行讀取大文件
@RestController
public class EventController {
    private final EventPublisher eventPublisher;
    public EventController(EventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }
    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Event> getEvents() {
        return eventPublisher
            .publish()
            .map(event -> {
                // 轉(zhuǎn)換或豐富事件數(shù)據(jù)
                return event;
            })
            .onBackpressureBuffer(100)
            .log();
    }
}

3.3.3 Schedulers(調(diào)度器)

在響應式編程中,Schedulers負責管理線程和執(zhí)行上下文。Project Reactor提供了多種預定義的Scheduler實現(xiàn),用于控制異步操作的執(zhí)行位置。

核心概念

  • 線程模型:響應式編程通常使用少量線程處理大量并發(fā)
  • 非阻塞I/O:避免線程阻塞,提高資源利用率
  • 執(zhí)行上下文:決定操作在哪個線程或線程池上執(zhí)行

預定義的Scheduler類型

Schedulers.immediate()

  • 在當前線程立即執(zhí)行
  • 通常用于測試或不需要異步的場景

Schedulers.single()

  • 使用單個可重用的線程
  • 適用于低延遲的輕量級任務
  • 所有調(diào)用者共享同一個線程

Schedulers.elastic()(已棄用,推薦使用boundedElastic):

  • 無限擴展的線程池
  • 適合阻塞I/O操作
  • 每個新任務可能創(chuàng)建新線程

Schedulers.boundedElastic()

  • 有界的彈性線程池
  • 默認最多創(chuàng)建10 * CPU核心數(shù)的線程
  • 適合阻塞I/O操作
  • 比elastic更可控,避免資源耗盡

Schedulers.parallel()

  • 固定大小的并行線程池
  • 默認大小等于CPU核心數(shù)
  • 適合計算密集型任務

Schedulers.fromExecutorService()

  • 從現(xiàn)有的ExecutorService創(chuàng)建
  • 允許與現(xiàn)有線程池集成

使用Scheduler

發(fā)布到Scheduler

  • publishOn:影響后續(xù)操作符的執(zhí)行上下文
  • subscribeOn:影響整個鏈的訂閱上下文(通常用在鏈的開頭)
Flux.range(1, 5)
    .map(i -> {
        System.out.println("Map on " + Thread.currentThread().getName());
        return i * 2;
    })
    .publishOn(Schedulers.boundedElastic())
    .filter(i -> {
        System.out.println("Filter on " + Thread.currentThread().getName());
        return i % 3 == 0;
    })
    .subscribeOn(Schedulers.parallel())
    .subscribe();

指定操作符的Scheduler
許多操作符接受可選的Scheduler參數(shù)

Flux.interval(Duration.ofMillis(100), Schedulers.single())
    .subscribe(System.out::println);

調(diào)度策略選擇指南

計算密集型操作

  • 使用parallel()調(diào)度器
  • 避免線程切換開銷
  • 線程數(shù)通常等于CPU核心數(shù)

阻塞I/O操作

  • 使用boundedElastic()調(diào)度器
  • 防止阻塞事件循環(huán)線程
  • 允許更多的線程處理并發(fā)I/O

非阻塞異步操作

  • 通常不需要顯式調(diào)度
  • 由底層異步庫管理線程

UI交互

  • 使用專用的UI線程調(diào)度器
  • 通過Schedulers.fromExecutor()與UI框架集成

最佳實踐

避免在響應式鏈中阻塞

  • 如果必須阻塞,使用publishOn切換到適當?shù)恼{(diào)度器

合理選擇調(diào)度器

  • 計算密集型:parallel
  • I/O密集型:boundedElastic
  • 事件循環(huán):通常不需要顯式調(diào)度

注意上下文傳播

  • Reactor Context可以攜帶跨線程的信息
  • 使用contextWritedeferContextual管理上下文

資源清理

對于自定義調(diào)度器或長時間運行的應用程序,注意關閉調(diào)度器

Scheduler scheduler = Schedulers.newBoundedElastic(10, 100, "custom");
try {
    Flux.just(1, 2, 3)
        .publishOn(scheduler)
        .subscribe(System.out::println);
} finally {
    scheduler.dispose();
}

實際應用示例

@RestController
public class DataController {
    private final DataService dataService;
    public DataController(DataService dataService) {
        this.dataService = dataService;
    }
    @GetMapping("/data/{id}")
    public Mono<Data> getData(@PathVariable String id) {
        // 使用boundedElastic處理潛在的阻塞操作
        return Mono.fromCallable(() -> dataService.blockingGetData(id))
            .subscribeOn(Schedulers.boundedElastic())
            .timeout(Duration.ofSeconds(2))
            .onErrorResume(e -> Mono.just(Data.fallbackData()));
    }
    @GetMapping("/stream")
    public Flux<Data> streamData() {
        // 使用parallel處理計算密集型流
        return dataService.dataStream()
            .publishOn(Schedulers.parallel())
            .map(data -> {
                // 計算密集型轉(zhuǎn)換
                return data.withProcessedPayload(processPayload(data.getPayload()));
            })
            .log();
    }
    private String processPayload(String payload) {
        // 模擬計算密集型處理
        return payload.toUpperCase();
    }
}

總結(jié)

Project Reactor為Java響應式編程提供了強大的工具集:

  • Mono:處理0-1個結(jié)果的異步操作,適合單個值或空結(jié)果的場景
  • Flux:處理0-N個結(jié)果的異步序列,適合流式數(shù)據(jù)處理
  • Schedulers:管理執(zhí)行上下文,優(yōu)化線程使用和資源分配

通過合理組合這些組件,開發(fā)者可以構(gòu)建高效、可擴展的響應式應用程序,充分利用現(xiàn)代硬件資源,同時保持代碼的清晰和可維護性。響應式編程模型特別適合高并發(fā)、低延遲的應用場景,如微服務架構(gòu)、實時數(shù)據(jù)處理和事件驅(qū)動系統(tǒng)。

到此這篇關于React編程模型:Project Reactor深度解析的文章就介紹到這了,更多相關React Project Reactor內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • svelte5中使用react組件的方法

    svelte5中使用react組件的方法

    本文介紹了如何在Svelte 5項目中導入并使用React組件庫,并提供了一個示例項目地址,此外,還添加了一個React組件庫(如Ant Design)的示例,感興趣的朋友跟隨小編一起看看吧
    2025-01-01
  • 一文詳解手動實現(xiàn)Recoil狀態(tài)管理基本原理

    一文詳解手動實現(xiàn)Recoil狀態(tài)管理基本原理

    這篇文章主要為大家介紹了一文詳解手動實現(xiàn)Recoil狀態(tài)管理基本原理實例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-05-05
  • React diff算法的實現(xiàn)示例

    React diff算法的實現(xiàn)示例

    這篇文章主要介紹了React diff算法的實現(xiàn)示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-04-04
  • React?state結(jié)構(gòu)設計原則示例詳解

    React?state結(jié)構(gòu)設計原則示例詳解

    這篇文章主要為大家介紹了React?state結(jié)構(gòu)設計原則示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-06-06
  • React實現(xiàn)一個拖拽排序組件的示例代碼

    React實現(xiàn)一個拖拽排序組件的示例代碼

    這篇文章主要給大家介紹了React實現(xiàn)一個拖拽排序組件?-?支持多行多列、支持TypeScript、支持Flip動畫、可自定義拖拽區(qū)域,文章通過代碼示例介紹的非常詳細,需要的朋友可以參考下
    2023-11-11
  • 面試官常問React的生命周期問題

    面試官常問React的生命周期問題

    在react面試中,面試官經(jīng)常會問我們一些關于react的生命周期問題,今天特此分享本文給大家詳細介紹下,感興趣的朋友跟隨小編一起看看吧
    2021-08-08
  • react 下拉框內(nèi)容回顯的實現(xiàn)思路

    react 下拉框內(nèi)容回顯的實現(xiàn)思路

    這篇文章主要介紹了react 下拉框內(nèi)容回顯,實現(xiàn)思路是通過將下拉框選項的value和label一起存儲到state中, 初始化表單數(shù)據(jù)時將faqType對應的label查找出來并設置到Form.Item中,最后修改useEffect,需要的朋友可以參考下
    2024-05-05
  • React團隊測試并發(fā)特性詳解

    React團隊測試并發(fā)特性詳解

    這篇文章主要為大家介紹了React團隊測試并發(fā)特性詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-08-08
  • react使用antd的上傳組件實現(xiàn)文件表單一起提交功能(完整代碼)

    react使用antd的上傳組件實現(xiàn)文件表單一起提交功能(完整代碼)

    最近在做一個后臺管理項目,涉及到react相關知識,項目需求需要在表單中帶附件提交,怎么實現(xiàn)這個功能呢?下面小編給大家?guī)砹藃eact使用antd的上傳組件實現(xiàn)文件表單一起提交功能,一起看看吧
    2021-06-06
  • 淺談箭頭函數(shù)寫法在ReactJs中的使用

    淺談箭頭函數(shù)寫法在ReactJs中的使用

    這篇文章主要介紹了淺談箭頭函數(shù)寫法在ReactJs中的使用,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-08-08

最新評論