欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Flink時(shí)間和窗口邏輯處理源碼分析

 更新時(shí)間:2022年12月01日 11:55:15   作者:xiangel  
這篇文章主要為大家介紹了Flink時(shí)間和窗口邏輯處理源碼分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

概覽

  • 計(jì)算模型
    • DataStream基礎(chǔ)框架
    • 事件時(shí)間和窗口
  • 部署&調(diào)度
  • 存儲(chǔ)體系
  • 底層支撐

在實(shí)時(shí)計(jì)算處理時(shí),需要跟時(shí)間來(lái)打交道,如實(shí)時(shí)風(fēng)控場(chǎng)景的時(shí)間行為序列,實(shí)時(shí)分析場(chǎng)景下的時(shí)間窗口統(tǒng)計(jì)等。而由于網(wǎng)絡(luò)等問(wèn)題,會(huì)導(dǎo)致處理時(shí)的數(shù)據(jù)存在亂序問(wèn)題,F(xiàn)link通過(guò)吸收Google Dataflow/Bean的編程模型思想,提供了靈活的處理方式,本篇來(lái)分析下Flink中具體提供的功能和底層機(jī)制。

時(shí)間

Flink中提供了3種時(shí)間類(lèi)型來(lái)滿(mǎn)足不同場(chǎng)景的需求,即處理時(shí)間、事件時(shí)間和接入時(shí)間

  • 處理時(shí)間(Processing time):數(shù)據(jù)在流式系統(tǒng)中處理時(shí)的機(jī)器系統(tǒng)時(shí)間
  • 事件時(shí)間(Event time):每條單獨(dú)的事件在產(chǎn)出設(shè)備上發(fā)生的時(shí)間,即事件實(shí)際發(fā)生的時(shí)間。這個(gè)時(shí)間保存在發(fā)送給Flink系統(tǒng)的數(shù)據(jù)記錄中
  • 接入時(shí)間(Ingestion time):Flink讀取事件時(shí)的時(shí)間 下圖是Flink官方文檔中3個(gè)時(shí)間的標(biāo)識(shí)

而在使用事件時(shí)間的場(chǎng)景下,需要一種方式來(lái)度量目前處理的事件時(shí)間,如使用事件時(shí)間窗口時(shí),需要知道什么時(shí)候來(lái)關(guān)閉這個(gè)窗口,所以這里引入了Watermark的機(jī)制。 這里先介紹Watermark關(guān)聯(lián)的3個(gè)概念

  • WatermarkStrategy:org.apache.flink.table.sources.wmstrategies.WatermarkStrategy,定義怎么在DataStream中去生成Watermark的策略,其子類(lèi)有定義了多種不同的策略
  • WatermarkGenerator:具體生成Watermark的生成器類(lèi)
  • TimestampAssigner:從數(shù)據(jù)記錄中提取時(shí)間戳

重要類(lèi)

WatermarkStrategy

Flink中提供了一些常用的watermark策略,主要我們看看PeriodicWatermarkAssigner這個(gè)策略,周期性水位策略,其有2個(gè)子類(lèi)

  • BoundedOutOfOrderTimestamps:沒(méi)有順序的數(shù)據(jù),指定對(duì)應(yīng)的延遲來(lái)產(chǎn)生watermark,產(chǎn)生的watermark為獲取的數(shù)據(jù)中的最大時(shí)間-指定的delay
  • AscendingTimestamps:對(duì)于有順序的數(shù)據(jù)使用,產(chǎn)生的watermark為獲取的數(shù)據(jù)中最大的時(shí)間-1

WatermarkGenerator

WatermarkGenerator接口有2個(gè)方法

@Public
public interface WatermarkGenerator<T> {
    /**
     * 每來(lái)一條事件數(shù)據(jù)調(diào)用一次,可以檢查或者記錄事件的時(shí)間戳,或者也可以基于事件數(shù)據(jù)本身去生成 watermark。
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    /**
     * 周期性的調(diào)用,也許會(huì)生成新的 watermark,也許不會(huì)。
     *
     * <p>調(diào)用此方法生成 watermark 的間隔時(shí)間由 {@link ExecutionConfig#getAutoWatermarkInterval()} 決定。
     */
    void onPeriodicEmit(WatermarkOutput output);
}

watermark生成的方式分為2種:周期性生成和標(biāo)記生成 周期性生成的通過(guò)onEvent()方法來(lái)更新最大時(shí)間戳,而在框架調(diào)用onPeriodicEmit()時(shí)發(fā)出watermark 標(biāo)記生成通過(guò)onEvent()來(lái)處理,如果有滿(mǎn)足條件的記錄出現(xiàn),就發(fā)出watermark

TimerService

如何獲取當(dāng)前的處理時(shí)間和watermark呢,這個(gè)在Flink中通過(guò)TimerService來(lái)負(fù)責(zé),下面先看看這個(gè)接口的相關(guān)方法

    //返回當(dāng)前處理時(shí)間
    /** Returns the current processing time. */
    long currentProcessingTime();
    //返回當(dāng)前事件時(shí)間watermark
    /** Returns the current event-time watermark. */
    long currentWatermark();
    //注冊(cè)一個(gè)timer,當(dāng)事件時(shí)間水位超過(guò)給定時(shí)間時(shí)觸發(fā)
    void registerEventTimeTimer(long time);

上面介紹了時(shí)間和watermark相關(guān)的重要類(lèi),下面通過(guò)一個(gè)例子把這些串聯(lián)起來(lái),看其是如何來(lái)運(yùn)轉(zhuǎn)的

處理邏輯

我們以Flink官方文檔中的watermark代碼例子結(jié)合來(lái)介紹

WatermarkStrategy<Event> strategy = WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.timestamp);
DataStream<Event> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(strategy);

這里通過(guò)WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20))生成了一個(gè)延遲為20秒的有界限的watermark策略,然后指定了TimestampAssigner為時(shí)間戳為事件的timestamp字段。 stream.assignTimestampsAndWatermarks方法返回的是一個(gè)DataStream,通過(guò)第一篇的介紹,這里對(duì)應(yīng)有一個(gè)Transformation(TimestampsAndWatermarksTransformation),同時(shí)也對(duì)應(yīng)有一個(gè)StreamOperator(TimestampsAndWatermarksOperator, 注意這個(gè)是在Translator(TimestampsAndWatermarksTransformationTranslator)中定義的),我們看看具體的watermark在TimestampsAndWatermarksOperator中的處理邏輯如何

    public void open() throws Exception {
        super.open();
        timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
        // 創(chuàng)建watermarkGenerator
        watermarkGenerator =
                emitProgressiveWatermarks
                        ? watermarkStrategy.createWatermarkGenerator(this::getMetricGroup)
                        : new NoWatermarksGenerator<>();
        wmOutput = new WatermarkEmitter(output);
        //獲取周期性watermark的調(diào)度周期
        watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
        if (watermarkInterval > 0 && emitProgressiveWatermarks) {
            final long now = getProcessingTimeService().getCurrentProcessingTime();
            // 獲取timerService 注冊(cè)Timer
            getProcessingTimeService().registerTimer(now + watermarkInterval, this);
        }
    }

在StreamOperator的前處理方法中,創(chuàng)建了WatermarkGenerator,然后獲取watermark觸發(fā)周期,注冊(cè)到TimerService里面 后續(xù)再StreamOperator的每條數(shù)據(jù)處理方法中(processElement)調(diào)用了

watermarkGenerator.onEvent(event, newTimestamp, wmOutput);

這里實(shí)際會(huì)更新最大事件時(shí)間戳 而前面注冊(cè)Timer時(shí)會(huì)傳入一個(gè)ProcessingTimeCallback對(duì)象,該接口有個(gè)onProcessingTime方法,而TimestampsAndWatermarksOperator實(shí)現(xiàn)了該接口

// ProcessingTimeCallback.java
ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target);
//TimestampsAndWatermarksOperator.java
    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        // 發(fā)送watermark
        watermarkGenerator.onPeriodicEmit(wmOutput);
        // 更新下次觸發(fā)時(shí)間
        final long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }

這里通過(guò)回調(diào),觸發(fā)發(fā)送watermark和再次注冊(cè)下一個(gè)調(diào)度時(shí)間點(diǎn),而下游算子收到了watermark如何處理呢,如在window算子里面,回去更新算子里面TimerService的currentWatermark,這樣如果新數(shù)據(jù)小于當(dāng)前watermark那就會(huì)丟掉或按siteOutput處理,具體我們?cè)俜治龃翱跁r(shí)再介紹。

窗口

在實(shí)際場(chǎng)景中有很多對(duì)一段時(shí)間的數(shù)據(jù)來(lái)進(jìn)行處理的需求,F(xiàn)link中提供了不同種類(lèi)的窗口來(lái)支持

具體的類(lèi)型有

  • 滾動(dòng)窗口:按固定的區(qū)間劃分,各個(gè)之間不重疊,如近1分鐘的頁(yè)面訪問(wèn)量
  • 滑動(dòng)窗口:按固定區(qū)間劃分,但窗口間會(huì)存在重疊,如每10秒計(jì)算近1分鐘的頁(yè)面訪問(wèn)量
  • 會(huì)話窗口:超過(guò)一段時(shí)間該窗口沒(méi)有數(shù)據(jù)則視為該窗口結(jié)束

重要類(lèi)

Window

定義了窗口的類(lèi)型,目前有2個(gè)子類(lèi)TimeWindow和GlobalWindow。TimeWindow指一個(gè)時(shí)間區(qū)間的,指定了開(kāi)始時(shí)間(含)和結(jié)束時(shí)間(不含); GlobalWindow指一個(gè)單獨(dú)的窗口,包括所有的數(shù)據(jù)

WindowAssigner

分配哪些窗口給輸入的元素,按照不同的窗口類(lèi)型和時(shí)間類(lèi)型有不同的分配方式的子類(lèi)。

  • SlidingProcessingTimeWindows
  • SlidingEventTimeWindows
  • TumblingEventTimeWindows
  • TumblingProcessingTimeWindows
  • GlobalWindows 另外在session Window場(chǎng)景會(huì)涉及到window的合并,這里有一類(lèi)單獨(dú)的MergingWindowAssigner類(lèi)來(lái)實(shí)現(xiàn)

Triger

用于確定每片窗口什么時(shí)候進(jìn)行計(jì)算或清理,如有按時(shí)間、數(shù)量等方式。Triger后有如下幾種結(jié)果(定義在TriggerResult中)

    //不做任何操作
    CONTINUE(false, false),
    /** {@code FIRE_AND_PURGE} evaluates the window function and emits the window result. */
    //執(zhí)行窗口函數(shù)并發(fā)送結(jié)果,然后清除窗口
    FIRE_AND_PURGE(true, true),
    /**
     * On {@code FIRE}, the window is evaluated and results are emitted. The window is not purged,
     * though, all elements are retained.
     */
    //執(zhí)行窗口函數(shù)并發(fā)送結(jié)果,但窗口不清除
    FIRE(true, false),
    /**
     * All elements in the window are cleared and the window is discarded, without evaluating the
     * window function or emitting any elements.
     */
    //直接清理數(shù)據(jù)和丟棄窗口
    PURGE(false, true);

Evictor

用于在Triger觸發(fā)后,在執(zhí)行WindowFunction前,按指定條件移除一些數(shù)據(jù),如TimeEvictor,移除指定時(shí)間之前的數(shù)據(jù)

WindowOperator

針對(duì)window的處理的StreamOperator,還有一個(gè)子類(lèi)EvictingWindowOperator。針對(duì)每條數(shù)據(jù)的具體處理邏輯都在該類(lèi)中處理,后面我們單獨(dú)展開(kāi)來(lái)介紹

InternalAppendingState

在窗口數(shù)據(jù)沒(méi)有被觸發(fā)時(shí),這些數(shù)據(jù)需要有個(gè)地方進(jìn)行保存。該類(lèi)來(lái)保存相關(guān)的數(shù)據(jù)信息(針對(duì)滑動(dòng)和滾動(dòng)窗口的,session窗口的處理比較復(fù)雜有其他的類(lèi)來(lái)處理),InternalAppendingState類(lèi)是InternalKvState的子類(lèi),這里的key是對(duì)應(yīng)的窗口,這里還有比較多的優(yōu)化和細(xì)節(jié),這塊我們下篇介紹狀態(tài)時(shí)來(lái)深入分析

處理邏輯

下面我們深入來(lái)了解下具體的處理流程,見(jiàn)下圖

WindowOperator對(duì)數(shù)據(jù)的處理流程分為如下幾個(gè)步驟

  • 對(duì)傳入的數(shù)據(jù)獲取其對(duì)應(yīng)的窗口列表
  • 對(duì)獲取的窗口列表進(jìn)行迭代處理,判斷是否遲到的窗口,如果是遲到的就直接下一個(gè)
  • 把數(shù)據(jù)插入到windowState中
  • 計(jì)算看是否會(huì)觸發(fā)Triger,如果結(jié)果為FIRE,那就對(duì)窗口數(shù)據(jù)進(jìn)行計(jì)算并發(fā)送出來(lái);如果結(jié)果為需要清理,就清理對(duì)應(yīng)的windowState. session window的處理流程與此類(lèi)似,只是在前面會(huì)判斷窗口是否需要做合并,如果需要會(huì)進(jìn)行合并處理

實(shí)現(xiàn)細(xì)節(jié)注意 1.key在多窗口復(fù)制,如果是滑動(dòng)窗口,那一個(gè)key會(huì)同時(shí)命中多個(gè)窗口,那這里的處理模式是把該key的值存放到多個(gè)窗口的狀態(tài)中

總結(jié)

Flink中通過(guò)多時(shí)間語(yǔ)義和watermark,提供了靈活的方式處理時(shí)效性、準(zhǔn)確性和成本之間的關(guān)系。本篇深入介紹了相關(guān)的機(jī)制信息。另介紹了窗口相關(guān)內(nèi)容,窗口把要處理的數(shù)據(jù)做了個(gè)緩存,直到滿(mǎn)足條件了才觸發(fā)進(jìn)行計(jì)算和發(fā)送到下游。這里的緩存需要使用到Flink的狀態(tài)的機(jī)制,這個(gè)我們下一篇來(lái)介紹。最后附錄提供了2篇講流式處理的經(jīng)典文章

附錄

Streaming 101: The world beyond batch

Streaming 102:The world beyond batch

以上就是Flink時(shí)間和窗口邏輯處理源碼分析的詳細(xì)內(nèi)容,更多關(guān)于Flink 時(shí)間窗口的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Spring關(guān)于@Configuration配置處理流程

    Spring關(guān)于@Configuration配置處理流程

    這篇文章主要介紹了Spring關(guān)于@Configuration配置處理流程,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-06-06
  • Java反射機(jī)制詳解_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理

    Java反射機(jī)制詳解_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理

    這篇文章主要為大家詳細(xì)介紹了Java反射機(jī)制的相關(guān)資料,主要包括反射的概念、作用
    2017-06-06
  • SpringBoot加載bean的八種方式總結(jié)

    SpringBoot加載bean的八種方式總結(jié)

    springboot難免要用到bean,但這些bean如何導(dǎo)入,對(duì)于初學(xué)者時(shí)間頭疼的事,下面這篇文章主要給大家介紹了關(guān)于SpringBoot加載bean的八種方式,需要的朋友可以參考下
    2022-10-10
  • Java利用位運(yùn)算實(shí)現(xiàn)乘法運(yùn)算詳解

    Java利用位運(yùn)算實(shí)現(xiàn)乘法運(yùn)算詳解

    這篇文章主要為大家詳細(xì)介紹了Java如何用位運(yùn)算實(shí)現(xiàn)乘法運(yùn)算,在實(shí)現(xiàn)乘法時(shí)要用位運(yùn)算實(shí)現(xiàn),并且不能出現(xiàn)加減乘除任何符號(hào),感興趣的可以了解一下
    2023-04-04
  • 最新評(píng)論