Java Flink窗口觸發(fā)器Trigger的用法詳解
定義
Trigger確定窗口(由窗口分配器形成)何時準備好由窗口函數(shù)處理。每個WindowAssigner都帶有一個默認值Trigger。如果默認觸發(fā)器不符合您的需求,您可以使用trigger(…)。
Trigger 源碼
public abstract class Trigger<T, W extends Window> implements Serializable {
/**
只要有元素落?到當前窗?, 就會調?該?法
* @param element 收到的元素
* @param timestamp 元素抵達時間.
* @param window 元素所屬的window窗口.
* @param ctx ?個上下?對象,通常?該對象注冊 timer(ProcessingTime/EventTime) 回調.
*/
public abstract TriggerResult onElement(T var1, long var2, W var4, Trigger.TriggerContext var5) throws Exception;
/**
* processing-time 定時器回調函數(shù)
*
* @param time 定時器觸發(fā)的時間.
* @param window 定時器觸發(fā)的窗口對象.
* @param ctx ?個上下?對象,通常?該對象注冊 timer(ProcessingTime/EventTime) 回調.
*/
public abstract TriggerResult onProcessingTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;
/**
* event-time 定時器回調函數(shù)
*
* @param time 定時器觸發(fā)的時間.
* @param window 定時器觸發(fā)的窗口對象.
* @param ctx ?個上下?對象,通常?該對象注冊 timer(ProcessingTime/EventTime) 回調.
*/
public abstract TriggerResult onEventTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;
/**
* 當 多個窗口合并到?個窗?的時候,調用該方法法,例如系統(tǒng)SessionWindow
*
* @param window 合并后的新窗口對象
* @param ctx ?個上下?對象,通常用該對象注冊 timer(ProcessingTime/EventTime)回調以及訪問狀態(tài)
*/
public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
throw new UnsupportedOperationException("This trigger does not support merging.");
}
/**
* 當窗口被刪除后執(zhí)?所需的任何操作。例如:可以清除定時器或者刪除狀態(tài)數(shù)據(jù)
*/
public abstract void clear(W var1, Trigger.TriggerContext var2) throws Exception;
}
TriggerResult 源碼
public enum TriggerResult {
// 表示對窗口不執(zhí)行任何操作。即不觸發(fā)窗口計算,也不刪除元素。
CONTINUE(false, false),
// 觸發(fā)窗口計算,輸出結果,然后將窗口中的數(shù)據(jù)和窗口進行清除。
FIRE_AND_PURGE(true, true),
// 觸發(fā)窗口計算,但是保留窗口元素
FIRE(true, false),
// 不觸發(fā)窗口計算,丟棄窗口,并且刪除窗口的元素。
PURGE(false, true);
private final boolean fire;
private final boolean purge;
private TriggerResult(boolean fire, boolean purge) {
this.purge = purge;
this.fire = fire;
}
public boolean isFire() {
return this.fire;
}
public boolean isPurge() {
return this.purge;
}
}
一旦觸發(fā)器確定窗口已準備好進行處理,就會觸發(fā),返回狀態(tài)可以是FIRE或FIRE_AND_PURGE。其中FIRE是觸發(fā)窗口計算并保留窗口內容,而FIRE_AND_PURGE是觸發(fā)窗口計算并刪除窗口內容。默認情況下,預實現(xiàn)的觸發(fā)器只是簡單地FIRE不清除窗口狀態(tài)。
Flink 預置的Trigger
- EventTimeTrigger:通過對比EventTime和窗口的Endtime確定是否觸發(fā)窗口計算,如果EventTime大于Window EndTime則觸發(fā),否則不觸發(fā),窗口將繼續(xù)等待。
- ProcessTimeTrigger:通過對比ProcessTime和窗口EndTme確定是否觸發(fā)窗口,如果ProcessTime大于EndTime則觸發(fā)計算,否則窗口繼續(xù)等待。
- ContinuousEventTimeTrigger:根據(jù)間隔時間周期性觸發(fā)窗口或者Window的結束時間小于當前EndTime觸發(fā)窗口計算。
- ContinuousProcessingTimeTrigger:根據(jù)間隔時間周期性觸發(fā)窗口或者Window的結束時間小于當前ProcessTime觸發(fā)窗口計算。
- CountTrigger:根據(jù)接入數(shù)據(jù)量是否超過設定的闕值判斷是否觸發(fā)窗口計算。
- DeltaTrigger:根據(jù)接入數(shù)據(jù)計算出來的Delta指標是否超過指定的Threshold去判斷是否觸發(fā)窗口計算。
- PurgingTrigger:可以將任意觸發(fā)器作為參數(shù)轉換為Purge類型的觸發(fā)器,計算完成后數(shù)據(jù)將被清理。
- NeverTrigger:任何時候都不觸發(fā)窗口計算

主要看看EventTimeTrigger和ProcessingTimeTrigger的源碼。
EventTimeTrigger源碼
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private EventTimeTrigger() {
}
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
public boolean canMerge() {
return true;
}
public void onMerge(TimeWindow window, OnMergeContext ctx) {
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
ctx.registerEventTimeTimer(windowMaxTimestamp);
}
}
public String toString() {
return "EventTimeTrigger()";
}
public static EventTimeTrigger create() {
return new EventTimeTrigger();
}
}
ProcessingTimeTrigger源碼
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private ProcessingTimeTrigger() {
}
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
ctx.registerProcessingTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.FIRE;
}
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteProcessingTimeTimer(window.maxTimestamp());
}
public boolean canMerge() {
return true;
}
public void onMerge(TimeWindow window, OnMergeContext ctx) {
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
ctx.registerProcessingTimeTimer(windowMaxTimestamp);
}
}
public String toString() {
return "ProcessingTimeTrigger()";
}
public static ProcessingTimeTrigger create() {
return new ProcessingTimeTrigger();
}
}
在 onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())將會注冊一個ProcessingTime定時器,時間參數(shù)是window.maxTimestamp(),也就是窗口的最終時間,當時間到達這個窗口最終時間,定時器觸發(fā)并調用 onProcessingTime()方法,在 onProcessingTime() 方法中,return TriggerResult.FIRE 即返回 FIRE,觸發(fā)窗口中數(shù)據(jù)的計算,但是會保留窗口元素。
需要注意的是ProcessingTimeTrigger類只會在窗口的最終時間到達的時候觸發(fā)窗口函數(shù)的計算,計算完成后并不會清除窗口中的數(shù)據(jù),這些數(shù)據(jù)存儲在內存中,除非調用PURGE或FIRE_AND_PURGE,否則數(shù)據(jù)將一直存在內存中。實際上,F(xiàn)link中提供的Trigger類,除了PurgingTrigger類,其他的都不會對窗口中的數(shù)據(jù)進行清除。
常見窗口的Trigger
滾動窗口
TumblingEventTimeWindows :EventTimeTrigger
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
}
TumblingProcessingTimeWindows :ProcessingTimeTrigger
public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return ProcessingTimeTrigger.create();
}
}
滑動窗口
SlidingEventTimeWindows:EventTimeTrigger
public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
}
SlidingProcessingTimeWindows :ProcessingTimeTrigger
public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return ProcessingTimeTrigger.create();
}
}
會話窗口
EventTimeSessionWindows:EventTimeTrigger
public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
}
ProcessingTimeSessionWindows:ProcessingTimeTrigger
public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return ProcessingTimeTrigger.create();
}
}
全局窗口
GlobalWindows :NeverTrigger
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return new GlobalWindows.NeverTrigger();
}
}到此這篇關于Java Flink窗口觸發(fā)器Trigger的用法詳解的文章就介紹到這了,更多相關Java Flink窗口觸發(fā)器內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
SpringBoot使用前綴樹實現(xiàn)敏感詞過濾示例
最近項目用到了敏感詞過濾,本文主要就來介紹一下SpringBoot使用前綴樹實現(xiàn)敏感詞過濾示例,具有一定的參考價值,感興趣的可以了解一下2023-10-10
Java 動態(tài)代理你真的懂了嗎(動態(tài)和代理)
動態(tài)代理分兩部分,動態(tài)和代理,今天通過本文給大家普及代碼模式及動態(tài)代理的概念及示例代碼,感興趣的朋友跟隨小編一起看看吧2021-07-07
springcloud Zuul動態(tài)路由的實現(xiàn)
這篇文章主要介紹了springcloud Zuul動態(tài)路由的實現(xiàn),詳細的介紹了什么是Zuu及其動態(tài)路由的實現(xiàn),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-11-11
SpringBoot中MockMVC單元測試的實現(xiàn)
Mock是一種用于模擬和替換類的對象的方法,以便在單元測試中獨立于外部資源進行測試,本文主要介紹了SpringBoot中MockMVC單元測試的實現(xiàn),具有應該的參考價值,感興趣的可以了解一下2024-02-02

