Flink實(shí)現(xiàn)特定統(tǒng)計(jì)的歸約聚合reduce操作
如果說(shuō)簡(jiǎn)單聚合是對(duì)一些特定統(tǒng)計(jì)需求的實(shí)現(xiàn),那么 reduce 算子就是一個(gè)一般化的聚合統(tǒng)計(jì)操作了。從大名鼎鼎的 MapReduce 開(kāi)始,我們對(duì) reduce 操作就不陌生:它可以對(duì)已有的
數(shù)據(jù)進(jìn)行歸約處理,把每一個(gè)新輸入的數(shù)據(jù)和當(dāng)前已經(jīng)歸約出來(lái)的值,再做一個(gè)聚合計(jì)算。與簡(jiǎn)單聚合類(lèi)似,reduce 操作也會(huì)將 KeyedStream 轉(zhuǎn)換為 DataStream。它不會(huì)改變流的元
素?cái)?shù)據(jù)類(lèi)型,所以輸出類(lèi)型和輸入類(lèi)型是一樣的。調(diào)用 KeyedStream 的 reduce 方法時(shí),需要傳入一個(gè)參數(shù),實(shí)現(xiàn) ReduceFunction 接口。接口在源碼中的定義如下:
@Public
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {
/**
* The core method of ReduceFunction, combining two values into one value of the same type. The
* reduce function is consecutively applied to all values of a group until only a single value
* remains.
*
* @param value1 The first value to combine.
* @param value2 The second value to combine.
* @return The combined value of both input values.
* @throws Exception This method may throw exceptions. Throwing an exception will cause the
* operation to fail and may trigger recovery.
*/
T reduce(T value1, T value2) throws Exception;
}
ReduceFunction 接口里需要實(shí)現(xiàn) reduce()方法,這個(gè)方法接收兩個(gè)輸入事件,經(jīng)過(guò)轉(zhuǎn)換處理之后輸出一個(gè)相同類(lèi)型的事件;所以,對(duì)于一組數(shù)據(jù),我們可以先取兩個(gè)進(jìn)行合并,然后再
將合并的結(jié)果看作一個(gè)數(shù)據(jù)、再跟后面的數(shù)據(jù)合并,最終會(huì)將它“簡(jiǎn)化”成唯一的一個(gè)數(shù)據(jù),這也就是 reduce“歸約”的含義。在流處理的底層實(shí)現(xiàn)過(guò)程中,實(shí)際上是將中間“合并的結(jié)果”
作為任務(wù)的一個(gè)狀態(tài)保存起來(lái)的;之后每來(lái)一個(gè)新的數(shù)據(jù),就和之前的聚合狀態(tài)進(jìn)一步做歸約。
其實(shí),reduce 的語(yǔ)義是針對(duì)列表進(jìn)行規(guī)約操作,運(yùn)算規(guī)則由 ReduceFunction 中的 reduce方法來(lái)定義,而在 ReduceFunction 內(nèi)部會(huì)維護(hù)一個(gè)初始值為空的累加器,注意累加器的類(lèi)型
和輸入元素的類(lèi)型相同,當(dāng)?shù)谝粭l元素到來(lái)時(shí),累加器的值更新為第一條元素的值,當(dāng)新的元素到來(lái)時(shí),新元素會(huì)和累加器進(jìn)行累加操作,這里的累加操作就是 reduce 函數(shù)定義的運(yùn)算規(guī)
則。然后將更新以后的累加器的值向下游輸出。
我們可以單獨(dú)定義一個(gè)函數(shù)類(lèi)實(shí)現(xiàn) ReduceFunction 接口,也可以直接傳入一個(gè)匿名類(lèi)。當(dāng)然,同樣也可以通過(guò)傳入 Lambda 表達(dá)式實(shí)現(xiàn)類(lèi)似的功能。與簡(jiǎn)單聚合類(lèi)似,reduce 操作也會(huì)將 KeyedStream 轉(zhuǎn)換為 DataStrema。它不會(huì)改變流的元素?cái)?shù)據(jù)類(lèi)型,所以輸出類(lèi)型和輸入類(lèi)型是一樣的。下面我們來(lái)看一個(gè)稍復(fù)雜的例子。
我們將數(shù)據(jù)流按照用戶(hù) id 進(jìn)行分區(qū),然后用一個(gè) reduce 算子實(shí)現(xiàn) sum 的功能,統(tǒng)計(jì)每個(gè)用戶(hù)訪問(wèn)的頻次;進(jìn)而將所有統(tǒng)計(jì)結(jié)果分到一組,用另一個(gè) reduce 算子實(shí)現(xiàn) maxBy 的功能,記錄所有用戶(hù)中訪問(wèn)頻次最高的那個(gè),也就是當(dāng)前訪問(wèn)量最大的用戶(hù)是誰(shuí)。
package com.rosh.flink.test;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
/**
* 我們將數(shù)據(jù)流按照用戶(hù) id 進(jìn)行分區(qū),然后用一個(gè) reduce 算子實(shí)現(xiàn) sum 的功能,統(tǒng)計(jì)每個(gè)
* 用戶(hù)訪問(wèn)的頻次;進(jìn)而將所有統(tǒng)計(jì)結(jié)果分到一組,用另一個(gè) reduce 算子實(shí)現(xiàn) maxBy 的功能,
* 記錄所有用戶(hù)中訪問(wèn)頻次最高的那個(gè),也就是當(dāng)前訪問(wèn)量最大的用戶(hù)是誰(shuí)。
*/
public class TransReduceTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//隨機(jī)生成數(shù)據(jù)
Random random = new Random();
List<Integer> userIds = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
userIds.add(random.nextInt(5));
}
DataStreamSource<Integer> userIdDS = env.fromCollection(userIds);
//每個(gè)ID訪問(wèn)記錄一次
SingleOutputStreamOperator<Tuple2<Integer, Long>> mapDS = userIdDS.map(new MapFunction<Integer, Tuple2<Integer, Long>>() {
@Override
public Tuple2<Integer, Long> map(Integer value) throws Exception {
return new Tuple2<>(value, 1L);
}
});
//統(tǒng)計(jì)每個(gè)user訪問(wèn)多少次
SingleOutputStreamOperator<Tuple2<Integer, Long>> sumDS = mapDS.keyBy(tuple -> tuple.f0).reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
@Override
public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
});
sumDS.print("sumDS ->>>>>>>>>>>>>");
//把所有分區(qū)合并,求出最大的訪問(wèn)量
SingleOutputStreamOperator<Tuple2<Integer, Long>> maxDS = sumDS.keyBy(key -> true).reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
@Override
public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
if (value1.f1 > value2.f1) {
return value1;
} else {
return value2;
}
}
});
maxDS.print("maxDS ->>>>>>>>>>>");
env.execute("TransReduceTest");
}
}

到此這篇關(guān)于Flink實(shí)現(xiàn)特定統(tǒng)計(jì)的歸約聚合reduce操作的文章就介紹到這了,更多相關(guān)Flink歸約聚合內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java Disruptor構(gòu)建高性能內(nèi)存隊(duì)列使用詳解
這篇文章主要為大家介紹了java Disruptor構(gòu)建高性能內(nèi)存隊(duì)列使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-12-12
java實(shí)現(xiàn)網(wǎng)站微信掃碼支付
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)網(wǎng)站微信掃碼支付,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-07-07
SpringBoot中使用@ControllerAdvice注解詳解
這篇文章主要介紹了SpringBoot中使用@ControllerAdvice注解詳解,@ControllerAdvice,是Spring3.2提供的新注解,它是一個(gè)Controller增強(qiáng)器,可對(duì)controller中被 @RequestMapping注解的方法加一些邏輯處理,需要的朋友可以參考下2023-10-10
JAVA實(shí)現(xiàn)的CrazyArcade泡泡堂游戲
CrazyArcade泡泡堂游戲,一款用Java編寫(xiě)的JavaSwing游戲程序。 使用了MVC模式,分離了模型、視圖和控制器,使得項(xiàng)目結(jié)構(gòu)清晰易于擴(kuò)展,使用配置文件來(lái)設(shè)置游戲基本配置,擴(kuò)展地圖人物道具等。同時(shí),該程序編寫(xiě)期間用了單例模式、工廠模式、模板模式等設(shè)計(jì)模式。2021-04-04
Java前后端的JSON傳輸方式(前后端JSON格式轉(zhuǎn)換)
這篇文章主要介紹了Java前后端的JSON傳輸方式(前后端JSON格式轉(zhuǎn)換),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-04-04
springboot連接多個(gè)數(shù)據(jù)庫(kù)的實(shí)現(xiàn)方法
有時(shí)候一個(gè)SpringBoot項(xiàng)目需要同時(shí)連接兩個(gè)數(shù)據(jù)庫(kù),本文就來(lái)介紹一下springboot連接多個(gè)數(shù)據(jù)庫(kù)的實(shí)現(xiàn)方法,具有一定的參考價(jià)值,感興趣的可以了解一下2024-08-08
SpringMVC+Mybatis二維碼實(shí)現(xiàn)多平臺(tái)付款(附源碼)
本文主要實(shí)現(xiàn)微信支付寶等支付平臺(tái)合多為一的二維碼支付,并且實(shí)現(xiàn)有效時(shí)間內(nèi)支付有效,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08
Pulsar源碼徹底解決重復(fù)消費(fèi)問(wèn)題
這篇文章主要為大家介紹了Pulsar源碼徹底解決重復(fù)消費(fèi)問(wèn)題,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-05-05

