欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

如何使用Reactor完成類似Flink的操作

 更新時(shí)間:2021年03月01日 09:27:07   作者:木小豐  
這篇文章主要介紹了如何使用Reactor完成類似Flink的操作,幫助大家更好的理解和學(xué)習(xí)使用Java,感興趣的朋友可以了解下

一、背景

Flink在處理流式任務(wù)的時(shí)候有很大的優(yōu)勢(shì),其中windows等操作符可以很方便的完成聚合任務(wù),但是Flink是一套獨(dú)立的服務(wù),業(yè)務(wù)流程中如果想使用需要將數(shù)據(jù)發(fā)到kafka,用Flink處理完再發(fā)到kafka,然后再做業(yè)務(wù)處理,流程很繁瑣。

比如在業(yè)務(wù)代碼中想要實(shí)現(xiàn)類似Flink的window按時(shí)間批量聚合功能,如果純手動(dòng)寫代碼比較繁瑣,使用Flink又太重,這種場(chǎng)景下使用響應(yīng)式編程RxJava、Reactor等的window、buffer操作符可以很方便的實(shí)現(xiàn)。

響應(yīng)式編程框架也早已有了背壓以及豐富的操作符支持,能不能用響應(yīng)式編程框架處理類似Flink的操作呢,答案是肯定的。

本文使用Reactor來實(shí)現(xiàn)Flink的window功能來舉例,其他操作符理論上相同。文中涉及的代碼:github

二、實(shí)現(xiàn)過程

Flink對(duì)流式處理做的很好的封裝,使用Flink的時(shí)候幾乎不用關(guān)心線程池、積壓、數(shù)據(jù)丟失等問題,但是使用Reactor實(shí)現(xiàn)類似的功能就必須對(duì)Reactor運(yùn)行原理比較了解,并且經(jīng)過不同場(chǎng)景下測(cè)試,否則很容易出問題。

下面列舉出實(shí)現(xiàn)過程中的核心點(diǎn):

1、創(chuàng)建Flux和發(fā)送數(shù)據(jù)分離

入門Reactor的時(shí)候給的示例都是創(chuàng)建Flux的時(shí)候同時(shí)就把數(shù)據(jù)賦值了,比如:Flux.just、Flux.range等,從3.4.0版本后先創(chuàng)建Flux,再發(fā)送數(shù)據(jù)可使用Sinks完成。有兩個(gè)比較容易混淆的方法:

  • Sinks.many().multicast() 如果沒有訂閱者,那么接收的消息直接丟棄
  • Sinks.many().unicast() 如果沒有訂閱者,那么保存接收的消息直到第一個(gè)訂閱者訂閱
  • Sinks.many().replay() 不管有多少訂閱者,都保存所有消息

在此示例場(chǎng)景中,選擇的是Sinks.many().unicast()

官方文檔:https://projectreactor.io/docs/core/release/reference/#processors

2、背壓支持

上面方法的對(duì)象背壓策略支持兩種:BackpressureBuffer、BackpressureError,在此場(chǎng)景肯定是選擇BackpressureBuffer,需要指定緩存隊(duì)列,初始化方法如下:Queues.get(queueSize).get()

數(shù)據(jù)提交有兩個(gè)方法:

  • emitNext 指定提交失敗策略同步提交
  • tryEmitNext 異步提交,返回提交成功、失敗狀態(tài)

在此場(chǎng)景我們不希望丟數(shù)據(jù),可自定義失敗策略,提交失敗無限重試,當(dāng)然也可以調(diào)用異步方法自己重試。

 Sinks.EmitFailureHandler ALWAYS_RETRY_HANDLER = (signalType, emitResult) -> emitResult.isFailure();

在此之后就就可以調(diào)用Sinks.asFlux開心的使用各種操作符了。

在此之后就就可以調(diào)用Sinks.asFlux開心的使用各種操作符了。

3、窗口函數(shù)

Reactor支持兩類窗口聚合函數(shù):

  • window類:返回Mono(Flux)
  • buffer類:返回List

在此場(chǎng)景中,使用buffer即可滿足需求,bufferTimeout(int maxSize, Duration maxTime)支持最大個(gè)數(shù),最大等待時(shí)間操作,F(xiàn)link中的keys操作可以用groupBy、collectMap來實(shí)現(xiàn)。

4、消費(fèi)者處理

Reactor經(jīng)過buffer后是一個(gè)一個(gè)的發(fā)送數(shù)據(jù),如果使用publishOn或subscribeOn處理的話,只等待下游的subscribe處理完成才會(huì)重新request新的數(shù)據(jù),buffer操作符才會(huì)重新發(fā)送數(shù)據(jù)。如果此時(shí)subscribe消費(fèi)者耗時(shí)較長,數(shù)據(jù)流會(huì)在buffer流程阻塞,顯然并不是我們想要的。

理想的操作是消費(fèi)者在一個(gè)線程池里操作,可多線程并行處理,如果線程池滿,再阻塞buffer操作符。解決方案是自定義一個(gè)線程池,并且當(dāng)然線程池如果任務(wù)滿submit支持阻塞,可以用自定義RejectedExecutionHandler來實(shí)現(xiàn):

 RejectedExecutionHandler executionHandler = (r, executor) -> {
   try {
     executor.getQueue().put(r);
   } catch (InterruptedException e) {
     Thread.currentThread().interrupt();
     throw new RejectedExecutionException("Producer thread interrupted", e);
   }
 };
 
 new ThreadPoolExecutor(poolSize, poolSize,
     0L, TimeUnit.MILLISECONDS,
     new SynchronousQueue<>(),
     executionHandler);

三、總結(jié)

1、總結(jié)一下整體的執(zhí)行流程

提交任務(wù):提交數(shù)據(jù)支持同步異步兩種方式,支持多線程提交,正常情況下響應(yīng)很快,同步的方法如果隊(duì)列滿則阻塞。
豐富的操作符處理流式數(shù)據(jù)。
buffer操作符產(chǎn)生的數(shù)據(jù)多線程處理:同步提交到單獨(dú)的消費(fèi)者線程池,線程池任務(wù)滿則阻塞。
消費(fèi)者線程池:支持阻塞提交,保證不丟消息,同時(shí)隊(duì)列長度設(shè)置成0,因?yàn)榍懊嬉呀?jīng)有隊(duì)列了。
背壓:消費(fèi)者線程池阻塞后,會(huì)背壓到buffer操作符,并背壓到緩沖隊(duì)列,緩存隊(duì)列滿背壓到數(shù)據(jù)提交者。

2、和Flink的對(duì)比

實(shí)現(xiàn)的Flink的功能:

  • 不輸Flink的豐富操作符
  • 支持背壓,不丟數(shù)據(jù)

優(yōu)勢(shì):

  • 輕量級(jí),可直接在業(yè)務(wù)代碼中使用

劣勢(shì):

  • 內(nèi)部執(zhí)行流程復(fù)雜,容易踩坑,不如Flink傻瓜化
  • 沒有watermark功能,也就意味著只支持無序數(shù)據(jù)處理
  • 沒有savepoint功能,雖然我們用背壓解決了部分問題,但是宕機(jī)后開始會(huì)丟失緩存隊(duì)列和消費(fèi)者線程池里的數(shù)據(jù),補(bǔ)救措施是添加Java Hook功能
  • 只支持單機(jī),意味著你的緩存隊(duì)列不能設(shè)置無限大,要考慮線程池的大小,且沒有flink globalWindow等功能
  • 需考慮對(duì)上游數(shù)據(jù)源的影響,F(xiàn)link的上游一般是mq,數(shù)據(jù)量大時(shí)可自動(dòng)堆積,如果本文的方案上游是http、rpc調(diào)用,產(chǎn)生的阻塞影響就不能忽略。補(bǔ)償方案是每次提交數(shù)據(jù)都使用異步方法,如果失敗則提交到mq中緩沖并消費(fèi)該mq無限重試。

四、附錄

本文源碼地址:https://github.com/sofn/reactor-window-like-flink

Reactor官方文檔:https://projectreactor.io/docs/core/release/reference/

Flink文檔:https://ci.apache.org/projects/flink/flink-docs-stable/

Reactive操作符:http://reactivex.io/documentation/operators.html

以上就是如何使用Reactor完成類似Flink的操作的詳細(xì)內(nèi)容,更多關(guān)于使用Reactor完成類似Flink的操作的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論