Flink時(shí)間和窗口邏輯處理源碼分析
概覽
- 計(jì)算模型
- DataStream基礎(chǔ)框架
- 事件時(shí)間和窗口
- 部署&調(diào)度
- 存儲(chǔ)體系
- 底層支撐
在實(shí)時(shí)計(jì)算處理時(shí),需要跟時(shí)間來(lái)打交道,如實(shí)時(shí)風(fēng)控場(chǎng)景的時(shí)間行為序列,實(shí)時(shí)分析場(chǎng)景下的時(shí)間窗口統(tǒng)計(jì)等。而由于網(wǎng)絡(luò)等問(wèn)題,會(huì)導(dǎo)致處理時(shí)的數(shù)據(jù)存在亂序問(wèn)題,F(xiàn)link通過(guò)吸收Google Dataflow/Bean的編程模型思想,提供了靈活的處理方式,本篇來(lái)分析下Flink中具體提供的功能和底層機(jī)制。
時(shí)間
Flink中提供了3種時(shí)間類(lèi)型來(lái)滿(mǎn)足不同場(chǎng)景的需求,即處理時(shí)間、事件時(shí)間和接入時(shí)間
- 處理時(shí)間(Processing time):數(shù)據(jù)在流式系統(tǒng)中處理時(shí)的機(jī)器系統(tǒng)時(shí)間
- 事件時(shí)間(Event time):每條單獨(dú)的事件在產(chǎn)出設(shè)備上發(fā)生的時(shí)間,即事件實(shí)際發(fā)生的時(shí)間。這個(gè)時(shí)間保存在發(fā)送給Flink系統(tǒng)的數(shù)據(jù)記錄中
- 接入時(shí)間(Ingestion time):Flink讀取事件時(shí)的時(shí)間 下圖是Flink官方文檔中3個(gè)時(shí)間的標(biāo)識(shí)
而在使用事件時(shí)間的場(chǎng)景下,需要一種方式來(lái)度量目前處理的事件時(shí)間,如使用事件時(shí)間窗口時(shí),需要知道什么時(shí)候來(lái)關(guān)閉這個(gè)窗口,所以這里引入了Watermark的機(jī)制。 這里先介紹Watermark關(guān)聯(lián)的3個(gè)概念
- WatermarkStrategy:org.apache.flink.table.sources.wmstrategies.WatermarkStrategy,定義怎么在DataStream中去生成Watermark的策略,其子類(lèi)有定義了多種不同的策略
- WatermarkGenerator:具體生成Watermark的生成器類(lèi)
- TimestampAssigner:從數(shù)據(jù)記錄中提取時(shí)間戳
重要類(lèi)
WatermarkStrategy
Flink中提供了一些常用的watermark策略,主要我們看看PeriodicWatermarkAssigner這個(gè)策略,周期性水位策略,其有2個(gè)子類(lèi)
- BoundedOutOfOrderTimestamps:沒(méi)有順序的數(shù)據(jù),指定對(duì)應(yīng)的延遲來(lái)產(chǎn)生watermark,產(chǎn)生的watermark為獲取的數(shù)據(jù)中的最大時(shí)間-指定的delay
- AscendingTimestamps:對(duì)于有順序的數(shù)據(jù)使用,產(chǎn)生的watermark為獲取的數(shù)據(jù)中最大的時(shí)間-1
WatermarkGenerator
WatermarkGenerator接口有2個(gè)方法
@Public public interface WatermarkGenerator<T> { /** * 每來(lái)一條事件數(shù)據(jù)調(diào)用一次,可以檢查或者記錄事件的時(shí)間戳,或者也可以基于事件數(shù)據(jù)本身去生成 watermark。 */ void onEvent(T event, long eventTimestamp, WatermarkOutput output); /** * 周期性的調(diào)用,也許會(huì)生成新的 watermark,也許不會(huì)。 * * <p>調(diào)用此方法生成 watermark 的間隔時(shí)間由 {@link ExecutionConfig#getAutoWatermarkInterval()} 決定。 */ void onPeriodicEmit(WatermarkOutput output); }
watermark生成的方式分為2種:周期性生成和標(biāo)記生成 周期性生成的通過(guò)onEvent()方法來(lái)更新最大時(shí)間戳,而在框架調(diào)用onPeriodicEmit()時(shí)發(fā)出watermark 標(biāo)記生成通過(guò)onEvent()來(lái)處理,如果有滿(mǎn)足條件的記錄出現(xiàn),就發(fā)出watermark
TimerService
如何獲取當(dāng)前的處理時(shí)間和watermark呢,這個(gè)在Flink中通過(guò)TimerService來(lái)負(fù)責(zé),下面先看看這個(gè)接口的相關(guān)方法
//返回當(dāng)前處理時(shí)間 /** Returns the current processing time. */ long currentProcessingTime(); //返回當(dāng)前事件時(shí)間watermark /** Returns the current event-time watermark. */ long currentWatermark(); //注冊(cè)一個(gè)timer,當(dāng)事件時(shí)間水位超過(guò)給定時(shí)間時(shí)觸發(fā) void registerEventTimeTimer(long time);
上面介紹了時(shí)間和watermark相關(guān)的重要類(lèi),下面通過(guò)一個(gè)例子把這些串聯(lián)起來(lái),看其是如何來(lái)運(yùn)轉(zhuǎn)的
處理邏輯
我們以Flink官方文檔中的watermark代碼例子結(jié)合來(lái)介紹
WatermarkStrategy<Event> strategy = WatermarkStrategy .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20)) .withTimestampAssigner((event, timestamp) -> event.timestamp); DataStream<Event> withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(strategy);
這里通過(guò)WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20))生成了一個(gè)延遲為20秒的有界限的watermark策略,然后指定了TimestampAssigner為時(shí)間戳為事件的timestamp字段。 stream.assignTimestampsAndWatermarks方法返回的是一個(gè)DataStream,通過(guò)第一篇的介紹,這里對(duì)應(yīng)有一個(gè)Transformation(TimestampsAndWatermarksTransformation),同時(shí)也對(duì)應(yīng)有一個(gè)StreamOperator(TimestampsAndWatermarksOperator, 注意這個(gè)是在Translator(TimestampsAndWatermarksTransformationTranslator)中定義的),我們看看具體的watermark在TimestampsAndWatermarksOperator中的處理邏輯如何
public void open() throws Exception { super.open(); timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup); // 創(chuàng)建watermarkGenerator watermarkGenerator = emitProgressiveWatermarks ? watermarkStrategy.createWatermarkGenerator(this::getMetricGroup) : new NoWatermarksGenerator<>(); wmOutput = new WatermarkEmitter(output); //獲取周期性watermark的調(diào)度周期 watermarkInterval = getExecutionConfig().getAutoWatermarkInterval(); if (watermarkInterval > 0 && emitProgressiveWatermarks) { final long now = getProcessingTimeService().getCurrentProcessingTime(); // 獲取timerService 注冊(cè)Timer getProcessingTimeService().registerTimer(now + watermarkInterval, this); } }
在StreamOperator的前處理方法中,創(chuàng)建了WatermarkGenerator,然后獲取watermark觸發(fā)周期,注冊(cè)到TimerService里面 后續(xù)再StreamOperator的每條數(shù)據(jù)處理方法中(processElement)調(diào)用了
watermarkGenerator.onEvent(event, newTimestamp, wmOutput);
這里實(shí)際會(huì)更新最大事件時(shí)間戳 而前面注冊(cè)Timer時(shí)會(huì)傳入一個(gè)ProcessingTimeCallback對(duì)象,該接口有個(gè)onProcessingTime方法,而TimestampsAndWatermarksOperator實(shí)現(xiàn)了該接口
// ProcessingTimeCallback.java ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target);
//TimestampsAndWatermarksOperator.java @Override public void onProcessingTime(long timestamp) throws Exception { // 發(fā)送watermark watermarkGenerator.onPeriodicEmit(wmOutput); // 更新下次觸發(fā)時(shí)間 final long now = getProcessingTimeService().getCurrentProcessingTime(); getProcessingTimeService().registerTimer(now + watermarkInterval, this); }
這里通過(guò)回調(diào),觸發(fā)發(fā)送watermark和再次注冊(cè)下一個(gè)調(diào)度時(shí)間點(diǎn),而下游算子收到了watermark如何處理呢,如在window算子里面,回去更新算子里面TimerService的currentWatermark,這樣如果新數(shù)據(jù)小于當(dāng)前watermark那就會(huì)丟掉或按siteOutput處理,具體我們?cè)俜治龃翱跁r(shí)再介紹。
窗口
在實(shí)際場(chǎng)景中有很多對(duì)一段時(shí)間的數(shù)據(jù)來(lái)進(jìn)行處理的需求,F(xiàn)link中提供了不同種類(lèi)的窗口來(lái)支持
具體的類(lèi)型有
- 滾動(dòng)窗口:按固定的區(qū)間劃分,各個(gè)之間不重疊,如近1分鐘的頁(yè)面訪問(wèn)量
- 滑動(dòng)窗口:按固定區(qū)間劃分,但窗口間會(huì)存在重疊,如每10秒計(jì)算近1分鐘的頁(yè)面訪問(wèn)量
- 會(huì)話窗口:超過(guò)一段時(shí)間該窗口沒(méi)有數(shù)據(jù)則視為該窗口結(jié)束
重要類(lèi)
Window
定義了窗口的類(lèi)型,目前有2個(gè)子類(lèi)TimeWindow和GlobalWindow。TimeWindow指一個(gè)時(shí)間區(qū)間的,指定了開(kāi)始時(shí)間(含)和結(jié)束時(shí)間(不含); GlobalWindow指一個(gè)單獨(dú)的窗口,包括所有的數(shù)據(jù)
WindowAssigner
分配哪些窗口給輸入的元素,按照不同的窗口類(lèi)型和時(shí)間類(lèi)型有不同的分配方式的子類(lèi)。
- SlidingProcessingTimeWindows
- SlidingEventTimeWindows
- TumblingEventTimeWindows
- TumblingProcessingTimeWindows
- GlobalWindows 另外在session Window場(chǎng)景會(huì)涉及到window的合并,這里有一類(lèi)單獨(dú)的MergingWindowAssigner類(lèi)來(lái)實(shí)現(xiàn)
Triger
用于確定每片窗口什么時(shí)候進(jìn)行計(jì)算或清理,如有按時(shí)間、數(shù)量等方式。Triger后有如下幾種結(jié)果(定義在TriggerResult中)
//不做任何操作 CONTINUE(false, false), /** {@code FIRE_AND_PURGE} evaluates the window function and emits the window result. */ //執(zhí)行窗口函數(shù)并發(fā)送結(jié)果,然后清除窗口 FIRE_AND_PURGE(true, true), /** * On {@code FIRE}, the window is evaluated and results are emitted. The window is not purged, * though, all elements are retained. */ //執(zhí)行窗口函數(shù)并發(fā)送結(jié)果,但窗口不清除 FIRE(true, false), /** * All elements in the window are cleared and the window is discarded, without evaluating the * window function or emitting any elements. */ //直接清理數(shù)據(jù)和丟棄窗口 PURGE(false, true);
Evictor
用于在Triger觸發(fā)后,在執(zhí)行WindowFunction前,按指定條件移除一些數(shù)據(jù),如TimeEvictor,移除指定時(shí)間之前的數(shù)據(jù)
WindowOperator
針對(duì)window的處理的StreamOperator,還有一個(gè)子類(lèi)EvictingWindowOperator。針對(duì)每條數(shù)據(jù)的具體處理邏輯都在該類(lèi)中處理,后面我們單獨(dú)展開(kāi)來(lái)介紹
InternalAppendingState
在窗口數(shù)據(jù)沒(méi)有被觸發(fā)時(shí),這些數(shù)據(jù)需要有個(gè)地方進(jìn)行保存。該類(lèi)來(lái)保存相關(guān)的數(shù)據(jù)信息(針對(duì)滑動(dòng)和滾動(dòng)窗口的,session窗口的處理比較復(fù)雜有其他的類(lèi)來(lái)處理),InternalAppendingState類(lèi)是InternalKvState的子類(lèi),這里的key是對(duì)應(yīng)的窗口,這里還有比較多的優(yōu)化和細(xì)節(jié),這塊我們下篇介紹狀態(tài)時(shí)來(lái)深入分析
處理邏輯
下面我們深入來(lái)了解下具體的處理流程,見(jiàn)下圖
WindowOperator對(duì)數(shù)據(jù)的處理流程分為如下幾個(gè)步驟
- 對(duì)傳入的數(shù)據(jù)獲取其對(duì)應(yīng)的窗口列表
- 對(duì)獲取的窗口列表進(jìn)行迭代處理,判斷是否遲到的窗口,如果是遲到的就直接下一個(gè)
- 把數(shù)據(jù)插入到windowState中
- 計(jì)算看是否會(huì)觸發(fā)Triger,如果結(jié)果為FIRE,那就對(duì)窗口數(shù)據(jù)進(jìn)行計(jì)算并發(fā)送出來(lái);如果結(jié)果為需要清理,就清理對(duì)應(yīng)的windowState. session window的處理流程與此類(lèi)似,只是在前面會(huì)判斷窗口是否需要做合并,如果需要會(huì)進(jìn)行合并處理
實(shí)現(xiàn)細(xì)節(jié)注意 1.key在多窗口復(fù)制,如果是滑動(dòng)窗口,那一個(gè)key會(huì)同時(shí)命中多個(gè)窗口,那這里的處理模式是把該key的值存放到多個(gè)窗口的狀態(tài)中
總結(jié)
Flink中通過(guò)多時(shí)間語(yǔ)義和watermark,提供了靈活的方式處理時(shí)效性、準(zhǔn)確性和成本之間的關(guān)系。本篇深入介紹了相關(guān)的機(jī)制信息。另介紹了窗口相關(guān)內(nèi)容,窗口把要處理的數(shù)據(jù)做了個(gè)緩存,直到滿(mǎn)足條件了才觸發(fā)進(jìn)行計(jì)算和發(fā)送到下游。這里的緩存需要使用到Flink的狀態(tài)的機(jī)制,這個(gè)我們下一篇來(lái)介紹。最后附錄提供了2篇講流式處理的經(jīng)典文章
附錄
Streaming 101: The world beyond batch
Streaming 102:The world beyond batch
以上就是Flink時(shí)間和窗口邏輯處理源碼分析的詳細(xì)內(nèi)容,更多關(guān)于Flink 時(shí)間窗口的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java中Supplier知識(shí)點(diǎn)總結(jié)
在本篇文章里小編給大家整理的是一篇關(guān)于java中Supplier知識(shí)點(diǎn)總結(jié)內(nèi)容,有興趣的朋友們可以學(xué)習(xí)下。2021-04-04Java實(shí)現(xiàn)分庫(kù)分表實(shí)踐指南
在開(kāi)發(fā)中我們經(jīng)常使用到分庫(kù)分表,但是一般是我們前期就已經(jīng)做了規(guī)劃,對(duì)數(shù)據(jù)庫(kù)怎么劃分,對(duì)哪些表進(jìn)行分表,這篇文章主要給大家介紹了關(guān)于Java實(shí)現(xiàn)分庫(kù)分表的相關(guān)資料,需要的朋友可以參考下2024-01-01springboot依賴(lài)沖突問(wèn)題及解決過(guò)程
新搭了一個(gè)springboot 2.3.7.RELASE的框架,在集成mysql,tkMapper,mybatis的過(guò)程中,啟動(dòng)報(bào)錯(cuò),怎么解決這個(gè)問(wèn)題呢,下面小編給大家?guī)?lái)了springboot依賴(lài)沖突問(wèn)題及解決過(guò)程,一起看看吧2021-09-09

Spring關(guān)于@Configuration配置處理流程

Java反射機(jī)制詳解_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理

Java利用位運(yùn)算實(shí)現(xiàn)乘法運(yùn)算詳解