欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

MapReduce實(shí)現(xiàn)TopN效果示例解析

 更新時(shí)間:2023年07月18日 08:51:46   作者:huan1993  
這篇文章主要為大家介紹了MapReduce實(shí)現(xiàn)TopN效果示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

1、背景

最近在學(xué)習(xí)Hadoop的MapReduce,此處記錄一下如何實(shí)現(xiàn) TopN 的效果,以及在MapReduce中如何實(shí)現(xiàn) 自定義分組。

2、需求

我們有一份數(shù)據(jù),數(shù)據(jù)中存在如下3個(gè)字段,訂單編號(hào),訂單項(xiàng)和訂單項(xiàng)價(jià)格。 輸出的數(shù)據(jù),需求如下:

  • 訂單編號(hào)與訂單編號(hào)之間需要正序輸出。
  • 輸出每個(gè)訂單價(jià)格最高的2個(gè)訂單項(xiàng)。

3、分析

  • 訂單編號(hào)與訂單編號(hào)之間需要正序輸出,那么訂單編號(hào)必須要作為Key,因?yàn)橹挥蠯ey才有排序操作。
  • 輸出每個(gè)訂單價(jià)格最高的2個(gè)訂單項(xiàng): 這個(gè)輸出是在reduce階段,并且是每個(gè)訂單,因此需要根據(jù)訂單編號(hào)進(jìn)行分組操作(前后2個(gè)key比較,相同則為一組),而分組也只有Key才有,因此就需要JavaBean(訂單編號(hào)、訂單項(xiàng)、訂單項(xiàng)價(jià)格)來(lái)作為組合Key。
  • 訂單編號(hào)與訂單編號(hào)之間需要正序輸出 \&& 輸出每個(gè)訂單價(jià)格最高的2個(gè)訂單項(xiàng): 可以看出在Key中的排序規(guī)則為:根據(jù)訂單編號(hào)升序,然后根據(jù)訂單項(xiàng)價(jià)格倒序排序, 并且是根據(jù)訂單編號(hào)來(lái)分組。
  • 我們知道默認(rèn)MapReduce中默認(rèn)的分區(qū)規(guī)則是,根據(jù)key的hascode來(lái)進(jìn)行分區(qū),而 分區(qū) 下是有多個(gè) 分組,每個(gè)分組調(diào)用一次reduce方法。 而我們上方的思路是,根據(jù)訂單編號(hào)來(lái)進(jìn)行分組,當(dāng)我們Key是JavaBean組合Key時(shí),相同的訂單編號(hào)所在的JavaBean會(huì)被分在一個(gè)分組嗎,這個(gè)不一定,因?yàn)镴avaBean的hashcode不一定一致,因此就需要我們自定義分區(qū)(繼承Partitioner類)。此處我們job.setNumReduceTasks設(shè)置為1個(gè),因此不考慮這個(gè)分區(qū)的問(wèn)題。
  • 一個(gè)分區(qū)下有多個(gè)分組,每個(gè)分組調(diào)用一次reduce方法。

4、準(zhǔn)備數(shù)據(jù)

4.1 準(zhǔn)備數(shù)據(jù)

20230713000010  item-101    10
20230713000010  item-102    30
20230713000015  item-151    10
20230713000015  item-152    20
20230713000010  item-103    20
20230713000015  item-153    30
20230713000012  item-121    50
20230713000012  item-122    10
20230713000012  item-123    30

4.2 每行數(shù)據(jù)格式

訂單編號(hào)          訂單項(xiàng)      訂單項(xiàng)價(jià)格
20230713000012  item-123    30

每行數(shù)據(jù)的分隔符為空格

4.3 期望輸出結(jié)果

20230713000010  item-102    30
20230713000010  item-103    20
20230713000012  item-121    50
20230713000012  item-123    30
20230713000015  item-153    30
20230713000015  item-152    20

5、編碼實(shí)現(xiàn)

5.1 引入jar包

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.4</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.22</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <version>3.2.2</version>
            <configuration>
                <archive>
                    <manifest>
                        <addClasspath>true</addClasspath>
                        <classpathPrefix>lib/</classpathPrefix>
                        <mainClass>com.huan.hadoop.mr.TopNDriver</mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
    </plugins>
</build>

5.2 編寫實(shí)體類

package com.huan.hadoop.mr;
import lombok.Getter;
import lombok.Setter;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * 訂單數(shù)據(jù)
 *
 * @author huan.fu
 * @date 2023/7/13 - 14:20
 */
@Getter
@Setter
public class OrderVo implements WritableComparable<OrderVo> {
    /**
     * 訂單編號(hào)
     */
    private long orderId;
    /**
     * 訂單項(xiàng)
     */
    private String itemId;
    /**
     * 訂單項(xiàng)價(jià)格
     */
    private long price;
    @Override
    public int compareTo(OrderVo o) {
        // 排序: 根據(jù) 訂單編號(hào) 升序, 如果訂單編號(hào)相同,則根據(jù) 訂單項(xiàng)價(jià)格 倒序
        int result = Long.compare(this.orderId, o.orderId);
        if (result == 0) {
            // 等于0說(shuō)明 訂單編號(hào) 相同,則需要根據(jù) 訂單項(xiàng)價(jià)格 倒序
            result = -Long.compare(this.price, o.price);
        }
        return result;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        // 序列化
        out.writeLong(orderId);
        out.writeUTF(itemId);
        out.writeLong(price);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        // 反序列化
        this.orderId = in.readLong();
        this.itemId = in.readUTF();
        this.price = in.readLong();
    }
    @Override
    public String toString() {
        return this.getOrderId() + "\t" + this.getItemId() + "\t" + this.getPrice();
    }
}
  • 此處需要實(shí)現(xiàn) WritableComparable接口
  • 需要編寫 排序和序列化方法

5.3 編寫分組方法

package com.huan.hadoop.mr;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * 分組: 訂單編號(hào)相同說(shuō)明是同一組,否則是不同的組
 *
 * @author huan.fu
 * @date 2023/7/13 - 14:30
 */
public class TopNGroupingComparator extends WritableComparator {
    public TopNGroupingComparator() {
        // 第二個(gè)參數(shù)為true: 表示可以通過(guò)反射創(chuàng)建實(shí)例
        super(OrderVo.class, true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // 訂單編號(hào) 相同說(shuō)明是同一個(gè)對(duì)象,否則是不同的對(duì)象
        return ((OrderVo) a).getOrderId() == ((OrderVo) b).getOrderId() ? 0 : 1;
    }
}
  • 實(shí)現(xiàn) WritableComparator接口,自定義分組規(guī)則。
  • 分組是發(fā)生在reduce階段,前后2個(gè)key比較,相同則為一組,一組調(diào)用一次reduce方法。

5.4 編寫 map 方法

package com.huan.hadoop.mr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * map 操作: 輸出的key為OrderVo, 輸出的value為: price
 *
 * @author huan.fu
 * @date 2023/7/13 - 14:28
 */
public class TopNMapper extends Mapper<LongWritable, Text, OrderVo, LongWritable> {
    private final OrderVo outKey = new OrderVo();
    private final LongWritable outValue = new LongWritable();
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, OrderVo, LongWritable>.Context context) throws IOException, InterruptedException {
        // 獲取一行數(shù)據(jù) 20230713000010  item-101    10
        String row = value.toString();
        // 根據(jù) \t 進(jìn)行分割
        String[] cells = row.split("\\s+");
        // 獲取訂單編號(hào)
        long orderId = Long.parseLong(cells[0]);
        // 獲取訂單項(xiàng)
        String itemId = cells[1];
        // 獲取訂單項(xiàng)價(jià)格
        long price = Long.parseLong(cells[2]);
        // 設(shè)置值
        outKey.setOrderId(orderId);
        outKey.setItemId(itemId);
        outKey.setPrice(price);
        outValue.set(price);
        // 寫出
        context.write(outKey, outValue);
    }
}
  • map 操作: 輸出的key為OrderVo, 輸出的value為: price

5.5 編寫reduce方法

package com.huan.hadoop.mr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * reduce操作: Key(OrderVo)相同的分為一組, 此處 OrderVo 作為key, 分組是根據(jù) TopNGroupingComparator 來(lái)實(shí)現(xiàn),
 * 即 訂單編號(hào) 相同的認(rèn)為一組
 *
 * @author huan.fu
 * @date 2023/7/13 - 14:29
 */
public class TopNReducer extends Reducer<OrderVo, LongWritable, OrderVo, NullWritable> {
    @Override
    protected void reduce(OrderVo key, Iterable<LongWritable> values, Reducer<OrderVo, LongWritable, OrderVo, NullWritable>.Context context) throws IOException, InterruptedException {
        int topN = 0;
        // 隨著每次遍歷, key的 orderId 是相同的(因?yàn)槭歉鶕?jù)這個(gè)分組的),但是里面的itemId和price是不同的
        for (LongWritable price : values) {
            topN++;
            if (topN > 2) {
                break;
            }
            // 注意: 此處的key每次輸出都不一樣
            context.write(key, NullWritable.get());
        }
    }
}
  • reduce操作: Key(OrderVo)相同的分為一組, 此處 OrderVo 作為key, 分組是根據(jù) TopNGroupingComparator 來(lái)實(shí)現(xiàn),即 訂單編號(hào) 相同的認(rèn)為一組.
  • 隨著每次遍歷, key的 orderId 是相同的(因?yàn)槭歉鶕?jù)這個(gè)分組的),但是里面的itemId和price是不同的

5.6 編寫driver類

package com.huan.hadoop.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * @author huan.fu
 * @date 2023/7/13 - 14:29
 */
public class TopNDriver extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        // 構(gòu)建配置對(duì)象
        Configuration configuration = new Configuration();
        // 使用 ToolRunner 提交程序
        int status = ToolRunner.run(configuration, new TopNDriver(), args);
        // 退出程序
        System.exit(status);
    }
    @Override
    public int run(String[] args) throws Exception {
        // 構(gòu)建Job對(duì)象實(shí)例 參數(shù)(配置對(duì)象,Job對(duì)象名稱)
        Job job = Job.getInstance(getConf(), "topN");
        // 設(shè)置mr程序運(yùn)行的主類
        job.setJarByClass(TopNDriver.class);
        // 設(shè)置mr程序運(yùn)行的 mapper類型和reduce類型
        job.setMapperClass(TopNMapper.class);
        job.setReducerClass(TopNReducer.class);
        // 指定mapper階段輸出的kv數(shù)據(jù)類型
        job.setMapOutputKeyClass(OrderVo.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 指定reduce階段輸出的kv數(shù)據(jù)類型,業(yè)務(wù)mr程序輸出的最終類型
        job.setOutputKeyClass(OrderVo.class);
        job.setOutputValueClass(NullWritable.class);
        // 配置本例子中的輸入數(shù)據(jù)路徑和輸出數(shù)據(jù)路徑,默認(rèn)輸入輸出組件為: TextInputFormat和TextOutputFormat
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 先刪除輸出目錄(方便本地測(cè)試)
        FileSystem.get(this.getConf()).delete(new Path(args[1]), true);
        // 設(shè)置分組
        job.setGroupingComparatorClass(TopNGroupingComparator.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
  • 需要設(shè)置分組 job.setGroupingComparatorClass(TopNGroupingComparator.class);

5.7 運(yùn)行結(jié)果

完整代碼

https://gitee.com/huan1993/spring-cloud-parent/tree/master/hadoop/mr-topn-group

以上就是MapReduce實(shí)現(xiàn)TopN的效果的詳細(xì)內(nèi)容,更多關(guān)于MapReduce TopN效果的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • java安全編碼指南之:Number操作詳解

    java安全編碼指南之:Number操作詳解

    這篇文章主要介紹了java安全編碼指南之:Number操作詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2020-09-09
  • Java語(yǔ)言之包和繼承詳解

    Java語(yǔ)言之包和繼承詳解

    這篇文章主要介紹了java的包和繼承,結(jié)合實(shí)例形式詳細(xì)分析了Java繼承的概念、原理、用法及相關(guān)操作注意事項(xiàng),需要的朋友可以參考下
    2021-09-09
  • Springboot實(shí)現(xiàn)WebMvcConfigurer接口定制mvc配置詳解

    Springboot實(shí)現(xiàn)WebMvcConfigurer接口定制mvc配置詳解

    這篇文章主要介紹了Springboot實(shí)現(xiàn)WebMvcConfigurer接口定制mvc配置詳解,spring?boot拋棄了傳統(tǒng)xml配置文件,通過(guò)配置類(標(biāo)注@Configuration的類,@Configuration配置類相當(dāng)于一個(gè)xml配置文件)以JavaBean形式進(jìn)行相關(guān)配置,需要的朋友可以參考下
    2023-09-09
  • GC參考手冊(cè)二java中垃圾回收原理解析

    GC參考手冊(cè)二java中垃圾回收原理解析

    由于有個(gè)垃圾回收機(jī)制,java中的額對(duì)象不在有“作用域”的概念,只有對(duì)象的引用才有“作用域”。垃圾回收可以有效的防止內(nèi)存泄露,有效的使用空閑的內(nèi)存<BR>
    2022-01-01
  • 使用maven?shade插件解決項(xiàng)目版本沖突詳解

    使用maven?shade插件解決項(xiàng)目版本沖突詳解

    這篇文章主要為大家介紹了使用maven?shade插件解決項(xiàng)目版本沖突詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-09-09
  • Spring MVC 中 短信驗(yàn)證碼功能的實(shí)現(xiàn)方法

    Spring MVC 中 短信驗(yàn)證碼功能的實(shí)現(xiàn)方法

    短信驗(yàn)證功能在各個(gè)網(wǎng)站應(yīng)用都非常廣泛,那么在springmvc中如何實(shí)現(xiàn)短信驗(yàn)證碼功能呢?今天小編抽時(shí)間給大家介紹下Spring MVC 中 短信驗(yàn)證碼功能的實(shí)現(xiàn)方法,一起看看吧
    2016-09-09
  • springboot如何添加全局異常捕獲類

    springboot如何添加全局異常捕獲類

    這篇文章主要介紹了springboot如何添加全局異常捕獲類,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-01-01
  • Java?MapStruct優(yōu)雅地實(shí)現(xiàn)對(duì)象轉(zhuǎn)換

    Java?MapStruct優(yōu)雅地實(shí)現(xiàn)對(duì)象轉(zhuǎn)換

    MapSturct?是一個(gè)生成類型安全,高性能且無(wú)依賴的?JavaBean?映射代碼的注解處理器,用它可以輕松實(shí)現(xiàn)對(duì)象轉(zhuǎn)換,下面就來(lái)和大家聊聊具體操作吧
    2023-06-06
  • 基于JDK8總結(jié)java中的interrupt

    基于JDK8總結(jié)java中的interrupt

    本文是基于JDK8總結(jié)java中的interrupt知識(shí),需要的朋友可以參考下
    2017-12-12
  • SpringBoot項(xiàng)目運(yùn)行一段時(shí)間后自動(dòng)關(guān)閉的坑及解決

    SpringBoot項(xiàng)目運(yùn)行一段時(shí)間后自動(dòng)關(guān)閉的坑及解決

    這篇文章主要介紹了SpringBoot項(xiàng)目運(yùn)行一段時(shí)間后自動(dòng)關(guān)閉的坑及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-09-09

最新評(píng)論