欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Project?Reactor源碼解析publishOn使用示例

 更新時(shí)間:2022年08月15日 17:05:15   作者:夜盡天明_  
這篇文章主要為大家介紹了Project?Reactor源碼解析publishOn使用示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

功能分析

相關(guān)示例源碼:github.com/chentianmin…

public final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch)

onNext()onComplete()、onError()方法進(jìn)行線程切換,publishOn()使得它下游的消費(fèi)階段異步執(zhí)行。

  • scheduler:線程切換的調(diào)度器,Scheduler用來(lái)生成實(shí)際執(zhí)行異步任務(wù)的Worker。
  • delayError:是否延時(shí)轉(zhuǎn)發(fā)Error。如果為true,當(dāng)收到上游的Error時(shí),會(huì)等隊(duì)列中的元素消費(fèi)完畢后再向下游轉(zhuǎn)發(fā)Error。否則會(huì)立即轉(zhuǎn)發(fā)Error,可能導(dǎo)致隊(duì)列中的元素丟失。默認(rèn)為true。
  • prefetch:預(yù)取元素的數(shù)量,同時(shí)也是隊(duì)列的容量。默認(rèn)值為Queues.SMALL_BUFFER_SIZE,該值通過(guò)配置進(jìn)行修改。

代碼示例

prefetch

/**
 * 每隔delayMillis生產(chǎn)一個(gè)元素
 */
protected Flux<Integer> delayPublishFlux(int delayMillis, int startInclusive, int endExclusive) {
    return Flux.create(fluxSink -> {
        IntStream.range(startInclusive, endExclusive)
                .forEach(i -> {
                    // 同步next
                    sleep(delayMillis);
                    logInt(i, "生產(chǎn)");
                    fluxSink.next(i);
                });
        fluxSink.complete();
    });
}
@Test
public void testPreFetch() {
    delayPublishFlux(1000, 1, 5)
            .doOnRequest(i -> logLong(i, "request"))
            .publishOn(Schedulers.boundedElastic(), 2)
            .subscribe(i -> logInt(i, "消費(fèi)"));
    sleep(10000);
}

每次會(huì)都向上游請(qǐng)求2個(gè)元素。另外還能發(fā)現(xiàn),從第二個(gè)request開(kāi)始,線程發(fā)生了切換。

delayError

/**
 * 每隔delayMillis生產(chǎn)一個(gè)元素,最后發(fā)送Error
 */
protected Flux<Integer> delayPublishFluxError(int delayMillis, int startInclusive, int endExclusive) {
    return Flux.create(fluxSink -> {
        IntStream.range(startInclusive, endExclusive)
                .forEach(i -> {
                    // 同步next
                    sleep(delayMillis);
                    logInt(i, "生產(chǎn)");
                    fluxSink.next(i);
                });
        fluxSink.error(new RuntimeException("發(fā)布錯(cuò)誤!"));
    });
}
@Test
public void testDelayError() {
    delayPublishFluxError(500, 1, 5)
            .publishOn(Schedulers.boundedElastic())
            // 只是為了消費(fèi)慢一點(diǎn)
            .doOnNext(i -> sleep(1000))
            .subscribe(i -> logInt(i, "消費(fèi)"));
    sleep(10000);
}

元素消費(fèi)完才觸發(fā)Error!

@Test
public void testNotDelayError() {
    delayPublishFluxError(500, 1, 5)
            .publishOn(Schedulers.boundedElastic(), false, 256)
            // 只是為了消費(fèi)慢一點(diǎn)
            .doOnNext(i -> sleep(1000))
            .subscribe(i -> logInt(i, "消費(fèi)"));
    sleep(10000);
}

元素還沒(méi)消費(fèi)完就觸發(fā)Error!

源碼分析

首先看一下publishOn()操作符在裝配階段做了什么,直接查看Flux#publishOn()源碼。

Flux#publishOn()

publishOn()裝配階段重點(diǎn)是創(chuàng)建了FluxPublishOn對(duì)象。

接下來(lái),我們分析訂閱階段發(fā)生了什么。一個(gè)Publisher在訂閱的時(shí)候調(diào)用的是其subscribe()方法,因此我們繼續(xù)看Flux#subscribe()源碼。

Flux#subscribe()

Flux#subscribe()方法的實(shí)現(xiàn)中,如果上游PublisherOptimizableOperator類(lèi)型,實(shí)際的Subscriber是通過(guò)調(diào)用該InternalFluxOperator#subscribeOrReturn()方法返回的。如果返回值為null,直接return

對(duì)于publishOn()操作符來(lái)說(shuō),裝配階段創(chuàng)建的FluxPublishOn就是OptimizableOperator類(lèi)型。所以繼續(xù)查看FluxPublishOn#subscribeOrReturn()源碼。

FluxPublishOn#subscribeOrReturn()

可以看到,方法返回的是PublishOnSubscriber,它包裝了原始的Subscriber。

在后續(xù)的訂閱階段一定會(huì)調(diào)用其onSubscribe()方法,在運(yùn)行階段一定會(huì)調(diào)用其onNext()方法。我們先看FluxPublishOn#onSubscribe()源碼。

FluxPublishOn#onSubscribe()

onSubscribe()實(shí)現(xiàn)中,分為同步隊(duì)列融合、異步隊(duì)列融合以及非融合方式處理。

如果上游的SubscriptionQueueSubscription類(lèi)型,則會(huì)進(jìn)行隊(duì)列融合。具體采用同步還是異步,取決于該QueueSubscription#requestFusion()實(shí)現(xiàn)。

  • 同步隊(duì)列融合:復(fù)用當(dāng)前隊(duì)列,繼續(xù)調(diào)用下游onSubscribe()方法,但不會(huì)繼續(xù)調(diào)用上游request()方法。
  • 異步隊(duì)列融合:復(fù)用當(dāng)前隊(duì)列,然后繼續(xù)調(diào)用下游onSubscribe()以及上游request()方法,請(qǐng)求數(shù)量是prefetch。
  • 非融合:創(chuàng)建一個(gè)新的隊(duì)列,然后繼續(xù)調(diào)用下游onSubscribe()以及上游request()方法,請(qǐng)求數(shù)量是prefetch。

接下來(lái),我們從源碼角度分別介紹上述三種方式的處理邏輯,首先介紹非融合方式。

非融合

先看如下代碼示例,該代碼會(huì)以非融合方式執(zhí)行。

@Test
public void testNoFuse() {
    delayPublishFlux(1000, 1, 5)
            .publishOn(Schedulers.boundedElastic())
            .subscribe(i -> logInt(i, "消費(fèi)"));
    sleep(10000);
}

間隔1s生產(chǎn)消費(fèi)元素!

在消費(fèi)階段,一定會(huì)調(diào)用FluxPublishOn#onNext()方法。

FluxPublishOn#onNext()

我們重點(diǎn)關(guān)注非融合方式執(zhí)行邏輯,其實(shí)只做了2件事:

  • 將下發(fā)的元素添加到隊(duì)列中,該隊(duì)列就是onSubscribe()階段創(chuàng)建的新隊(duì)列。
  • 調(diào)用trySchedule()方法進(jìn)行調(diào)度。

繼續(xù)看FluxPublishOn#trySchedule()源碼。

FluxPublishOn#trySchedule()

這里其實(shí)就是交由woker異步執(zhí)行,后續(xù)會(huì)執(zhí)行FluxPublishOn.run()方法。

FluxPublishOn#run()

在run()方法執(zhí)行的時(shí)候,分為3段邏輯:

  • 如果是輸出融合,執(zhí)行runBackfused()方法。
  • 如果是同步隊(duì)列融合,執(zhí)行runSync()方法。
  • 否則,執(zhí)行runAsync()方法。

對(duì)于當(dāng)前例子,實(shí)際執(zhí)行的是runAsync()方法,繼續(xù)查看其源碼。

FluxPublishOn#runAsync()

runAsync()做的事情比較簡(jiǎn)單,就是排空隊(duì)列中的元素下發(fā)給下游。同時(shí)在這里會(huì)繼續(xù)調(diào)用request()向上游請(qǐng)求數(shù)據(jù),這也是前面說(shuō)的從第二個(gè)request()開(kāi)始會(huì)進(jìn)行線程切換的原因。

另外這里還會(huì)調(diào)用checkTerminated(),檢查終止情況。

FluxPublishOn#checkTerminated()

如果delayError=true,必須當(dāng)前隊(duì)列為空是才會(huì)轉(zhuǎn)發(fā)Error。如果delayError=false,則直接轉(zhuǎn)發(fā)Error。繼續(xù)查看onComplete()方法。

FluxPublishOn#onComplete()

如果未結(jié)束,將done標(biāo)記設(shè)置為true,然后再次調(diào)用trySchedule()進(jìn)行調(diào)度。后續(xù)再被調(diào)度到的時(shí)候,如果隊(duì)列已經(jīng)排空,才會(huì)調(diào)用下游onComplete(),觸發(fā)完成。

小結(jié)

簡(jiǎn)單總結(jié)一下非融合執(zhí)行過(guò)程:

onSubscribe()時(shí)創(chuàng)建一個(gè)隊(duì)列,在onNext()時(shí)將上游下發(fā)的元素添加到隊(duì)列中,然后異步排空隊(duì)列中的元素,繼續(xù)下發(fā)給下游。

同步隊(duì)列融合

以下代碼會(huì)以同步隊(duì)列融合方式執(zhí)行。

@Test
public void testSyncFuse() {
    Flux.just(1, 2 ,3, 4, 5)
            .publishOn(Schedulers.boundedElastic())
            .subscribe(this::logInt);
    sleep(10000);
}

因?yàn)?code>Flux.just()對(duì)應(yīng)的SubscriptionSynchronousSubscription,其requestFusion()方法實(shí)現(xiàn)如下:

SynchronousSubscription#requestFusion()

此時(shí)返回的是SYNC,執(zhí)行同步隊(duì)列融合。

前面提到過(guò),同步隊(duì)列融合會(huì)復(fù)用當(dāng)前隊(duì)列,繼續(xù)調(diào)用下游onSubscribe()方法,但不會(huì)繼續(xù)調(diào)用上游request()方法。

這意味著,此時(shí)FluxPublishOn#onNext()FluxPublishOn#onComplete()方法并不會(huì)調(diào)用。但是FluxPublishOn#request()依然會(huì)被下游調(diào)用到。

FluxPublishOn#request()

request()方法中還是會(huì)調(diào)用trySchedule(),后續(xù)會(huì)異步調(diào)用runSync()方法(前面已經(jīng)分析了)。

對(duì)于非融合方式,trySchedule()也會(huì)執(zhí)行,只是這次調(diào)度的時(shí)候,隊(duì)列中還沒(méi)有數(shù)據(jù)被添加進(jìn)去。

FluxPublishOn#runSync()

runSync()實(shí)現(xiàn)上runAsync()差不多,也是排空隊(duì)列的元素,繼續(xù)下發(fā)給下游。不同的點(diǎn)是少了request()調(diào)用,以及取消完成控制有差異。

小結(jié)

簡(jiǎn)單總結(jié)一下同步隊(duì)列融合執(zhí)行過(guò)程:

onSubsrribe()時(shí)直接復(fù)用上游QueueSubscription作為隊(duì)列,不會(huì)調(diào)用上游request()請(qǐng)求數(shù)據(jù),在自身request()時(shí)異步排空隊(duì)列中的元素,繼續(xù)下發(fā)給下游。

異步隊(duì)列融合

以下代碼會(huì)以異步隊(duì)列融合方式執(zhí)行。

@Test
public void testAsyncFuse() {
    Flux.just(1, 2, 3, 4, 5)
            .windowUntil(i -&gt; i % 3 == 0)
            .publishOn(Schedulers.boundedElastic())
            .flatMap(Function.identity())
            .subscribe(this::logInt);
    sleep(10000);
}

因?yàn)?code>windowUntil()對(duì)應(yīng)的SubscriptionWindowPredicateMain,其requestFusion()方法實(shí)現(xiàn)如下:

WindowPredicateMain#requestFusion()

此時(shí)返回ASYNC,執(zhí)行異步隊(duì)列融合。接下來(lái)再看一下FluxPublishOn#onNext()源碼。

FluxPublishOn#onNext()

注意,此時(shí)onNext()方法參數(shù)是null,表明上游并沒(méi)有真正下發(fā)元素,可以將其看做是一個(gè)觸發(fā)Worker調(diào)度的信號(hào)。后續(xù)還是會(huì)異步執(zhí)行runAsync()方法,這里就不再分析了。

這其實(shí)也很容易理解:異步隊(duì)列融合直接復(fù)用了上游的QueueSubscription作為隊(duì)列,真正的數(shù)據(jù)應(yīng)該由這個(gè)隊(duì)列下發(fā)。

總結(jié)

簡(jiǎn)單總結(jié)一下同步隊(duì)列融合執(zhí)行過(guò)程:

onSubsrribe()時(shí)直接復(fù)用上游QueueSubscription作為隊(duì)列,在onNext()時(shí)接收上游信號(hào),異步排空隊(duì)列中的元素,繼續(xù)下發(fā)給下游。

非融合、同步隊(duì)列融合、異步隊(duì)列融合比較如下:

以上就是Project Reactor源碼解析publishOn使用示例的詳細(xì)內(nèi)容,更多關(guān)于Project Reactor publishOn的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • springboot中使用FastJson解決long類(lèi)型在js中失去精度的問(wèn)題

    springboot中使用FastJson解決long類(lèi)型在js中失去精度的問(wèn)題

    這篇文章主要介紹了springboot中使用FastJson解決long類(lèi)型在js中失去精度的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-06-06
  • CyclicBarrier線程同步共享變量底層原理示例解析

    CyclicBarrier線程同步共享變量底層原理示例解析

    這篇文章主要為大家介紹了CyclicBarrier線程同步共享變量底層原理示例解析詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-07-07
  • Java中的IO流之字符流Reader和Writer

    Java中的IO流之字符流Reader和Writer

    這篇文章主要介紹了Java中的IO流之字符流Reader和Writer,Reader : 和InputStream的唯一的區(qū)別就在于讀的數(shù)據(jù)單位不同,繼承自Reader的流都是用于向程序中輸入數(shù)據(jù),且數(shù)據(jù)的單位為字符16bit,需要的朋友可以參考下
    2023-10-10
  • Java多線程模式之Balking模式詳解

    Java多線程模式之Balking模式詳解

    這篇文章主要介紹了Java多線程模式之Balking模式,結(jié)合實(shí)例形式較為詳細(xì)的分析了Balking模式的原理、用法與相關(guān)注意事項(xiàng),需要的朋友可以參考下
    2017-06-06
  • 詳解在springmvc中解決FastJson循環(huán)引用的問(wèn)題

    詳解在springmvc中解決FastJson循環(huán)引用的問(wèn)題

    本篇文章主要介紹了在springmvc中解決FastJson循環(huán)引用的問(wèn)題,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-01-01
  • SSH框架網(wǎng)上商城項(xiàng)目第21戰(zhàn)之詳解易寶支付的流程

    SSH框架網(wǎng)上商城項(xiàng)目第21戰(zhàn)之詳解易寶支付的流程

    這篇文章主要為大家詳細(xì)介紹了SSH框架網(wǎng)上商城項(xiàng)目第21戰(zhàn)之易寶支付的流程,感興趣的小伙伴們可以參考一下
    2016-06-06
  • Java中進(jìn)程、協(xié)程與線程的區(qū)別詳解

    Java中進(jìn)程、協(xié)程與線程的區(qū)別詳解

    這篇文章主要介紹了Java中進(jìn)程,線程,協(xié)程的概念、區(qū)別以及使用場(chǎng)景的選擇,早期的操作系統(tǒng)每個(gè)程序就是一個(gè)進(jìn)程,知道一個(gè)程序運(yùn)行完,才能進(jìn)行下一個(gè)進(jìn)程,就是"單進(jìn)程時(shí)代",一切的程序只能串行發(fā)生,需要的朋友可以參考下
    2023-08-08
  • springboot + devtools(熱部署)實(shí)例教程

    springboot + devtools(熱部署)實(shí)例教程

    devtools是boot的一個(gè)熱部署工具,當(dāng)我們修改了classpath下的文件(包括類(lèi)文件、屬性文件、頁(yè)面等)時(shí),會(huì)重新啟動(dòng)應(yīng)用。本文通過(guò)實(shí)例給大家介紹springboot+devtools熱部署,感興趣的朋友一起看看吧
    2017-04-04
  • 一文教你如何通過(guò)三級(jí)緩存解決Spring循環(huán)依賴(lài)

    一文教你如何通過(guò)三級(jí)緩存解決Spring循環(huán)依賴(lài)

    這篇文章主要介紹了如何通過(guò)三級(jí)緩存解決?Spring?循環(huán)依賴(lài),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考價(jià)值,需要的朋友可以參考下
    2023-07-07
  • 詳解Spring Cloud 跨服務(wù)數(shù)據(jù)聚合框架

    詳解Spring Cloud 跨服務(wù)數(shù)據(jù)聚合框架

    這篇文章主要介紹了詳解Spring Cloud 跨服務(wù)數(shù)據(jù)聚合框架,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2018-03-03

最新評(píng)論