MapReduce實現(xiàn)TopN效果示例解析
1、背景
最近在學習Hadoop的MapReduce,此處記錄一下如何實現(xiàn) TopN 的效果,以及在MapReduce中如何實現(xiàn) 自定義分組。
2、需求
我們有一份數(shù)據(jù),數(shù)據(jù)中存在如下3個字段,訂單編號,訂單項和訂單項價格。 輸出的數(shù)據(jù),需求如下:
- 訂單編號與訂單編號之間需要正序輸出。
- 輸出每個訂單價格最高的2個訂單項。
3、分析
- 訂單編號與訂單編號之間需要正序輸出,那么訂單編號必須要作為Key,因為只有Key才有排序操作。
- 輸出每個訂單價格最高的2個訂單項: 這個輸出是在reduce階段,并且是每個訂單,因此需要根據(jù)訂單編號進行分組操作(前后2個key比較,相同則為一組),而分組也只有Key才有,因此就需要JavaBean(訂單編號、訂單項、訂單項價格)來作為組合Key。
- 訂單編號與訂單編號之間需要正序輸出 \&& 輸出每個訂單價格最高的2個訂單項: 可以看出在Key中的排序規(guī)則為:根據(jù)訂單編號升序,然后根據(jù)訂單項價格倒序排序, 并且是根據(jù)訂單編號來分組。
- 我們知道默認MapReduce中默認的分區(qū)規(guī)則是,根據(jù)key的hascode來進行分區(qū),而 分區(qū) 下是有多個 分組,每個分組調(diào)用一次reduce方法。 而我們上方的思路是,根據(jù)訂單編號來進行分組,當我們Key是JavaBean組合Key時,相同的訂單編號所在的JavaBean會被分在一個分組嗎,這個不一定,因為JavaBean的hashcode不一定一致,因此就需要我們自定義分區(qū)(繼承Partitioner類)。此處我們job.setNumReduceTasks設置為1個,因此不考慮這個分區(qū)的問題。
- 一個分區(qū)下有多個分組,每個分組調(diào)用一次reduce方法。
4、準備數(shù)據(jù)
4.1 準備數(shù)據(jù)
20230713000010 item-101 10
20230713000010 item-102 30
20230713000015 item-151 10
20230713000015 item-152 20
20230713000010 item-103 20
20230713000015 item-153 30
20230713000012 item-121 50
20230713000012 item-122 10
20230713000012 item-123 30
4.2 每行數(shù)據(jù)格式
訂單編號 訂單項 訂單項價格
20230713000012 item-123 30
每行數(shù)據(jù)的分隔符為空格
4.3 期望輸出結果
20230713000010 item-102 30
20230713000010 item-103 20
20230713000012 item-121 50
20230713000012 item-123 30
20230713000015 item-153 30
20230713000015 item-152 20
5、編碼實現(xiàn)
5.1 引入jar包
<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.3.4</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.22</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <version>3.2.2</version> <configuration> <archive> <manifest> <addClasspath>true</addClasspath> <classpathPrefix>lib/</classpathPrefix> <mainClass>com.huan.hadoop.mr.TopNDriver</mainClass> </manifest> </archive> </configuration> </plugin> </plugins> </build>
5.2 編寫實體類
package com.huan.hadoop.mr; import lombok.Getter; import lombok.Setter; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * 訂單數(shù)據(jù) * * @author huan.fu * @date 2023/7/13 - 14:20 */ @Getter @Setter public class OrderVo implements WritableComparable<OrderVo> { /** * 訂單編號 */ private long orderId; /** * 訂單項 */ private String itemId; /** * 訂單項價格 */ private long price; @Override public int compareTo(OrderVo o) { // 排序: 根據(jù) 訂單編號 升序, 如果訂單編號相同,則根據(jù) 訂單項價格 倒序 int result = Long.compare(this.orderId, o.orderId); if (result == 0) { // 等于0說明 訂單編號 相同,則需要根據(jù) 訂單項價格 倒序 result = -Long.compare(this.price, o.price); } return result; } @Override public void write(DataOutput out) throws IOException { // 序列化 out.writeLong(orderId); out.writeUTF(itemId); out.writeLong(price); } @Override public void readFields(DataInput in) throws IOException { // 反序列化 this.orderId = in.readLong(); this.itemId = in.readUTF(); this.price = in.readLong(); } @Override public String toString() { return this.getOrderId() + "\t" + this.getItemId() + "\t" + this.getPrice(); } }
- 此處需要實現(xiàn) WritableComparable接口
- 需要編寫 排序和序列化方法
5.3 編寫分組方法
package com.huan.hadoop.mr; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * 分組: 訂單編號相同說明是同一組,否則是不同的組 * * @author huan.fu * @date 2023/7/13 - 14:30 */ public class TopNGroupingComparator extends WritableComparator { public TopNGroupingComparator() { // 第二個參數(shù)為true: 表示可以通過反射創(chuàng)建實例 super(OrderVo.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { // 訂單編號 相同說明是同一個對象,否則是不同的對象 return ((OrderVo) a).getOrderId() == ((OrderVo) b).getOrderId() ? 0 : 1; } }
- 實現(xiàn) WritableComparator接口,自定義分組規(guī)則。
- 分組是發(fā)生在reduce階段,前后2個key比較,相同則為一組,一組調(diào)用一次reduce方法。
5.4 編寫 map 方法
package com.huan.hadoop.mr; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * map 操作: 輸出的key為OrderVo, 輸出的value為: price * * @author huan.fu * @date 2023/7/13 - 14:28 */ public class TopNMapper extends Mapper<LongWritable, Text, OrderVo, LongWritable> { private final OrderVo outKey = new OrderVo(); private final LongWritable outValue = new LongWritable(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, OrderVo, LongWritable>.Context context) throws IOException, InterruptedException { // 獲取一行數(shù)據(jù) 20230713000010 item-101 10 String row = value.toString(); // 根據(jù) \t 進行分割 String[] cells = row.split("\\s+"); // 獲取訂單編號 long orderId = Long.parseLong(cells[0]); // 獲取訂單項 String itemId = cells[1]; // 獲取訂單項價格 long price = Long.parseLong(cells[2]); // 設置值 outKey.setOrderId(orderId); outKey.setItemId(itemId); outKey.setPrice(price); outValue.set(price); // 寫出 context.write(outKey, outValue); } }
- map 操作: 輸出的key為OrderVo, 輸出的value為: price
5.5 編寫reduce方法
package com.huan.hadoop.mr; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * reduce操作: Key(OrderVo)相同的分為一組, 此處 OrderVo 作為key, 分組是根據(jù) TopNGroupingComparator 來實現(xiàn), * 即 訂單編號 相同的認為一組 * * @author huan.fu * @date 2023/7/13 - 14:29 */ public class TopNReducer extends Reducer<OrderVo, LongWritable, OrderVo, NullWritable> { @Override protected void reduce(OrderVo key, Iterable<LongWritable> values, Reducer<OrderVo, LongWritable, OrderVo, NullWritable>.Context context) throws IOException, InterruptedException { int topN = 0; // 隨著每次遍歷, key的 orderId 是相同的(因為是根據(jù)這個分組的),但是里面的itemId和price是不同的 for (LongWritable price : values) { topN++; if (topN > 2) { break; } // 注意: 此處的key每次輸出都不一樣 context.write(key, NullWritable.get()); } } }
- reduce操作: Key(OrderVo)相同的分為一組, 此處 OrderVo 作為key, 分組是根據(jù) TopNGroupingComparator 來實現(xiàn),即 訂單編號 相同的認為一組.
- 隨著每次遍歷, key的 orderId 是相同的(因為是根據(jù)這個分組的),但是里面的itemId和price是不同的
5.6 編寫driver類
package com.huan.hadoop.mr; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * @author huan.fu * @date 2023/7/13 - 14:29 */ public class TopNDriver extends Configured implements Tool { public static void main(String[] args) throws Exception { // 構建配置對象 Configuration configuration = new Configuration(); // 使用 ToolRunner 提交程序 int status = ToolRunner.run(configuration, new TopNDriver(), args); // 退出程序 System.exit(status); } @Override public int run(String[] args) throws Exception { // 構建Job對象實例 參數(shù)(配置對象,Job對象名稱) Job job = Job.getInstance(getConf(), "topN"); // 設置mr程序運行的主類 job.setJarByClass(TopNDriver.class); // 設置mr程序運行的 mapper類型和reduce類型 job.setMapperClass(TopNMapper.class); job.setReducerClass(TopNReducer.class); // 指定mapper階段輸出的kv數(shù)據(jù)類型 job.setMapOutputKeyClass(OrderVo.class); job.setMapOutputValueClass(LongWritable.class); // 指定reduce階段輸出的kv數(shù)據(jù)類型,業(yè)務mr程序輸出的最終類型 job.setOutputKeyClass(OrderVo.class); job.setOutputValueClass(NullWritable.class); // 配置本例子中的輸入數(shù)據(jù)路徑和輸出數(shù)據(jù)路徑,默認輸入輸出組件為: TextInputFormat和TextOutputFormat FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 先刪除輸出目錄(方便本地測試) FileSystem.get(this.getConf()).delete(new Path(args[1]), true); // 設置分組 job.setGroupingComparatorClass(TopNGroupingComparator.class); return job.waitForCompletion(true) ? 0 : 1; } }
- 需要設置分組 job.setGroupingComparatorClass(TopNGroupingComparator.class);
5.7 運行結果
完整代碼
https://gitee.com/huan1993/spring-cloud-parent/tree/master/hadoop/mr-topn-group
以上就是MapReduce實現(xiàn)TopN的效果的詳細內(nèi)容,更多關于MapReduce TopN效果的資料請關注腳本之家其它相關文章!
相關文章
Springboot實現(xiàn)WebMvcConfigurer接口定制mvc配置詳解
這篇文章主要介紹了Springboot實現(xiàn)WebMvcConfigurer接口定制mvc配置詳解,spring?boot拋棄了傳統(tǒng)xml配置文件,通過配置類(標注@Configuration的類,@Configuration配置類相當于一個xml配置文件)以JavaBean形式進行相關配置,需要的朋友可以參考下2023-09-09Spring MVC 中 短信驗證碼功能的實現(xiàn)方法
短信驗證功能在各個網(wǎng)站應用都非常廣泛,那么在springmvc中如何實現(xiàn)短信驗證碼功能呢?今天小編抽時間給大家介紹下Spring MVC 中 短信驗證碼功能的實現(xiàn)方法,一起看看吧2016-09-09Java?MapStruct優(yōu)雅地實現(xiàn)對象轉換
MapSturct?是一個生成類型安全,高性能且無依賴的?JavaBean?映射代碼的注解處理器,用它可以輕松實現(xiàn)對象轉換,下面就來和大家聊聊具體操作吧2023-06-06