Project?Reactor源碼解析publishOn使用示例
功能分析
相關(guān)示例源碼:github.com/chentianmin…
public final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch)
在onNext()
、onComplete()
、onError()
方法進(jìn)行線程切換,publishOn()
使得它下游的消費階段異步執(zhí)行。
- scheduler:線程切換的調(diào)度器,
Scheduler
用來生成實際執(zhí)行異步任務(wù)的Worker
。 - delayError:是否延時轉(zhuǎn)發(fā)
Error
。如果為true
,當(dāng)收到上游的Error
時,會等隊列中的元素消費完畢后再向下游轉(zhuǎn)發(fā)Error
。否則會立即轉(zhuǎn)發(fā)Error
,可能導(dǎo)致隊列中的元素丟失。默認(rèn)為true
。 - prefetch:預(yù)取元素的數(shù)量,同時也是隊列的容量。默認(rèn)值為
Queues.SMALL_BUFFER_SIZE
,該值通過配置進(jìn)行修改。
代碼示例
prefetch
/** * 每隔delayMillis生產(chǎn)一個元素 */ protected Flux<Integer> delayPublishFlux(int delayMillis, int startInclusive, int endExclusive) { return Flux.create(fluxSink -> { IntStream.range(startInclusive, endExclusive) .forEach(i -> { // 同步next sleep(delayMillis); logInt(i, "生產(chǎn)"); fluxSink.next(i); }); fluxSink.complete(); }); } @Test public void testPreFetch() { delayPublishFlux(1000, 1, 5) .doOnRequest(i -> logLong(i, "request")) .publishOn(Schedulers.boundedElastic(), 2) .subscribe(i -> logInt(i, "消費")); sleep(10000); }
每次會都向上游請求2個元素。另外還能發(fā)現(xiàn),從第二個request開始,線程發(fā)生了切換。
delayError
/** * 每隔delayMillis生產(chǎn)一個元素,最后發(fā)送Error */ protected Flux<Integer> delayPublishFluxError(int delayMillis, int startInclusive, int endExclusive) { return Flux.create(fluxSink -> { IntStream.range(startInclusive, endExclusive) .forEach(i -> { // 同步next sleep(delayMillis); logInt(i, "生產(chǎn)"); fluxSink.next(i); }); fluxSink.error(new RuntimeException("發(fā)布錯誤!")); }); } @Test public void testDelayError() { delayPublishFluxError(500, 1, 5) .publishOn(Schedulers.boundedElastic()) // 只是為了消費慢一點 .doOnNext(i -> sleep(1000)) .subscribe(i -> logInt(i, "消費")); sleep(10000); }
元素消費完才觸發(fā)Error
!
@Test public void testNotDelayError() { delayPublishFluxError(500, 1, 5) .publishOn(Schedulers.boundedElastic(), false, 256) // 只是為了消費慢一點 .doOnNext(i -> sleep(1000)) .subscribe(i -> logInt(i, "消費")); sleep(10000); }
元素還沒消費完就觸發(fā)Error
!
源碼分析
首先看一下publishOn()
操作符在裝配階段做了什么,直接查看Flux#publishOn()
源碼。
Flux#publishOn()
publishOn()
裝配階段重點是創(chuàng)建了FluxPublishOn
對象。
接下來,我們分析訂閱階段發(fā)生了什么。一個Publisher
在訂閱的時候調(diào)用的是其subscribe()
方法,因此我們繼續(xù)看Flux#subscribe()
源碼。
Flux#subscribe()
在Flux#subscribe()
方法的實現(xiàn)中,如果上游Publisher
是OptimizableOperator
類型,實際的Subscriber
是通過調(diào)用該InternalFluxOperator#subscribeOrReturn()
方法返回的。如果返回值為null
,直接return
。
對于publishOn()
操作符來說,裝配階段創(chuàng)建的FluxPublishOn
就是OptimizableOperator
類型。所以繼續(xù)查看FluxPublishOn#subscribeOrReturn()
源碼。
FluxPublishOn#subscribeOrReturn()
可以看到,方法返回的是PublishOnSubscriber
,它包裝了原始的Subscriber
。
在后續(xù)的訂閱階段一定會調(diào)用其onSubscribe()
方法,在運行階段一定會調(diào)用其onNext()
方法。我們先看FluxPublishOn#onSubscribe()
源碼。
FluxPublishOn#onSubscribe()
在onSubscribe()
實現(xiàn)中,分為同步隊列融合、異步隊列融合以及非融合方式處理。
如果上游的Subscription
是QueueSubscription
類型,則會進(jìn)行隊列融合。具體采用同步還是異步,取決于該QueueSubscription#requestFusion()
實現(xiàn)。
- 同步隊列融合:復(fù)用當(dāng)前隊列,繼續(xù)調(diào)用下游
onSubscribe()
方法,但不會繼續(xù)調(diào)用上游request()
方法。 - 異步隊列融合:復(fù)用當(dāng)前隊列,然后繼續(xù)調(diào)用下游
onSubscribe()
以及上游request()
方法,請求數(shù)量是prefetch
。 - 非融合:創(chuàng)建一個新的隊列,然后繼續(xù)調(diào)用下游
onSubscribe()
以及上游request()
方法,請求數(shù)量是prefetch
。
接下來,我們從源碼角度分別介紹上述三種方式的處理邏輯,首先介紹非融合方式。
非融合
先看如下代碼示例,該代碼會以非融合方式執(zhí)行。
@Test public void testNoFuse() { delayPublishFlux(1000, 1, 5) .publishOn(Schedulers.boundedElastic()) .subscribe(i -> logInt(i, "消費")); sleep(10000); }
間隔1s生產(chǎn)消費元素!
在消費階段,一定會調(diào)用FluxPublishOn#onNext()
方法。
FluxPublishOn#onNext()
我們重點關(guān)注非融合方式執(zhí)行邏輯,其實只做了2件事:
- 將下發(fā)的元素添加到隊列中,該隊列就是
onSubscribe()
階段創(chuàng)建的新隊列。 - 調(diào)用
trySchedule()
方法進(jìn)行調(diào)度。
繼續(xù)看FluxPublishOn#trySchedule()
源碼。
FluxPublishOn#trySchedule()
這里其實就是交由woker
異步執(zhí)行,后續(xù)會執(zhí)行FluxPublishOn.run()
方法。
FluxPublishOn#run()
在run()方法執(zhí)行的時候,分為3段邏輯:
- 如果是輸出融合,執(zhí)行
runBackfused()
方法。 - 如果是同步隊列融合,執(zhí)行
runSync()
方法。 - 否則,執(zhí)行
runAsync()
方法。
對于當(dāng)前例子,實際執(zhí)行的是runAsync()
方法,繼續(xù)查看其源碼。
FluxPublishOn#runAsync()
runAsync()
做的事情比較簡單,就是排空隊列中的元素下發(fā)給下游。同時在這里會繼續(xù)調(diào)用request()
向上游請求數(shù)據(jù),這也是前面說的從第二個request()
開始會進(jìn)行線程切換的原因。
另外這里還會調(diào)用checkTerminated()
,檢查終止情況。
FluxPublishOn#checkTerminated()
如果delayError=true
,必須當(dāng)前隊列為空是才會轉(zhuǎn)發(fā)Error
。如果delayError=false
,則直接轉(zhuǎn)發(fā)Error
。繼續(xù)查看onComplete()
方法。
FluxPublishOn#onComplete()
如果未結(jié)束,將done
標(biāo)記設(shè)置為true
,然后再次調(diào)用trySchedule()
進(jìn)行調(diào)度。后續(xù)再被調(diào)度到的時候,如果隊列已經(jīng)排空,才會調(diào)用下游onComplete()
,觸發(fā)完成。
小結(jié)
簡單總結(jié)一下非融合執(zhí)行過程:
在onSubscribe()
時創(chuàng)建一個隊列,在onNext()
時將上游下發(fā)的元素添加到隊列中,然后異步排空隊列中的元素,繼續(xù)下發(fā)給下游。
同步隊列融合
以下代碼會以同步隊列融合方式執(zhí)行。
@Test public void testSyncFuse() { Flux.just(1, 2 ,3, 4, 5) .publishOn(Schedulers.boundedElastic()) .subscribe(this::logInt); sleep(10000); }
因為Flux.just()
對應(yīng)的Subscription
是SynchronousSubscription
,其requestFusion()
方法實現(xiàn)如下:
SynchronousSubscription#requestFusion()
此時返回的是SYNC
,執(zhí)行同步隊列融合。
前面提到過,同步隊列融合會復(fù)用當(dāng)前隊列,繼續(xù)調(diào)用下游onSubscribe()
方法,但不會繼續(xù)調(diào)用上游request()
方法。
這意味著,此時FluxPublishOn#onNext()
和FluxPublishOn#onComplete()
方法并不會調(diào)用。但是FluxPublishOn#request()
依然會被下游調(diào)用到。
FluxPublishOn#request()
在request()
方法中還是會調(diào)用trySchedule()
,后續(xù)會異步調(diào)用runSync()
方法(前面已經(jīng)分析了)。
對于非融合方式,trySchedule()
也會執(zhí)行,只是這次調(diào)度的時候,隊列中還沒有數(shù)據(jù)被添加進(jìn)去。
FluxPublishOn#runSync()
runSync()
實現(xiàn)上runAsync()
差不多,也是排空隊列的元素,繼續(xù)下發(fā)給下游。不同的點是少了request()
調(diào)用,以及取消完成控制有差異。
小結(jié)
簡單總結(jié)一下同步隊列融合執(zhí)行過程:
在onSubsrribe()
時直接復(fù)用上游QueueSubscription
作為隊列,不會調(diào)用上游request()
請求數(shù)據(jù),在自身request()
時異步排空隊列中的元素,繼續(xù)下發(fā)給下游。
異步隊列融合
以下代碼會以異步隊列融合方式執(zhí)行。
@Test public void testAsyncFuse() { Flux.just(1, 2, 3, 4, 5) .windowUntil(i -> i % 3 == 0) .publishOn(Schedulers.boundedElastic()) .flatMap(Function.identity()) .subscribe(this::logInt); sleep(10000); }
因為windowUntil()
對應(yīng)的Subscription
是WindowPredicateMain
,其requestFusion()
方法實現(xiàn)如下:
WindowPredicateMain#requestFusion()
此時返回ASYNC
,執(zhí)行異步隊列融合。接下來再看一下FluxPublishOn#onNext()
源碼。
FluxPublishOn#onNext()
注意,此時onNext()
方法參數(shù)是null
,表明上游并沒有真正下發(fā)元素,可以將其看做是一個觸發(fā)Worker
調(diào)度的信號。后續(xù)還是會異步執(zhí)行runAsync()
方法,這里就不再分析了。
這其實也很容易理解:異步隊列融合直接復(fù)用了上游的QueueSubscription
作為隊列,真正的數(shù)據(jù)應(yīng)該由這個隊列下發(fā)。
總結(jié)
簡單總結(jié)一下同步隊列融合執(zhí)行過程:
在onSubsrribe()
時直接復(fù)用上游QueueSubscription
作為隊列,在onNext()
時接收上游信號,異步排空隊列中的元素,繼續(xù)下發(fā)給下游。
非融合、同步隊列融合、異步隊列融合比較如下:
以上就是Project Reactor源碼解析publishOn使用示例的詳細(xì)內(nèi)容,更多關(guān)于Project Reactor publishOn的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
springboot中使用FastJson解決long類型在js中失去精度的問題
這篇文章主要介紹了springboot中使用FastJson解決long類型在js中失去精度的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-06-06詳解在springmvc中解決FastJson循環(huán)引用的問題
本篇文章主要介紹了在springmvc中解決FastJson循環(huán)引用的問題,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-01-01SSH框架網(wǎng)上商城項目第21戰(zhàn)之詳解易寶支付的流程
這篇文章主要為大家詳細(xì)介紹了SSH框架網(wǎng)上商城項目第21戰(zhàn)之易寶支付的流程,感興趣的小伙伴們可以參考一下2016-06-06Java中進(jìn)程、協(xié)程與線程的區(qū)別詳解
這篇文章主要介紹了Java中進(jìn)程,線程,協(xié)程的概念、區(qū)別以及使用場景的選擇,早期的操作系統(tǒng)每個程序就是一個進(jìn)程,知道一個程序運行完,才能進(jìn)行下一個進(jìn)程,就是"單進(jìn)程時代",一切的程序只能串行發(fā)生,需要的朋友可以參考下2023-08-08springboot + devtools(熱部署)實例教程
devtools是boot的一個熱部署工具,當(dāng)我們修改了classpath下的文件(包括類文件、屬性文件、頁面等)時,會重新啟動應(yīng)用。本文通過實例給大家介紹springboot+devtools熱部署,感興趣的朋友一起看看吧2017-04-04一文教你如何通過三級緩存解決Spring循環(huán)依賴
這篇文章主要介紹了如何通過三級緩存解決?Spring?循環(huán)依賴,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考價值,需要的朋友可以參考下2023-07-07詳解Spring Cloud 跨服務(wù)數(shù)據(jù)聚合框架
這篇文章主要介紹了詳解Spring Cloud 跨服務(wù)數(shù)據(jù)聚合框架,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-03-03