欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Flink時間和窗口邏輯處理源碼分析

 更新時間:2022年12月01日 11:55:15   作者:xiangel  
這篇文章主要為大家介紹了Flink時間和窗口邏輯處理源碼分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

概覽

  • 計算模型
    • DataStream基礎框架
    • 事件時間和窗口
  • 部署&調(diào)度
  • 存儲體系
  • 底層支撐

在實時計算處理時,需要跟時間來打交道,如實時風控場景的時間行為序列,實時分析場景下的時間窗口統(tǒng)計等。而由于網(wǎng)絡等問題,會導致處理時的數(shù)據(jù)存在亂序問題,F(xiàn)link通過吸收Google Dataflow/Bean的編程模型思想,提供了靈活的處理方式,本篇來分析下Flink中具體提供的功能和底層機制。

時間

Flink中提供了3種時間類型來滿足不同場景的需求,即處理時間、事件時間和接入時間

  • 處理時間(Processing time):數(shù)據(jù)在流式系統(tǒng)中處理時的機器系統(tǒng)時間
  • 事件時間(Event time):每條單獨的事件在產(chǎn)出設備上發(fā)生的時間,即事件實際發(fā)生的時間。這個時間保存在發(fā)送給Flink系統(tǒng)的數(shù)據(jù)記錄中
  • 接入時間(Ingestion time):Flink讀取事件時的時間 下圖是Flink官方文檔中3個時間的標識

而在使用事件時間的場景下,需要一種方式來度量目前處理的事件時間,如使用事件時間窗口時,需要知道什么時候來關閉這個窗口,所以這里引入了Watermark的機制。 這里先介紹Watermark關聯(lián)的3個概念

  • WatermarkStrategy:org.apache.flink.table.sources.wmstrategies.WatermarkStrategy,定義怎么在DataStream中去生成Watermark的策略,其子類有定義了多種不同的策略
  • WatermarkGenerator:具體生成Watermark的生成器類
  • TimestampAssigner:從數(shù)據(jù)記錄中提取時間戳

重要類

WatermarkStrategy

Flink中提供了一些常用的watermark策略,主要我們看看PeriodicWatermarkAssigner這個策略,周期性水位策略,其有2個子類

  • BoundedOutOfOrderTimestamps:沒有順序的數(shù)據(jù),指定對應的延遲來產(chǎn)生watermark,產(chǎn)生的watermark為獲取的數(shù)據(jù)中的最大時間-指定的delay
  • AscendingTimestamps:對于有順序的數(shù)據(jù)使用,產(chǎn)生的watermark為獲取的數(shù)據(jù)中最大的時間-1

WatermarkGenerator

WatermarkGenerator接口有2個方法

@Public
public interface WatermarkGenerator<T> {
    /**
     * 每來一條事件數(shù)據(jù)調(diào)用一次,可以檢查或者記錄事件的時間戳,或者也可以基于事件數(shù)據(jù)本身去生成 watermark。
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    /**
     * 周期性的調(diào)用,也許會生成新的 watermark,也許不會。
     *
     * <p>調(diào)用此方法生成 watermark 的間隔時間由 {@link ExecutionConfig#getAutoWatermarkInterval()} 決定。
     */
    void onPeriodicEmit(WatermarkOutput output);
}

watermark生成的方式分為2種:周期性生成和標記生成 周期性生成的通過onEvent()方法來更新最大時間戳,而在框架調(diào)用onPeriodicEmit()時發(fā)出watermark 標記生成通過onEvent()來處理,如果有滿足條件的記錄出現(xiàn),就發(fā)出watermark

TimerService

如何獲取當前的處理時間和watermark呢,這個在Flink中通過TimerService來負責,下面先看看這個接口的相關方法

    //返回當前處理時間
    /** Returns the current processing time. */
    long currentProcessingTime();
    //返回當前事件時間watermark
    /** Returns the current event-time watermark. */
    long currentWatermark();
    //注冊一個timer,當事件時間水位超過給定時間時觸發(fā)
    void registerEventTimeTimer(long time);

上面介紹了時間和watermark相關的重要類,下面通過一個例子把這些串聯(lián)起來,看其是如何來運轉(zhuǎn)的

處理邏輯

我們以Flink官方文檔中的watermark代碼例子結(jié)合來介紹

WatermarkStrategy<Event> strategy = WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.timestamp);
DataStream<Event> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(strategy);

這里通過WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20))生成了一個延遲為20秒的有界限的watermark策略,然后指定了TimestampAssigner為時間戳為事件的timestamp字段。 stream.assignTimestampsAndWatermarks方法返回的是一個DataStream,通過第一篇的介紹,這里對應有一個Transformation(TimestampsAndWatermarksTransformation),同時也對應有一個StreamOperator(TimestampsAndWatermarksOperator, 注意這個是在Translator(TimestampsAndWatermarksTransformationTranslator)中定義的),我們看看具體的watermark在TimestampsAndWatermarksOperator中的處理邏輯如何

    public void open() throws Exception {
        super.open();
        timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
        // 創(chuàng)建watermarkGenerator
        watermarkGenerator =
                emitProgressiveWatermarks
                        ? watermarkStrategy.createWatermarkGenerator(this::getMetricGroup)
                        : new NoWatermarksGenerator<>();
        wmOutput = new WatermarkEmitter(output);
        //獲取周期性watermark的調(diào)度周期
        watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
        if (watermarkInterval > 0 && emitProgressiveWatermarks) {
            final long now = getProcessingTimeService().getCurrentProcessingTime();
            // 獲取timerService 注冊Timer
            getProcessingTimeService().registerTimer(now + watermarkInterval, this);
        }
    }

在StreamOperator的前處理方法中,創(chuàng)建了WatermarkGenerator,然后獲取watermark觸發(fā)周期,注冊到TimerService里面 后續(xù)再StreamOperator的每條數(shù)據(jù)處理方法中(processElement)調(diào)用了

watermarkGenerator.onEvent(event, newTimestamp, wmOutput);

這里實際會更新最大事件時間戳 而前面注冊Timer時會傳入一個ProcessingTimeCallback對象,該接口有個onProcessingTime方法,而TimestampsAndWatermarksOperator實現(xiàn)了該接口

// ProcessingTimeCallback.java
ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target);
//TimestampsAndWatermarksOperator.java
    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        // 發(fā)送watermark
        watermarkGenerator.onPeriodicEmit(wmOutput);
        // 更新下次觸發(fā)時間
        final long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }

這里通過回調(diào),觸發(fā)發(fā)送watermark和再次注冊下一個調(diào)度時間點,而下游算子收到了watermark如何處理呢,如在window算子里面,回去更新算子里面TimerService的currentWatermark,這樣如果新數(shù)據(jù)小于當前watermark那就會丟掉或按siteOutput處理,具體我們再分析窗口時再介紹。

窗口

在實際場景中有很多對一段時間的數(shù)據(jù)來進行處理的需求,F(xiàn)link中提供了不同種類的窗口來支持

具體的類型有

  • 滾動窗口:按固定的區(qū)間劃分,各個之間不重疊,如近1分鐘的頁面訪問量
  • 滑動窗口:按固定區(qū)間劃分,但窗口間會存在重疊,如每10秒計算近1分鐘的頁面訪問量
  • 會話窗口:超過一段時間該窗口沒有數(shù)據(jù)則視為該窗口結(jié)束

重要類

Window

定義了窗口的類型,目前有2個子類TimeWindow和GlobalWindow。TimeWindow指一個時間區(qū)間的,指定了開始時間(含)和結(jié)束時間(不含); GlobalWindow指一個單獨的窗口,包括所有的數(shù)據(jù)

WindowAssigner

分配哪些窗口給輸入的元素,按照不同的窗口類型和時間類型有不同的分配方式的子類。

  • SlidingProcessingTimeWindows
  • SlidingEventTimeWindows
  • TumblingEventTimeWindows
  • TumblingProcessingTimeWindows
  • GlobalWindows 另外在session Window場景會涉及到window的合并,這里有一類單獨的MergingWindowAssigner類來實現(xiàn)

Triger

用于確定每片窗口什么時候進行計算或清理,如有按時間、數(shù)量等方式。Triger后有如下幾種結(jié)果(定義在TriggerResult中)

    //不做任何操作
    CONTINUE(false, false),
    /** {@code FIRE_AND_PURGE} evaluates the window function and emits the window result. */
    //執(zhí)行窗口函數(shù)并發(fā)送結(jié)果,然后清除窗口
    FIRE_AND_PURGE(true, true),
    /**
     * On {@code FIRE}, the window is evaluated and results are emitted. The window is not purged,
     * though, all elements are retained.
     */
    //執(zhí)行窗口函數(shù)并發(fā)送結(jié)果,但窗口不清除
    FIRE(true, false),
    /**
     * All elements in the window are cleared and the window is discarded, without evaluating the
     * window function or emitting any elements.
     */
    //直接清理數(shù)據(jù)和丟棄窗口
    PURGE(false, true);

Evictor

用于在Triger觸發(fā)后,在執(zhí)行WindowFunction前,按指定條件移除一些數(shù)據(jù),如TimeEvictor,移除指定時間之前的數(shù)據(jù)

WindowOperator

針對window的處理的StreamOperator,還有一個子類EvictingWindowOperator。針對每條數(shù)據(jù)的具體處理邏輯都在該類中處理,后面我們單獨展開來介紹

InternalAppendingState

在窗口數(shù)據(jù)沒有被觸發(fā)時,這些數(shù)據(jù)需要有個地方進行保存。該類來保存相關的數(shù)據(jù)信息(針對滑動和滾動窗口的,session窗口的處理比較復雜有其他的類來處理),InternalAppendingState類是InternalKvState的子類,這里的key是對應的窗口,這里還有比較多的優(yōu)化和細節(jié),這塊我們下篇介紹狀態(tài)時來深入分析

處理邏輯

下面我們深入來了解下具體的處理流程,見下圖

WindowOperator對數(shù)據(jù)的處理流程分為如下幾個步驟

  • 對傳入的數(shù)據(jù)獲取其對應的窗口列表
  • 對獲取的窗口列表進行迭代處理,判斷是否遲到的窗口,如果是遲到的就直接下一個
  • 把數(shù)據(jù)插入到windowState中
  • 計算看是否會觸發(fā)Triger,如果結(jié)果為FIRE,那就對窗口數(shù)據(jù)進行計算并發(fā)送出來;如果結(jié)果為需要清理,就清理對應的windowState. session window的處理流程與此類似,只是在前面會判斷窗口是否需要做合并,如果需要會進行合并處理

實現(xiàn)細節(jié)注意 1.key在多窗口復制,如果是滑動窗口,那一個key會同時命中多個窗口,那這里的處理模式是把該key的值存放到多個窗口的狀態(tài)中

總結(jié)

Flink中通過多時間語義和watermark,提供了靈活的方式處理時效性、準確性和成本之間的關系。本篇深入介紹了相關的機制信息。另介紹了窗口相關內(nèi)容,窗口把要處理的數(shù)據(jù)做了個緩存,直到滿足條件了才觸發(fā)進行計算和發(fā)送到下游。這里的緩存需要使用到Flink的狀態(tài)的機制,這個我們下一篇來介紹。最后附錄提供了2篇講流式處理的經(jīng)典文章

附錄

Streaming 101: The world beyond batch

Streaming 102:The world beyond batch

以上就是Flink時間和窗口邏輯處理源碼分析的詳細內(nèi)容,更多關于Flink 時間窗口的資料請關注腳本之家其它相關文章!

相關文章

  • java中Supplier知識點總結(jié)

    java中Supplier知識點總結(jié)

    在本篇文章里小編給大家整理的是一篇關于java中Supplier知識點總結(jié)內(nèi)容,有興趣的朋友們可以學習下。
    2021-04-04
  • Java線程同步方法實例總結(jié)

    Java線程同步方法實例總結(jié)

    這篇文章主要介紹了Java線程同步方法,結(jié)合實例形式總結(jié)分析了Java線程同步、并發(fā)控制相關實現(xiàn)方法及操作注意事項,需要的朋友可以參考下
    2018-08-08
  • Java實現(xiàn)分庫分表實踐指南

    Java實現(xiàn)分庫分表實踐指南

    在開發(fā)中我們經(jīng)常使用到分庫分表,但是一般是我們前期就已經(jīng)做了規(guī)劃,對數(shù)據(jù)庫怎么劃分,對哪些表進行分表,這篇文章主要給大家介紹了關于Java實現(xiàn)分庫分表的相關資料,需要的朋友可以參考下
    2024-01-01
  • 詳解Java打包鏡像部署

    詳解Java打包鏡像部署

    這篇文章主要介紹了Java打包鏡像部署,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2023-11-11
  • springboot依賴沖突問題及解決過程

    springboot依賴沖突問題及解決過程

    新搭了一個springboot 2.3.7.RELASE的框架,在集成mysql,tkMapper,mybatis的過程中,啟動報錯,怎么解決這個問題呢,下面小編給大家?guī)砹藄pringboot依賴沖突問題及解決過程,一起看看吧
    2021-09-09
  • Java servlet通過事件驅(qū)動進行高性能長輪詢詳解

    Java servlet通過事件驅(qū)動進行高性能長輪詢詳解

    這篇文章主要介紹了基于servlet3.0+事件驅(qū)動實現(xiàn)高性能長輪詢的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2022-06-06
  • Spring關于@Configuration配置處理流程

    Spring關于@Configuration配置處理流程

    這篇文章主要介紹了Spring關于@Configuration配置處理流程,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-06-06
  • Java反射機制詳解_動力節(jié)點Java學院整理

    Java反射機制詳解_動力節(jié)點Java學院整理

    這篇文章主要為大家詳細介紹了Java反射機制的相關資料,主要包括反射的概念、作用
    2017-06-06
  • SpringBoot加載bean的八種方式總結(jié)

    SpringBoot加載bean的八種方式總結(jié)

    springboot難免要用到bean,但這些bean如何導入,對于初學者時間頭疼的事,下面這篇文章主要給大家介紹了關于SpringBoot加載bean的八種方式,需要的朋友可以參考下
    2022-10-10
  • Java利用位運算實現(xiàn)乘法運算詳解

    Java利用位運算實現(xiàn)乘法運算詳解

    這篇文章主要為大家詳細介紹了Java如何用位運算實現(xiàn)乘法運算,在實現(xiàn)乘法時要用位運算實現(xiàn),并且不能出現(xiàn)加減乘除任何符號,感興趣的可以了解一下
    2023-04-04

最新評論