Flink實現(xiàn)特定統(tǒng)計的歸約聚合reduce操作
如果說簡單聚合是對一些特定統(tǒng)計需求的實現(xiàn),那么 reduce 算子就是一個一般化的聚合統(tǒng)計操作了。從大名鼎鼎的 MapReduce 開始,我們對 reduce 操作就不陌生:它可以對已有的
數(shù)據(jù)進(jìn)行歸約處理,把每一個新輸入的數(shù)據(jù)和當(dāng)前已經(jīng)歸約出來的值,再做一個聚合計算。與簡單聚合類似,reduce 操作也會將 KeyedStream 轉(zhuǎn)換為 DataStream。它不會改變流的元
素數(shù)據(jù)類型,所以輸出類型和輸入類型是一樣的。調(diào)用 KeyedStream 的 reduce 方法時,需要傳入一個參數(shù),實現(xiàn) ReduceFunction 接口。接口在源碼中的定義如下:
@Public @FunctionalInterface public interface ReduceFunction<T> extends Function, Serializable { /** * The core method of ReduceFunction, combining two values into one value of the same type. The * reduce function is consecutively applied to all values of a group until only a single value * remains. * * @param value1 The first value to combine. * @param value2 The second value to combine. * @return The combined value of both input values. * @throws Exception This method may throw exceptions. Throwing an exception will cause the * operation to fail and may trigger recovery. */ T reduce(T value1, T value2) throws Exception; }
ReduceFunction 接口里需要實現(xiàn) reduce()方法,這個方法接收兩個輸入事件,經(jīng)過轉(zhuǎn)換處理之后輸出一個相同類型的事件;所以,對于一組數(shù)據(jù),我們可以先取兩個進(jìn)行合并,然后再
將合并的結(jié)果看作一個數(shù)據(jù)、再跟后面的數(shù)據(jù)合并,最終會將它“簡化”成唯一的一個數(shù)據(jù),這也就是 reduce“歸約”的含義。在流處理的底層實現(xiàn)過程中,實際上是將中間“合并的結(jié)果”
作為任務(wù)的一個狀態(tài)保存起來的;之后每來一個新的數(shù)據(jù),就和之前的聚合狀態(tài)進(jìn)一步做歸約。
其實,reduce 的語義是針對列表進(jìn)行規(guī)約操作,運算規(guī)則由 ReduceFunction 中的 reduce方法來定義,而在 ReduceFunction 內(nèi)部會維護(hù)一個初始值為空的累加器,注意累加器的類型
和輸入元素的類型相同,當(dāng)?shù)谝粭l元素到來時,累加器的值更新為第一條元素的值,當(dāng)新的元素到來時,新元素會和累加器進(jìn)行累加操作,這里的累加操作就是 reduce 函數(shù)定義的運算規(guī)
則。然后將更新以后的累加器的值向下游輸出。
我們可以單獨定義一個函數(shù)類實現(xiàn) ReduceFunction 接口,也可以直接傳入一個匿名類。當(dāng)然,同樣也可以通過傳入 Lambda 表達(dá)式實現(xiàn)類似的功能。與簡單聚合類似,reduce 操作也會將 KeyedStream 轉(zhuǎn)換為 DataStrema。它不會改變流的元素數(shù)據(jù)類型,所以輸出類型和輸入類型是一樣的。下面我們來看一個稍復(fù)雜的例子。
我們將數(shù)據(jù)流按照用戶 id 進(jìn)行分區(qū),然后用一個 reduce 算子實現(xiàn) sum 的功能,統(tǒng)計每個用戶訪問的頻次;進(jìn)而將所有統(tǒng)計結(jié)果分到一組,用另一個 reduce 算子實現(xiàn) maxBy 的功能,記錄所有用戶中訪問頻次最高的那個,也就是當(dāng)前訪問量最大的用戶是誰。
package com.rosh.flink.test; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.ArrayList; import java.util.List; import java.util.Random; /** * 我們將數(shù)據(jù)流按照用戶 id 進(jìn)行分區(qū),然后用一個 reduce 算子實現(xiàn) sum 的功能,統(tǒng)計每個 * 用戶訪問的頻次;進(jìn)而將所有統(tǒng)計結(jié)果分到一組,用另一個 reduce 算子實現(xiàn) maxBy 的功能, * 記錄所有用戶中訪問頻次最高的那個,也就是當(dāng)前訪問量最大的用戶是誰。 */ public class TransReduceTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //隨機(jī)生成數(shù)據(jù) Random random = new Random(); List<Integer> userIds = new ArrayList<>(); for (int i = 1; i <= 10; i++) { userIds.add(random.nextInt(5)); } DataStreamSource<Integer> userIdDS = env.fromCollection(userIds); //每個ID訪問記錄一次 SingleOutputStreamOperator<Tuple2<Integer, Long>> mapDS = userIdDS.map(new MapFunction<Integer, Tuple2<Integer, Long>>() { @Override public Tuple2<Integer, Long> map(Integer value) throws Exception { return new Tuple2<>(value, 1L); } }); //統(tǒng)計每個user訪問多少次 SingleOutputStreamOperator<Tuple2<Integer, Long>> sumDS = mapDS.keyBy(tuple -> tuple.f0).reduce(new ReduceFunction<Tuple2<Integer, Long>>() { @Override public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception { return new Tuple2<>(value1.f0, value1.f1 + value2.f1); } }); sumDS.print("sumDS ->>>>>>>>>>>>>"); //把所有分區(qū)合并,求出最大的訪問量 SingleOutputStreamOperator<Tuple2<Integer, Long>> maxDS = sumDS.keyBy(key -> true).reduce(new ReduceFunction<Tuple2<Integer, Long>>() { @Override public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception { if (value1.f1 > value2.f1) { return value1; } else { return value2; } } }); maxDS.print("maxDS ->>>>>>>>>>>"); env.execute("TransReduceTest"); } }
到此這篇關(guān)于Flink實現(xiàn)特定統(tǒng)計的歸約聚合reduce操作的文章就介紹到這了,更多相關(guān)Flink歸約聚合內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java Disruptor構(gòu)建高性能內(nèi)存隊列使用詳解
這篇文章主要為大家介紹了java Disruptor構(gòu)建高性能內(nèi)存隊列使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-12-12SpringBoot中使用@ControllerAdvice注解詳解
這篇文章主要介紹了SpringBoot中使用@ControllerAdvice注解詳解,@ControllerAdvice,是Spring3.2提供的新注解,它是一個Controller增強(qiáng)器,可對controller中被 @RequestMapping注解的方法加一些邏輯處理,需要的朋友可以參考下2023-10-10Java前后端的JSON傳輸方式(前后端JSON格式轉(zhuǎn)換)
這篇文章主要介紹了Java前后端的JSON傳輸方式(前后端JSON格式轉(zhuǎn)換),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-04-04springboot連接多個數(shù)據(jù)庫的實現(xiàn)方法
有時候一個SpringBoot項目需要同時連接兩個數(shù)據(jù)庫,本文就來介紹一下springboot連接多個數(shù)據(jù)庫的實現(xiàn)方法,具有一定的參考價值,感興趣的可以了解一下2024-08-08SpringMVC+Mybatis二維碼實現(xiàn)多平臺付款(附源碼)
本文主要實現(xiàn)微信支付寶等支付平臺合多為一的二維碼支付,并且實現(xiàn)有效時間內(nèi)支付有效,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-08-08