Apache FlinkCEP 實(shí)現(xiàn)超時(shí)狀態(tài)監(jiān)控的步驟詳解
CEP - Complex Event Processing復(fù)雜事件處理。
訂單下單后超過(guò)一定時(shí)間還未進(jìn)行支付確認(rèn)。
打車(chē)訂單生成后超過(guò)一定時(shí)間沒(méi)有確認(rèn)上車(chē)。
外賣(mài)超過(guò)預(yù)定送達(dá)時(shí)間一定時(shí)限還沒(méi)有確認(rèn)送達(dá)。
Apache FlinkCEP API
CEPTimeoutEventJob
FlinkCEP源碼簡(jiǎn)析
DataStream和PatternStream
DataStream 一般由相同類(lèi)型事件或元素組成,一個(gè)DataStream可以通過(guò)一系列的轉(zhuǎn)換操作如Filter、Map等轉(zhuǎn)換為另一個(gè)DataStream。
PatternStream 是對(duì)CEP模式匹配的流的抽象,把DataStream和Pattern組合在一塊,然后對(duì)外提供select和flatSelect等方法。PatternStream并不是DataStream,它提供方法把匹配的模式序列和與其相關(guān)聯(lián)的事件組成的映射(就是Map<模式名稱(chēng),List<事件>>)發(fā)出去,發(fā)到SingleOutputStreamOperator里面,SingleOutputStreamOperator是DataStream。
CEPOperatorUtils工具類(lèi)里的方法和變量使用了「PatternStream」來(lái)命名,比如:
public static <IN, OUT> SingleOutputStreamOperator <OUT> createPatternStream(...){...} public static <IN, OUT1, OUT2> SingleOutputStreamOperator <OUT1> createTimeoutPatternStream(...){...} final SingleOutputStreamOperator <OUT> patternStream;
SingleOutputStreamOperator
@Public public class SingleOutputStreamOperator <T> extends DataStream <T> {...}
PatternStream的構(gòu)造方法:
PatternStream ( final DataStream <T> inputStream, final Pattern <T, ?> pattern) { this .inputStream = inputStream; this .pattern = pattern; this .comparator = null ; } PatternStream ( final DataStream <T> inputStream, final Pattern <T, ?> pattern, final EventComparator <T> comparator) { this .inputStream = inputStream; this .pattern = pattern; this .comparator = comparator; }
Pattern、Quantifier和EventComparator
Pattern是模式定義的Base Class,Builder模式,定義好的模式會(huì)被NFACompiler用來(lái)生成NFA。
如果想要自己實(shí)現(xiàn)類(lèi)似next和followedBy這種方法,比如timeEnd,對(duì)Pattern進(jìn)行擴(kuò)展重寫(xiě)應(yīng)該是可行的。
public class Pattern <T, F extends T> { /** 模式名稱(chēng) */ private final String name; /** 前面一個(gè)模式 */ private final Pattern <T, ? extends T> previous; /** 一個(gè)事件如果要被當(dāng)前模式匹配到,必須滿足的約束條件 */ private IterativeCondition <F> condition; /** 時(shí)間窗口長(zhǎng)度,在時(shí)間長(zhǎng)度內(nèi)進(jìn)行模式匹配 */ private Time windowTime; /** 模式量詞,意思是一個(gè)模式匹配幾個(gè)事件等 默認(rèn)是匹配到一個(gè) */ private Quantifier quantifier = Quantifier .one( ConsumingStrategy .STRICT); /** 停止將事件收集到循環(huán)狀態(tài)時(shí),事件必須滿足的條件 */ private IterativeCondition <F> untilCondition; /** * 適用于{@code times}模式,用來(lái)維護(hù)模式里事件可以連續(xù)發(fā)生的次數(shù) */ private Times times; // 匹配到事件之后的跳過(guò)策略 private final AfterMatchSkipStrategy afterMatchSkipStrategy; ... }
Quantifier是用來(lái)描述具體模式行為的,主要有三大類(lèi):
Single-單一匹配、Looping-循環(huán)匹配、Times-一定次數(shù)或者次數(shù)范圍內(nèi)都能匹配到。
每一個(gè)模式Pattern可以是optional可選的(單一匹配或循環(huán)匹配),并可以設(shè)置ConsumingStrategy。
循環(huán)和次數(shù)也有一個(gè)額外的內(nèi)部ConsumingStrategy,用在模式中接收的事件之間。
public class Quantifier { ... /** * 5個(gè)屬性,可以組合,但并非所有的組合都是有效的 */ public enum QuantifierProperty { SINGLE, LOOPING, TIMES, OPTIONAL, GREEDY } /** * 描述在此模式中匹配哪些事件的策略 */ public enum ConsumingStrategy { STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY, NOT_FOLLOW, NOT_NEXT } /** * 描述當(dāng)前模式里事件可以連續(xù)發(fā)生的次數(shù);舉個(gè)例子,模式條件無(wú)非就是boolean,滿足true條件的事件連續(xù)出現(xiàn)times次,或者一個(gè)次數(shù)范圍,比如2~4次,2次,3次,4次都會(huì)被當(dāng)前模式匹配出來(lái),因此同一個(gè)事件會(huì)被重復(fù)匹配到 */ public static class Times { private final int from; private final int to; private Times ( int from, int to) { Preconditions .checkArgument(from > 0 , "The from should be a positive number greater than 0." ); Preconditions .checkArgument(to >= from, "The to should be a number greater than or equal to from: " + from + "." ); this .from = from; this .to = to; } public int getFrom() { return from; } public int getTo() { return to; } // 次數(shù)范圍 public static Times of( int from, int to) { return new Times (from, to); } // 指定具體次數(shù) public static Times of( int times) { return new Times (times, times); } @Override public boolean equals( Object o) { if ( this == o) { return true ; } if (o == null || getClass() != o.getClass()) { return false ; } Times times = ( Times ) o; return from == times.from && to == times.to; } @Override public int hashCode() { return Objects .hash(from, to); } } ... }
EventComparator,自定義事件比較器,實(shí)現(xiàn)EventComparator接口。
public interface EventComparator <T> extends Comparator <T>, Serializable { long serialVersionUID = 1L ; }
NFACompiler和NFA
NFACompiler提供將Pattern編譯成NFA或者NFAFactory的方法,使用NFAFactory可以創(chuàng)建多個(gè)NFA。
public class NFACompiler { ... /** * NFAFactory 創(chuàng)建NFA的接口 * * @param <T> Type of the input events which are processed by the NFA */ public interface NFAFactory <T> extends Serializable { NFA<T> createNFA(); } /** * NFAFactory的具體實(shí)現(xiàn)NFAFactoryImpl * * <p>The implementation takes the input type serializer, the window time and the set of * states and their transitions to be able to create an NFA from them. * * @param <T> Type of the input events which are processed by the NFA */ private static class NFAFactoryImpl <T> implements NFAFactory <T> { private static final long serialVersionUID = 8939783698296714379L ; private final long windowTime; private final Collection < State <T>> states; private final boolean timeoutHandling; private NFAFactoryImpl ( long windowTime, Collection < State <T>> states, boolean timeoutHandling) { this .windowTime = windowTime; this .states = states; this .timeoutHandling = timeoutHandling; } @Override public NFA<T> createNFA() { // 一個(gè)NFA由狀態(tài)集合、時(shí)間窗口的長(zhǎng)度和是否處理超時(shí)組成 return new NFA<>(states, windowTime, timeoutHandling); } } }
NFA:Non-deterministic finite automaton - 非確定的有限(狀態(tài))自動(dòng)機(jī)。
更多內(nèi)容參見(jiàn)
https://zh.wikipedia.org/wiki/非確定有限狀態(tài)自動(dòng)機(jī)
public class NFA<T> { /** * NFACompiler返回的所有有效的NFA狀態(tài)集合 * These are directly derived from the user-specified pattern. */ private final Map < String , State <T>> states; /** * Pattern.within(Time)指定的時(shí)間窗口長(zhǎng)度 */ private final long windowTime; /** * 一個(gè)超時(shí)匹配的標(biāo)記 */ private final boolean handleTimeout; ... }
PatternSelectFunction和PatternFlatSelectFunction
當(dāng)一個(gè)包含被匹配到的事件的映射能夠通過(guò)模式名稱(chēng)訪問(wèn)到的時(shí)候,PatternSelectFunction的select()方法會(huì)被調(diào)用。模式名稱(chēng)是由Pattern定義的時(shí)候指定的。select()方法恰好返回一個(gè)結(jié)果,如果需要返回多個(gè)結(jié)果,則可以實(shí)現(xiàn)PatternFlatSelectFunction。
public interface PatternSelectFunction <IN, OUT> extends Function , Serializable { /** * 從給到的事件映射中生成一個(gè)結(jié)果。這些事件使用他們關(guān)聯(lián)的模式名稱(chēng)作為唯一標(biāo)識(shí) */ OUT select( Map < String , List <IN>> pattern) throws Exception ; }
PatternFlatSelectFunction,不是返回一個(gè)OUT,而是使用Collector 把匹配到的事件收集起來(lái)。
public interface PatternFlatSelectFunction <IN, OUT> extends Function , Serializable { /** * 生成一個(gè)或多個(gè)結(jié)果 */ void flatSelect( Map < String , List <IN>> pattern, Collector <OUT> out) throws Exception ; }
SelectTimeoutCepOperator、PatternTimeoutFunction
SelectTimeoutCepOperator是在CEPOperatorUtils中調(diào)用createTimeoutPatternStream()方法時(shí)創(chuàng)建出來(lái)。
SelectTimeoutCepOperator中會(huì)被算子迭代調(diào)用的方法是processMatchedSequences()和processTimedOutSequences()。
模板方法...對(duì)應(yīng)到抽象類(lèi)AbstractKeyedCEPPatternOperator中processEvent()方法和advanceTime()方法。
還有FlatSelectTimeoutCepOperator和對(duì)應(yīng)的PatternFlatTimeoutFunction。
public class SelectTimeoutCepOperator <IN, OUT1, OUT2, KEY> extends AbstractKeyedCEPPatternOperator <IN, KEY, OUT1, SelectTimeoutCepOperator . SelectWrapper <IN, OUT1, OUT2>> { private OutputTag <OUT2> timedOutOutputTag; public SelectTimeoutCepOperator ( TypeSerializer <IN> inputSerializer, boolean isProcessingTime, NFACompiler . NFAFactory <IN> nfaFactory, final EventComparator <IN> comparator, AfterMatchSkipStrategy skipStrategy, // 參數(shù)命名混淆了flat...包括SelectWrapper類(lèi)中的成員命名... PatternSelectFunction <IN, OUT1> flatSelectFunction, PatternTimeoutFunction <IN, OUT2> flatTimeoutFunction, OutputTag <OUT2> outputTag, OutputTag <IN> lateDataOutputTag) { super ( inputSerializer, isProcessingTime, nfaFactory, comparator, skipStrategy, new SelectWrapper <>(flatSelectFunction, flatTimeoutFunction), lateDataOutputTag); this .timedOutOutputTag = outputTag; } ... } public interface PatternTimeoutFunction <IN, OUT> extends Function , Serializable { OUT timeout( Map < String , List <IN>> pattern, long timeoutTimestamp) throws Exception ; } public interface PatternFlatTimeoutFunction <IN, OUT> extends Function , Serializable { void timeout( Map < String , List <IN>> pattern, long timeoutTimestamp, Collector <OUT> out) throws Exception ; }
CEP和CEPOperatorUtils
CEP是創(chuàng)建PatternStream的工具類(lèi),PatternStream只是DataStream和Pattern的組合。
public class CEP { public static <T> PatternStream <T> pattern( DataStream <T> input, Pattern <T, ?> pattern) { return new PatternStream <>(input, pattern); } public static <T> PatternStream <T> pattern( DataStream <T> input, Pattern <T, ?> pattern, EventComparator <T> comparator) { return new PatternStream <>(input, pattern, comparator); } }
CEPOperatorUtils是在PatternStream的select()方法和flatSelect()方法被調(diào)用的時(shí)候,去創(chuàng)建SingleOutputStreamOperator(DataStream)。
public class CEPOperatorUtils { ... private static <IN, OUT, K> SingleOutputStreamOperator <OUT> createPatternStream( final DataStream <IN> inputStream, final Pattern <IN, ?> pattern, final TypeInformation <OUT> outTypeInfo, final boolean timeoutHandling, final EventComparator <IN> comparator, final OperatorBuilder <IN, OUT> operatorBuilder) { final TypeSerializer <IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig()); // check whether we use processing time final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic . ProcessingTime ; // compile our pattern into a NFAFactory to instantiate NFAs later on final NFACompiler . NFAFactory <IN> nfaFactory = NFACompiler .compileFactory(pattern, timeoutHandling); final SingleOutputStreamOperator <OUT> patternStream; if (inputStream instanceof KeyedStream ) { KeyedStream <IN, K> keyedStream = ( KeyedStream <IN, K>) inputStream; patternStream = keyedStream.transform( operatorBuilder.getKeyedOperatorName(), outTypeInfo, operatorBuilder.build( inputSerializer, isProcessingTime, nfaFactory, comparator, pattern.getAfterMatchSkipStrategy())); } else { KeySelector <IN, Byte > keySelector = new NullByteKeySelector <>(); patternStream = inputStream.keyBy(keySelector).transform( operatorBuilder.getOperatorName(), outTypeInfo, operatorBuilder.build( inputSerializer, isProcessingTime, nfaFactory, comparator, pattern.getAfterMatchSkipStrategy() )).forceNonParallel(); } return patternStream; } ... }
FlinkCEP實(shí)現(xiàn)步驟
- IN: DataSource -> DataStream -> Transformations -> DataStream
- Pattern: Pattern.begin.where.next.where...times...
- PatternStream: CEP.pattern(DataStream, Pattern)
- DataStream: PatternStream.select(PatternSelectFunction) PatternStream.flatSelect(PatternSelectFunction)
- OUT: DataStream -> Transformations -> DataStream -> DataSink
FlinkCEP匹配超時(shí)實(shí)現(xiàn)步驟
TimeoutCEP的流需要keyBy,即KeyedStream,如果inputStream不是KeyedStream,會(huì)new一個(gè)0字節(jié)的Key(上面CEPOperatorUtils源碼里有提到)。
KeySelector <IN, Byte > keySelector = new NullByteKeySelector <>();
Pattern最后調(diào)用within設(shè)置窗口時(shí)間。 如果是對(duì)主鍵進(jìn)行分組,一個(gè)時(shí)間窗口內(nèi)最多只會(huì)匹配出一個(gè)超時(shí)事件,使用PatternStream.select(...)就可以了。
- IN: DataSource -> DataStream -> Transformations -> DataStream -> keyBy -> KeyedStream
- Pattern: Pattern.begin.where.next.where...within(Time windowTime)
- PatternStream: CEP.pattern(KeyedStream, Pattern)
- OutputTag: new OutputTag(...)
- SingleOutputStreamOperator: PatternStream.flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)
- DataStream: SingleOutputStreamOperator.getSideOutput(OutputTag)
- OUT: DataStream -> Transformations -> DataStream -> DataSink
FlinkCEP超時(shí)不足
和Flink窗口聚合類(lèi)似,如果使用事件時(shí)間和依賴事件生成的水印向前推進(jìn),需要后續(xù)的事件到達(dá),才會(huì)觸發(fā)窗口進(jìn)行計(jì)算和輸出結(jié)果。
FlinkCEP超時(shí)完整demo
public class CEPTimeoutEventJob { private static final String LOCAL_KAFKA_BROKER = "localhost:9092" ; private static final String GROUP_ID = CEPTimeoutEventJob . class .getSimpleName(); private static final String GROUP_TOPIC = GROUP_ID; public static void main( String [] args) throws Exception { // 參數(shù) ParameterTool params = ParameterTool .fromArgs(args); StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment(); // 使用事件時(shí)間 env.setStreamTimeCharacteristic( TimeCharacteristic . EventTime ); env.enableCheckpointing( 5000 ); env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig . ExternalizedCheckpointCleanup .RETAIN_ON_CANCELLATION); env.getConfig().disableSysoutLogging(); env.getConfig().setRestartStrategy( RestartStrategies .fixedDelayRestart( 5 , 10000 )); // 不使用POJO的時(shí)間 final AssignerWithPeriodicWatermarks extractor = new IngestionTimeExtractor <POJO>(); // 與Kafka Topic的Partition保持一致 env.setParallelism( 3 ); Properties kafkaProps = new Properties (); kafkaProps.setProperty( "bootstrap.servers" , LOCAL_KAFKA_BROKER); kafkaProps.setProperty( "group.id" , GROUP_ID); // 接入Kafka的消息 FlinkKafkaConsumer011 <POJO> consumer = new FlinkKafkaConsumer011 <>(GROUP_TOPIC, new POJOSchema (), kafkaProps); DataStream <POJO> pojoDataStream = env.addSource(consumer) .assignTimestampsAndWatermarks(extractor); pojoDataStream.print(); // 根據(jù)主鍵aid分組 即對(duì)每一個(gè)POJO事件進(jìn)行匹配檢測(cè)【不同類(lèi)型的POJO,可以采用不同的within時(shí)間】 // 1. DataStream <POJO> keyedPojos = pojoDataStream .keyBy( "aid" ); // 從初始化到終態(tài)-一個(gè)完整的POJO事件序列 // 2. Pattern <POJO, POJO> completedPojo = Pattern .<POJO>begin( "init" ) .where( new SimpleCondition <POJO>() { private static final long serialVersionUID = - 6847788055093903603L ; @Override public boolean filter(POJO pojo) throws Exception { return "02" .equals(pojo.getAstatus()); } }) .followedBy( "end" ) // .next("end") .where( new SimpleCondition <POJO>() { private static final long serialVersionUID = - 2655089736460847552L ; @Override public boolean filter(POJO pojo) throws Exception { return "00" .equals(pojo.getAstatus()) || "01" .equals(pojo.getAstatus()); } }); // 找出1分鐘內(nèi)【便于測(cè)試】都沒(méi)有到終態(tài)的事件aid // 如果針對(duì)不同類(lèi)型有不同within時(shí)間,比如有的是超時(shí)1分鐘,有的可能是超時(shí)1個(gè)小時(shí) 則生成多個(gè)PatternStream // 3. PatternStream <POJO> patternStream = CEP.pattern(keyedPojos, completedPojo.within( Time .minutes( 1 ))); // 定義側(cè)面輸出timedout // 4. OutputTag <POJO> timedout = new OutputTag <POJO>( "timedout" ) { private static final long serialVersionUID = 773503794597666247L ; }; // OutputTag<L> timeoutOutputTag, PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction, PatternFlatSelectFunction<T, R> patternFlatSelectFunction // 5. SingleOutputStreamOperator <POJO> timeoutPojos = patternStream.flatSelect( timedout, new POJOTimedOut (), new FlatSelectNothing () ); // 打印輸出超時(shí)的POJO // 6.7. timeoutPojos.getSideOutput(timedout).print(); timeoutPojos.print(); env.execute( CEPTimeoutEventJob . class .getSimpleName()); } /** * 把超時(shí)的事件收集起來(lái) */ public static class POJOTimedOut implements PatternFlatTimeoutFunction <POJO, POJO> { private static final long serialVersionUID = - 4214641891396057732L ; @Override public void timeout( Map < String , List <POJO>> map, long l, Collector <POJO> collector) throws Exception { if ( null != map.get( "init" )) { for (POJO pojoInit : map.get( "init" )) { System .out.println( "timeout init:" + pojoInit.getAid()); collector.collect(pojoInit); } } // 因?yàn)閑nd超時(shí)了,還沒(méi)收到end,所以這里是拿不到end的 System .out.println( "timeout end: " + map.get( "end" )); } } /** * 通常什么都不做,但也可以把所有匹配到的事件發(fā)往下游;如果是寬松臨近,被忽略或穿透的事件就沒(méi)辦法選中發(fā)往下游了 * 一分鐘時(shí)間內(nèi)走完init和end的數(shù)據(jù) * * @param <T> */ public static class FlatSelectNothing <T> implements PatternFlatSelectFunction <T, T> { private static final long serialVersionUID = - 3029589950677623844L ; @Override public void flatSelect( Map < String , List <T>> pattern, Collector <T> collector) { System .out.println( "flatSelect: " + pattern); } } }
測(cè)試結(jié)果(followedBy):
3 > POJO{aid= 'ID000-0' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563419728242 , energy= 529.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '02' , createTime= null , updateTime= null } 3 > POJO{aid= 'ID000-1' , astyle= 'STYLE000-2' , aname= 'NAME-1' , logTime= 1563419728783 , energy= 348.00 , age= 26 , tt= 2019 - 07 - 18 , astatus= '02' , createTime= null , updateTime= null } 3 > POJO{aid= 'ID000-0' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563419749259 , energy= 492.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '00' , createTime= null , updateTime= null } flatSelect: {init=[POJO{aid= 'ID000-0' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563419728242 , energy= 529.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '02' , createTime= null , updateTime= null }], end =[POJO{aid= 'ID000-0' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563419749259 , energy= 492.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '00' , createTime= null , updateTime= null }]} timeout init:ID000- 1 3 > POJO{aid= 'ID000-1' , astyle= 'STYLE000-2' , aname= 'NAME-1' , logTime= 1563419728783 , energy= 348.00 , age= 26 , tt= 2019 - 07 - 18 , astatus= '02' , createTime= null , updateTime= null } timeout end : null 3 > POJO{aid= 'ID000-2' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563419829639 , energy= 467.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '03' , createTime= null , updateTime= null } 3 > POJO{aid= 'ID000-2' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563419841394 , energy= 107.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '00' , createTime= null , updateTime= null } 3 > POJO{aid= 'ID000-3' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563419967721 , energy= 431.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '02' , createTime= null , updateTime= null } 3 > POJO{aid= 'ID000-3' , astyle= 'STYLE000-2' , aname= 'NAME-0' , logTime= 1563419979567 , energy= 32.00 , age= 26 , tt= 2019 - 07 - 18 , astatus= '03' , createTime= null , updateTime= null } 3 > POJO{aid= 'ID000-3' , astyle= 'STYLE000-2' , aname= 'NAME-0' , logTime= 1563419993612 , energy= 542.00 , age= 26 , tt= 2019 - 07 - 18 , astatus= '01' , createTime= null , updateTime= null } flatSelect: {init=[POJO{aid= 'ID000-3' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563419967721 , energy= 431.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '02' , createTime= null , updateTime= null }], end =[POJO{aid= 'ID000-3' , astyle= 'STYLE000-2' , aname= 'NAME-0' , logTime= 1563419993612 , energy= 542.00 , age= 26 , tt= 2019 - 07 - 18 , astatus= '01' , createTime= null , updateTime= null }]} 3 > POJO{aid= 'ID000-4' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563420063760 , energy= 122.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '02' , createTime= null , updateTime= null } 3 > POJO{aid= 'ID000-4' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563420078008 , energy= 275.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '03' , createTime= null , updateTime= null } timeout init:ID000- 4 3 > POJO{aid= 'ID000-4' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563420063760 , energy= 122.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '02' , createTime= null , updateTime= null } timeout end : null
總結(jié)
以上所述是小編給大家介紹的Apache FlinkCEP 實(shí)現(xiàn)超時(shí)狀態(tài)監(jiān)控的步驟,希望對(duì)大家有所幫助,如果大家有任何疑問(wèn)歡迎給我留言,小編會(huì)及時(shí)回復(fù)大家的!
相關(guān)文章
Ubuntu12.04建立內(nèi)核樹(shù)實(shí)現(xiàn)過(guò)程詳解
這篇文章主要介紹了Ubuntu12.04建立內(nèi)核樹(shù)實(shí)現(xiàn)過(guò)程詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-09-09CentOS 7.2.1511 編譯安裝Nginx1.10.1+MySQL5.6.33+PHP5.6.26運(yùn)行環(huán)境
這篇文章主要介紹了CentOS 7.2.1511 編譯安裝Nginx1.10.1+MySQL5.6.33+PHP5.6.26運(yùn)行環(huán)境,需要的朋友可以參考下2016-10-10Ubuntu下VIM配置成C++開(kāi)發(fā)編輯器
今天小編就為大家分享一篇關(guān)于Ubuntu下VIM配置成C++開(kāi)發(fā)編輯器,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2018-10-10Linux靜態(tài)鏈接庫(kù)與模板類(lèi)的處理方式
這篇文章主要介紹了Linux下編譯使用靜態(tài)鏈接庫(kù)遇到模板類(lèi)的時(shí)該如何處理。2017-11-11LINUX磁盤(pán)分區(qū)、格式化、掛載、卸載詳細(xì)過(guò)程
這篇文章主要介紹了LINUX磁盤(pán)分區(qū)、格式化、掛載、卸載詳細(xì)過(guò)程,具有一定的參考價(jià)值,有需要的可以了解一下。2016-11-11linux中通過(guò)文件描述符獲取文件絕對(duì)路徑的方法
下面小編就為大家?guī)?lái)一篇linux中通過(guò)文件描述符獲取文件絕對(duì)路徑的方法。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-12-12Linux文件服務(wù)器實(shí)戰(zhàn)詳解(匿名用戶)
這篇文章主要介紹了Linux文件服務(wù)器實(shí)戰(zhàn)(匿名用戶),非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下2018-06-06