Flink時間和窗口邏輯處理源碼分析
概覽
- 計算模型
- DataStream基礎框架
- 事件時間和窗口
- 部署&調(diào)度
- 存儲體系
- 底層支撐
在實時計算處理時,需要跟時間來打交道,如實時風控場景的時間行為序列,實時分析場景下的時間窗口統(tǒng)計等。而由于網(wǎng)絡等問題,會導致處理時的數(shù)據(jù)存在亂序問題,F(xiàn)link通過吸收Google Dataflow/Bean的編程模型思想,提供了靈活的處理方式,本篇來分析下Flink中具體提供的功能和底層機制。
時間
Flink中提供了3種時間類型來滿足不同場景的需求,即處理時間、事件時間和接入時間
- 處理時間(Processing time):數(shù)據(jù)在流式系統(tǒng)中處理時的機器系統(tǒng)時間
- 事件時間(Event time):每條單獨的事件在產(chǎn)出設備上發(fā)生的時間,即事件實際發(fā)生的時間。這個時間保存在發(fā)送給Flink系統(tǒng)的數(shù)據(jù)記錄中
- 接入時間(Ingestion time):Flink讀取事件時的時間 下圖是Flink官方文檔中3個時間的標識
而在使用事件時間的場景下,需要一種方式來度量目前處理的事件時間,如使用事件時間窗口時,需要知道什么時候來關閉這個窗口,所以這里引入了Watermark的機制。 這里先介紹Watermark關聯(lián)的3個概念
- WatermarkStrategy:org.apache.flink.table.sources.wmstrategies.WatermarkStrategy,定義怎么在DataStream中去生成Watermark的策略,其子類有定義了多種不同的策略
- WatermarkGenerator:具體生成Watermark的生成器類
- TimestampAssigner:從數(shù)據(jù)記錄中提取時間戳
重要類
WatermarkStrategy
Flink中提供了一些常用的watermark策略,主要我們看看PeriodicWatermarkAssigner這個策略,周期性水位策略,其有2個子類
- BoundedOutOfOrderTimestamps:沒有順序的數(shù)據(jù),指定對應的延遲來產(chǎn)生watermark,產(chǎn)生的watermark為獲取的數(shù)據(jù)中的最大時間-指定的delay
- AscendingTimestamps:對于有順序的數(shù)據(jù)使用,產(chǎn)生的watermark為獲取的數(shù)據(jù)中最大的時間-1
WatermarkGenerator
WatermarkGenerator接口有2個方法
@Public public interface WatermarkGenerator<T> { /** * 每來一條事件數(shù)據(jù)調(diào)用一次,可以檢查或者記錄事件的時間戳,或者也可以基于事件數(shù)據(jù)本身去生成 watermark。 */ void onEvent(T event, long eventTimestamp, WatermarkOutput output); /** * 周期性的調(diào)用,也許會生成新的 watermark,也許不會。 * * <p>調(diào)用此方法生成 watermark 的間隔時間由 {@link ExecutionConfig#getAutoWatermarkInterval()} 決定。 */ void onPeriodicEmit(WatermarkOutput output); }
watermark生成的方式分為2種:周期性生成和標記生成 周期性生成的通過onEvent()方法來更新最大時間戳,而在框架調(diào)用onPeriodicEmit()時發(fā)出watermark 標記生成通過onEvent()來處理,如果有滿足條件的記錄出現(xiàn),就發(fā)出watermark
TimerService
如何獲取當前的處理時間和watermark呢,這個在Flink中通過TimerService來負責,下面先看看這個接口的相關方法
//返回當前處理時間 /** Returns the current processing time. */ long currentProcessingTime(); //返回當前事件時間watermark /** Returns the current event-time watermark. */ long currentWatermark(); //注冊一個timer,當事件時間水位超過給定時間時觸發(fā) void registerEventTimeTimer(long time);
上面介紹了時間和watermark相關的重要類,下面通過一個例子把這些串聯(lián)起來,看其是如何來運轉(zhuǎn)的
處理邏輯
我們以Flink官方文檔中的watermark代碼例子結(jié)合來介紹
WatermarkStrategy<Event> strategy = WatermarkStrategy .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20)) .withTimestampAssigner((event, timestamp) -> event.timestamp); DataStream<Event> withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(strategy);
這里通過WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20))生成了一個延遲為20秒的有界限的watermark策略,然后指定了TimestampAssigner為時間戳為事件的timestamp字段。 stream.assignTimestampsAndWatermarks方法返回的是一個DataStream,通過第一篇的介紹,這里對應有一個Transformation(TimestampsAndWatermarksTransformation),同時也對應有一個StreamOperator(TimestampsAndWatermarksOperator, 注意這個是在Translator(TimestampsAndWatermarksTransformationTranslator)中定義的),我們看看具體的watermark在TimestampsAndWatermarksOperator中的處理邏輯如何
public void open() throws Exception { super.open(); timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup); // 創(chuàng)建watermarkGenerator watermarkGenerator = emitProgressiveWatermarks ? watermarkStrategy.createWatermarkGenerator(this::getMetricGroup) : new NoWatermarksGenerator<>(); wmOutput = new WatermarkEmitter(output); //獲取周期性watermark的調(diào)度周期 watermarkInterval = getExecutionConfig().getAutoWatermarkInterval(); if (watermarkInterval > 0 && emitProgressiveWatermarks) { final long now = getProcessingTimeService().getCurrentProcessingTime(); // 獲取timerService 注冊Timer getProcessingTimeService().registerTimer(now + watermarkInterval, this); } }
在StreamOperator的前處理方法中,創(chuàng)建了WatermarkGenerator,然后獲取watermark觸發(fā)周期,注冊到TimerService里面 后續(xù)再StreamOperator的每條數(shù)據(jù)處理方法中(processElement)調(diào)用了
watermarkGenerator.onEvent(event, newTimestamp, wmOutput);
這里實際會更新最大事件時間戳 而前面注冊Timer時會傳入一個ProcessingTimeCallback對象,該接口有個onProcessingTime方法,而TimestampsAndWatermarksOperator實現(xiàn)了該接口
// ProcessingTimeCallback.java ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target);
//TimestampsAndWatermarksOperator.java @Override public void onProcessingTime(long timestamp) throws Exception { // 發(fā)送watermark watermarkGenerator.onPeriodicEmit(wmOutput); // 更新下次觸發(fā)時間 final long now = getProcessingTimeService().getCurrentProcessingTime(); getProcessingTimeService().registerTimer(now + watermarkInterval, this); }
這里通過回調(diào),觸發(fā)發(fā)送watermark和再次注冊下一個調(diào)度時間點,而下游算子收到了watermark如何處理呢,如在window算子里面,回去更新算子里面TimerService的currentWatermark,這樣如果新數(shù)據(jù)小于當前watermark那就會丟掉或按siteOutput處理,具體我們再分析窗口時再介紹。
窗口
在實際場景中有很多對一段時間的數(shù)據(jù)來進行處理的需求,F(xiàn)link中提供了不同種類的窗口來支持
具體的類型有
- 滾動窗口:按固定的區(qū)間劃分,各個之間不重疊,如近1分鐘的頁面訪問量
- 滑動窗口:按固定區(qū)間劃分,但窗口間會存在重疊,如每10秒計算近1分鐘的頁面訪問量
- 會話窗口:超過一段時間該窗口沒有數(shù)據(jù)則視為該窗口結(jié)束
重要類
Window
定義了窗口的類型,目前有2個子類TimeWindow和GlobalWindow。TimeWindow指一個時間區(qū)間的,指定了開始時間(含)和結(jié)束時間(不含); GlobalWindow指一個單獨的窗口,包括所有的數(shù)據(jù)
WindowAssigner
分配哪些窗口給輸入的元素,按照不同的窗口類型和時間類型有不同的分配方式的子類。
- SlidingProcessingTimeWindows
- SlidingEventTimeWindows
- TumblingEventTimeWindows
- TumblingProcessingTimeWindows
- GlobalWindows 另外在session Window場景會涉及到window的合并,這里有一類單獨的MergingWindowAssigner類來實現(xiàn)
Triger
用于確定每片窗口什么時候進行計算或清理,如有按時間、數(shù)量等方式。Triger后有如下幾種結(jié)果(定義在TriggerResult中)
//不做任何操作 CONTINUE(false, false), /** {@code FIRE_AND_PURGE} evaluates the window function and emits the window result. */ //執(zhí)行窗口函數(shù)并發(fā)送結(jié)果,然后清除窗口 FIRE_AND_PURGE(true, true), /** * On {@code FIRE}, the window is evaluated and results are emitted. The window is not purged, * though, all elements are retained. */ //執(zhí)行窗口函數(shù)并發(fā)送結(jié)果,但窗口不清除 FIRE(true, false), /** * All elements in the window are cleared and the window is discarded, without evaluating the * window function or emitting any elements. */ //直接清理數(shù)據(jù)和丟棄窗口 PURGE(false, true);
Evictor
用于在Triger觸發(fā)后,在執(zhí)行WindowFunction前,按指定條件移除一些數(shù)據(jù),如TimeEvictor,移除指定時間之前的數(shù)據(jù)
WindowOperator
針對window的處理的StreamOperator,還有一個子類EvictingWindowOperator。針對每條數(shù)據(jù)的具體處理邏輯都在該類中處理,后面我們單獨展開來介紹
InternalAppendingState
在窗口數(shù)據(jù)沒有被觸發(fā)時,這些數(shù)據(jù)需要有個地方進行保存。該類來保存相關的數(shù)據(jù)信息(針對滑動和滾動窗口的,session窗口的處理比較復雜有其他的類來處理),InternalAppendingState類是InternalKvState的子類,這里的key是對應的窗口,這里還有比較多的優(yōu)化和細節(jié),這塊我們下篇介紹狀態(tài)時來深入分析
處理邏輯
下面我們深入來了解下具體的處理流程,見下圖
WindowOperator對數(shù)據(jù)的處理流程分為如下幾個步驟
- 對傳入的數(shù)據(jù)獲取其對應的窗口列表
- 對獲取的窗口列表進行迭代處理,判斷是否遲到的窗口,如果是遲到的就直接下一個
- 把數(shù)據(jù)插入到windowState中
- 計算看是否會觸發(fā)Triger,如果結(jié)果為FIRE,那就對窗口數(shù)據(jù)進行計算并發(fā)送出來;如果結(jié)果為需要清理,就清理對應的windowState. session window的處理流程與此類似,只是在前面會判斷窗口是否需要做合并,如果需要會進行合并處理
實現(xiàn)細節(jié)注意 1.key在多窗口復制,如果是滑動窗口,那一個key會同時命中多個窗口,那這里的處理模式是把該key的值存放到多個窗口的狀態(tài)中
總結(jié)
Flink中通過多時間語義和watermark,提供了靈活的方式處理時效性、準確性和成本之間的關系。本篇深入介紹了相關的機制信息。另介紹了窗口相關內(nèi)容,窗口把要處理的數(shù)據(jù)做了個緩存,直到滿足條件了才觸發(fā)進行計算和發(fā)送到下游。這里的緩存需要使用到Flink的狀態(tài)的機制,這個我們下一篇來介紹。最后附錄提供了2篇講流式處理的經(jīng)典文章
附錄
Streaming 101: The world beyond batch
Streaming 102:The world beyond batch
以上就是Flink時間和窗口邏輯處理源碼分析的詳細內(nèi)容,更多關于Flink 時間窗口的資料請關注腳本之家其它相關文章!
相關文章
Java servlet通過事件驅(qū)動進行高性能長輪詢詳解
這篇文章主要介紹了基于servlet3.0+事件驅(qū)動實現(xiàn)高性能長輪詢的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2022-06-06