欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java Flink窗口觸發(fā)器Trigger的用法詳解

 更新時(shí)間:2022年07月08日 16:52:10   作者:coding or coded  
Trigger(窗口觸發(fā)器)決定了窗口(由 WindowAssigner 產(chǎn)生)什么時(shí)候調(diào)用窗口處理函數(shù)。可以根據(jù)指定的時(shí)間或數(shù)據(jù)元素條件來(lái)決定什么時(shí)候觸發(fā)。本文將詳細(xì)講講其用法,需要的可以參考一下

定義

Trigger確定窗口(由窗口分配器形成)何時(shí)準(zhǔn)備好由窗口函數(shù)處理。每個(gè)WindowAssigner都帶有一個(gè)默認(rèn)值Trigger。如果默認(rèn)觸發(fā)器不符合您的需求,您可以使用trigger(…)。

Trigger 源碼

public abstract class Trigger<T, W extends Window> implements Serializable {
	/**
	 只要有元素落?到當(dāng)前窗?, 就會(huì)調(diào)?該?法
	 * @param element 收到的元素
	 * @param timestamp 元素抵達(dá)時(shí)間.
	 * @param window 元素所屬的window窗口.
	 * @param ctx ?個(gè)上下?對(duì)象,通常?該對(duì)象注冊(cè) timer(ProcessingTime/EventTime) 回調(diào).
	 */
    public abstract TriggerResult onElement(T var1, long var2, W var4, Trigger.TriggerContext var5) throws Exception;
	
	 /**
	 * processing-time 定時(shí)器回調(diào)函數(shù)
	 *
	 * @param time 定時(shí)器觸發(fā)的時(shí)間.
	 * @param window 定時(shí)器觸發(fā)的窗口對(duì)象.
	 * @param ctx ?個(gè)上下?對(duì)象,通常?該對(duì)象注冊(cè) timer(ProcessingTime/EventTime) 回調(diào).
	 */
    public abstract TriggerResult onProcessingTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

	/**
	 * event-time 定時(shí)器回調(diào)函數(shù)
	 *
	 * @param time 定時(shí)器觸發(fā)的時(shí)間.
	 * @param window 定時(shí)器觸發(fā)的窗口對(duì)象.
	 * @param ctx ?個(gè)上下?對(duì)象,通常?該對(duì)象注冊(cè) timer(ProcessingTime/EventTime) 回調(diào).
	 */
    public abstract TriggerResult onEventTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

	 /**
	 * 當(dāng) 多個(gè)窗口合并到?個(gè)窗?的時(shí)候,調(diào)用該方法法,例如系統(tǒng)SessionWindow
	 *
	 * @param window 合并后的新窗口對(duì)象
	 * @param ctx ?個(gè)上下?對(duì)象,通常用該對(duì)象注冊(cè) timer(ProcessingTime/EventTime)回調(diào)以及訪問(wèn)狀態(tài)
	 */
    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }
	
	/**
	 * 當(dāng)窗口被刪除后執(zhí)?所需的任何操作。例如:可以清除定時(shí)器或者刪除狀態(tài)數(shù)據(jù)
	 */
    public abstract void clear(W var1, Trigger.TriggerContext var2) throws Exception;
    }

TriggerResult 源碼

public enum TriggerResult {
	// 表示對(duì)窗口不執(zhí)行任何操作。即不觸發(fā)窗口計(jì)算,也不刪除元素。
    CONTINUE(false, false),
    // 觸發(fā)窗口計(jì)算,輸出結(jié)果,然后將窗口中的數(shù)據(jù)和窗口進(jìn)行清除。
    FIRE_AND_PURGE(true, true),
    // 觸發(fā)窗口計(jì)算,但是保留窗口元素
    FIRE(true, false),
    // 不觸發(fā)窗口計(jì)算,丟棄窗口,并且刪除窗口的元素。
    PURGE(false, true);

    private final boolean fire;
    private final boolean purge;

    private TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return this.fire;
    }

    public boolean isPurge() {
        return this.purge;
    }
}

一旦觸發(fā)器確定窗口已準(zhǔn)備好進(jìn)行處理,就會(huì)觸發(fā),返回狀態(tài)可以是FIRE或FIRE_AND_PURGE。其中FIRE是觸發(fā)窗口計(jì)算并保留窗口內(nèi)容,而FIRE_AND_PURGE是觸發(fā)窗口計(jì)算并刪除窗口內(nèi)容。默認(rèn)情況下,預(yù)實(shí)現(xiàn)的觸發(fā)器只是簡(jiǎn)單地FIRE不清除窗口狀態(tài)。

Flink 預(yù)置的Trigger

  • EventTimeTrigger:通過(guò)對(duì)比EventTime和窗口的Endtime確定是否觸發(fā)窗口計(jì)算,如果EventTime大于Window EndTime則觸發(fā),否則不觸發(fā),窗口將繼續(xù)等待。
  • ProcessTimeTrigger:通過(guò)對(duì)比ProcessTime和窗口EndTme確定是否觸發(fā)窗口,如果ProcessTime大于EndTime則觸發(fā)計(jì)算,否則窗口繼續(xù)等待。
  • ContinuousEventTimeTrigger:根據(jù)間隔時(shí)間周期性觸發(fā)窗口或者Window的結(jié)束時(shí)間小于當(dāng)前EndTime觸發(fā)窗口計(jì)算。
  • ContinuousProcessingTimeTrigger:根據(jù)間隔時(shí)間周期性觸發(fā)窗口或者Window的結(jié)束時(shí)間小于當(dāng)前ProcessTime觸發(fā)窗口計(jì)算。
  • CountTrigger:根據(jù)接入數(shù)據(jù)量是否超過(guò)設(shè)定的闕值判斷是否觸發(fā)窗口計(jì)算。
  • DeltaTrigger:根據(jù)接入數(shù)據(jù)計(jì)算出來(lái)的Delta指標(biāo)是否超過(guò)指定的Threshold去判斷是否觸發(fā)窗口計(jì)算。
  • PurgingTrigger:可以將任意觸發(fā)器作為參數(shù)轉(zhuǎn)換為Purge類(lèi)型的觸發(fā)器,計(jì)算完成后數(shù)據(jù)將被清理。
  • NeverTrigger:任何時(shí)候都不觸發(fā)窗口計(jì)算

主要看看EventTimeTrigger和ProcessingTimeTrigger的源碼。

EventTimeTrigger源碼

public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }

    }

    public String toString() {
        return "EventTimeTrigger()";
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}

ProcessingTimeTrigger源碼

public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }

    }

    public String toString() {
        return "ProcessingTimeTrigger()";
    }

    public static ProcessingTimeTrigger create() {
        return new ProcessingTimeTrigger();
    }
}

在 onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())將會(huì)注冊(cè)一個(gè)ProcessingTime定時(shí)器,時(shí)間參數(shù)是window.maxTimestamp(),也就是窗口的最終時(shí)間,當(dāng)時(shí)間到達(dá)這個(gè)窗口最終時(shí)間,定時(shí)器觸發(fā)并調(diào)用 onProcessingTime()方法,在 onProcessingTime() 方法中,return TriggerResult.FIRE 即返回 FIRE,觸發(fā)窗口中數(shù)據(jù)的計(jì)算,但是會(huì)保留窗口元素。

需要注意的是ProcessingTimeTrigger類(lèi)只會(huì)在窗口的最終時(shí)間到達(dá)的時(shí)候觸發(fā)窗口函數(shù)的計(jì)算,計(jì)算完成后并不會(huì)清除窗口中的數(shù)據(jù),這些數(shù)據(jù)存儲(chǔ)在內(nèi)存中,除非調(diào)用PURGE或FIRE_AND_PURGE,否則數(shù)據(jù)將一直存在內(nèi)存中。實(shí)際上,F(xiàn)link中提供的Trigger類(lèi),除了PurgingTrigger類(lèi),其他的都不會(huì)對(duì)窗口中的數(shù)據(jù)進(jìn)行清除。

常見(jiàn)窗口的Trigger

滾動(dòng)窗口

TumblingEventTimeWindows :EventTimeTrigger
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return EventTimeTrigger.create();
        }
}

TumblingProcessingTimeWindows :ProcessingTimeTrigger

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }
}

滑動(dòng)窗口

SlidingEventTimeWindows:EventTimeTrigger
public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }
}

SlidingProcessingTimeWindows :ProcessingTimeTrigger

public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return ProcessingTimeTrigger.create();
        }
}

會(huì)話窗口

EventTimeSessionWindows:EventTimeTrigger
public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }
}

ProcessingTimeSessionWindows:ProcessingTimeTrigger

public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }
}

全局窗口

GlobalWindows :NeverTrigger
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
     public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return new GlobalWindows.NeverTrigger();
        }
}

到此這篇關(guān)于Java Flink窗口觸發(fā)器Trigger的用法詳解的文章就介紹到這了,更多相關(guān)Java Flink窗口觸發(fā)器內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • SpringBoot使用前綴樹(shù)實(shí)現(xiàn)敏感詞過(guò)濾示例

    SpringBoot使用前綴樹(shù)實(shí)現(xiàn)敏感詞過(guò)濾示例

    最近項(xiàng)目用到了敏感詞過(guò)濾,本文主要就來(lái)介紹一下SpringBoot使用前綴樹(shù)實(shí)現(xiàn)敏感詞過(guò)濾示例,具有一定的參考價(jià)值,感興趣的可以了解一下
    2023-10-10
  • Java設(shè)計(jì)模式之觀察者模式解析

    Java設(shè)計(jì)模式之觀察者模式解析

    這篇文章主要介紹了Java設(shè)計(jì)模式之觀察者模式解析,觀察者模式,又被稱為發(fā)布/訂閱模式,它定義了一種一對(duì)多的依賴關(guān)系,讓多個(gè)觀察者對(duì)象同時(shí)監(jiān)聽(tīng)某一個(gè)主題對(duì)象,這個(gè)主題對(duì)象在狀態(tài)變化時(shí),會(huì)通知所有的觀察者對(duì)象,使他們能夠自動(dòng)更新自己,需要的朋友可以參考下
    2023-09-09
  • Java 動(dòng)態(tài)代理你真的懂了嗎(動(dòng)態(tài)和代理)

    Java 動(dòng)態(tài)代理你真的懂了嗎(動(dòng)態(tài)和代理)

    動(dòng)態(tài)代理分兩部分,動(dòng)態(tài)和代理,今天通過(guò)本文給大家普及代碼模式及動(dòng)態(tài)代理的概念及示例代碼,感興趣的朋友跟隨小編一起看看吧
    2021-07-07
  • springcloud Zuul動(dòng)態(tài)路由的實(shí)現(xiàn)

    springcloud Zuul動(dòng)態(tài)路由的實(shí)現(xiàn)

    這篇文章主要介紹了springcloud Zuul動(dòng)態(tài)路由的實(shí)現(xiàn),詳細(xì)的介紹了什么是Zuu及其動(dòng)態(tài)路由的實(shí)現(xiàn),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2018-11-11
  • JAVA各種加密與解密方式總結(jié)大全

    JAVA各種加密與解密方式總結(jié)大全

    這篇文章主要給大家介紹了關(guān)于JAVA各種加密與解密方式總結(jié)的相關(guān)資料,加密是指對(duì)原來(lái)為明文的文件或數(shù)據(jù)按某種算法進(jìn)行處理,使其成為不可讀的一段代碼,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2023-07-07
  • SpringBoot中MockMVC單元測(cè)試的實(shí)現(xiàn)

    SpringBoot中MockMVC單元測(cè)試的實(shí)現(xiàn)

    Mock是一種用于模擬和替換類(lèi)的對(duì)象的方法,以便在單元測(cè)試中獨(dú)立于外部資源進(jìn)行測(cè)試,本文主要介紹了SpringBoot中MockMVC單元測(cè)試的實(shí)現(xiàn),具有應(yīng)該的參考價(jià)值,感興趣的可以了解一下
    2024-02-02
  • java隨機(jī)生成字符串(字符隨機(jī)生成類(lèi) 生成隨機(jī)字符組合)

    java隨機(jī)生成字符串(字符隨機(jī)生成類(lèi) 生成隨機(jī)字符組合)

    java隨機(jī)生成字符串,字符組合多樣,可以大小字組合、大+小字符+數(shù)字等方式,大家參考使用吧
    2013-12-12
  • Java 中的變量類(lèi)型

    Java 中的變量類(lèi)型

    這篇文章主要介紹了Java 中的變量類(lèi)型,一般包括局部變量、成員變量、類(lèi)變量,下面文章對(duì)這三種內(nèi)容的變量做了一個(gè)詳細(xì)介紹,需要的朋友可以參考一下
    2021-11-11
  • Dubbo之降級(jí)Mock源碼分析

    Dubbo之降級(jí)Mock源碼分析

    這篇文章主要為大家介紹了Dubbo降級(jí)Mock源碼分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-09-09
  • Spring條件注解@Conditional示例詳解

    Spring條件注解@Conditional示例詳解

    這篇文章主要給大家介紹了關(guān)于Spring條件注解@Conditional的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Spring具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-08-08

最新評(píng)論