欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Flink實現(xiàn)特定統(tǒng)計的歸約聚合reduce操作

 更新時間:2023年02月08日 11:55:25   作者:響徹天堂丶  
這篇文章主要介紹了Flink實現(xiàn)特定統(tǒng)計的歸約聚合reduce操作,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧

如果說簡單聚合是對一些特定統(tǒng)計需求的實現(xiàn),那么 reduce 算子就是一個一般化的聚合統(tǒng)計操作了。從大名鼎鼎的 MapReduce 開始,我們對 reduce 操作就不陌生:它可以對已有的

數(shù)據(jù)進(jìn)行歸約處理,把每一個新輸入的數(shù)據(jù)和當(dāng)前已經(jīng)歸約出來的值,再做一個聚合計算。與簡單聚合類似,reduce 操作也會將 KeyedStream 轉(zhuǎn)換為 DataStream。它不會改變流的元

素數(shù)據(jù)類型,所以輸出類型和輸入類型是一樣的。調(diào)用 KeyedStream 的 reduce 方法時,需要傳入一個參數(shù),實現(xiàn) ReduceFunction 接口。接口在源碼中的定義如下:

@Public
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {
    /**
     * The core method of ReduceFunction, combining two values into one value of the same type. The
     * reduce function is consecutively applied to all values of a group until only a single value
     * remains.
     *
     * @param value1 The first value to combine.
     * @param value2 The second value to combine.
     * @return The combined value of both input values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    T reduce(T value1, T value2) throws Exception;
}

ReduceFunction 接口里需要實現(xiàn) reduce()方法,這個方法接收兩個輸入事件,經(jīng)過轉(zhuǎn)換處理之后輸出一個相同類型的事件;所以,對于一組數(shù)據(jù),我們可以先取兩個進(jìn)行合并,然后再

將合并的結(jié)果看作一個數(shù)據(jù)、再跟后面的數(shù)據(jù)合并,最終會將它“簡化”成唯一的一個數(shù)據(jù),這也就是 reduce“歸約”的含義。在流處理的底層實現(xiàn)過程中,實際上是將中間“合并的結(jié)果”

作為任務(wù)的一個狀態(tài)保存起來的;之后每來一個新的數(shù)據(jù),就和之前的聚合狀態(tài)進(jìn)一步做歸約。

其實,reduce 的語義是針對列表進(jìn)行規(guī)約操作,運算規(guī)則由 ReduceFunction 中的 reduce方法來定義,而在 ReduceFunction 內(nèi)部會維護(hù)一個初始值為空的累加器,注意累加器的類型

和輸入元素的類型相同,當(dāng)?shù)谝粭l元素到來時,累加器的值更新為第一條元素的值,當(dāng)新的元素到來時,新元素會和累加器進(jìn)行累加操作,這里的累加操作就是 reduce 函數(shù)定義的運算規(guī)

則。然后將更新以后的累加器的值向下游輸出。

我們可以單獨定義一個函數(shù)類實現(xiàn) ReduceFunction 接口,也可以直接傳入一個匿名類。當(dāng)然,同樣也可以通過傳入 Lambda 表達(dá)式實現(xiàn)類似的功能。與簡單聚合類似,reduce 操作也會將 KeyedStream 轉(zhuǎn)換為 DataStrema。它不會改變流的元素數(shù)據(jù)類型,所以輸出類型和輸入類型是一樣的。下面我們來看一個稍復(fù)雜的例子。

我們將數(shù)據(jù)流按照用戶 id 進(jìn)行分區(qū),然后用一個 reduce 算子實現(xiàn) sum 的功能,統(tǒng)計每個用戶訪問的頻次;進(jìn)而將所有統(tǒng)計結(jié)果分到一組,用另一個 reduce 算子實現(xiàn) maxBy 的功能,記錄所有用戶中訪問頻次最高的那個,也就是當(dāng)前訪問量最大的用戶是誰。

package com.rosh.flink.test;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
/**
 * 我們將數(shù)據(jù)流按照用戶 id 進(jìn)行分區(qū),然后用一個 reduce 算子實現(xiàn) sum 的功能,統(tǒng)計每個
 * 用戶訪問的頻次;進(jìn)而將所有統(tǒng)計結(jié)果分到一組,用另一個 reduce 算子實現(xiàn) maxBy 的功能,
 * 記錄所有用戶中訪問頻次最高的那個,也就是當(dāng)前訪問量最大的用戶是誰。
 */
public class TransReduceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //隨機(jī)生成數(shù)據(jù)
        Random random = new Random();
        List<Integer> userIds = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            userIds.add(random.nextInt(5));
        }
        DataStreamSource<Integer> userIdDS = env.fromCollection(userIds);
        //每個ID訪問記錄一次
        SingleOutputStreamOperator<Tuple2<Integer, Long>> mapDS = userIdDS.map(new MapFunction<Integer, Tuple2<Integer, Long>>() {
            @Override
            public Tuple2<Integer, Long> map(Integer value) throws Exception {
                return new Tuple2<>(value, 1L);
            }
        });
        //統(tǒng)計每個user訪問多少次
        SingleOutputStreamOperator<Tuple2<Integer, Long>> sumDS = mapDS.keyBy(tuple -> tuple.f0).reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
            @Override
            public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
                return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
            }
        });
        sumDS.print("sumDS  ->>>>>>>>>>>>>");
        //把所有分區(qū)合并,求出最大的訪問量
        SingleOutputStreamOperator<Tuple2<Integer, Long>> maxDS = sumDS.keyBy(key -> true).reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
            @Override
            public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
                if (value1.f1 > value2.f1) {
                    return value1;
                } else {
                    return value2;
                }
            }
        });
        maxDS.print("maxDS ->>>>>>>>>>>");
        env.execute("TransReduceTest");
    }
}

到此這篇關(guān)于Flink實現(xiàn)特定統(tǒng)計的歸約聚合reduce操作的文章就介紹到這了,更多相關(guān)Flink歸約聚合內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java Disruptor構(gòu)建高性能內(nèi)存隊列使用詳解

    java Disruptor構(gòu)建高性能內(nèi)存隊列使用詳解

    這篇文章主要為大家介紹了java Disruptor構(gòu)建高性能內(nèi)存隊列使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-12-12
  • java實現(xiàn)網(wǎng)站微信掃碼支付

    java實現(xiàn)網(wǎng)站微信掃碼支付

    這篇文章主要為大家詳細(xì)介紹了java實現(xiàn)網(wǎng)站微信掃碼支付,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-07-07
  • SpringBoot中使用@ControllerAdvice注解詳解

    SpringBoot中使用@ControllerAdvice注解詳解

    這篇文章主要介紹了SpringBoot中使用@ControllerAdvice注解詳解,@ControllerAdvice,是Spring3.2提供的新注解,它是一個Controller增強(qiáng)器,可對controller中被 @RequestMapping注解的方法加一些邏輯處理,需要的朋友可以參考下
    2023-10-10
  • JAVA實現(xiàn)的CrazyArcade泡泡堂游戲

    JAVA實現(xiàn)的CrazyArcade泡泡堂游戲

    CrazyArcade泡泡堂游戲,一款用Java編寫的JavaSwing游戲程序。 使用了MVC模式,分離了模型、視圖和控制器,使得項目結(jié)構(gòu)清晰易于擴(kuò)展,使用配置文件來設(shè)置游戲基本配置,擴(kuò)展地圖人物道具等。同時,該程序編寫期間用了單例模式、工廠模式、模板模式等設(shè)計模式。
    2021-04-04
  • 手把手教你如何獲取微信用戶openid

    手把手教你如何獲取微信用戶openid

    眾所周知小程序的openid相當(dāng)重要,它是用戶的唯一標(biāo)識id,牽扯的支付,登錄,授權(quán)等,下面這篇文章主要給大家介紹了關(guān)于如何獲取微信用戶openid的相關(guān)資料,需要的朋友可以參考下
    2023-02-02
  • Java的Volatile實例用法及講解

    Java的Volatile實例用法及講解

    在本篇文章里小編給大家整理了關(guān)于Java的Volatile知識點相關(guān)內(nèi)容,有需要的朋友們可以跟著學(xué)習(xí)下。
    2019-09-09
  • Java前后端的JSON傳輸方式(前后端JSON格式轉(zhuǎn)換)

    Java前后端的JSON傳輸方式(前后端JSON格式轉(zhuǎn)換)

    這篇文章主要介紹了Java前后端的JSON傳輸方式(前后端JSON格式轉(zhuǎn)換),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-04-04
  • springboot連接多個數(shù)據(jù)庫的實現(xiàn)方法

    springboot連接多個數(shù)據(jù)庫的實現(xiàn)方法

    有時候一個SpringBoot項目需要同時連接兩個數(shù)據(jù)庫,本文就來介紹一下springboot連接多個數(shù)據(jù)庫的實現(xiàn)方法,具有一定的參考價值,感興趣的可以了解一下
    2024-08-08
  • SpringMVC+Mybatis二維碼實現(xiàn)多平臺付款(附源碼)

    SpringMVC+Mybatis二維碼實現(xiàn)多平臺付款(附源碼)

    本文主要實現(xiàn)微信支付寶等支付平臺合多為一的二維碼支付,并且實現(xiàn)有效時間內(nèi)支付有效,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-08-08
  • Pulsar源碼徹底解決重復(fù)消費問題

    Pulsar源碼徹底解決重復(fù)消費問題

    這篇文章主要為大家介紹了Pulsar源碼徹底解決重復(fù)消費問題,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-05-05

最新評論