java使用hadoop實(shí)現(xiàn)關(guān)聯(lián)商品統(tǒng)計
最近幾天一直在看Hadoop相關(guān)的書籍,目前稍微有點(diǎn)感覺,自己就仿照著WordCount程序自己編寫了一個統(tǒng)計關(guān)聯(lián)商品。
需求描述:
根據(jù)超市的銷售清單,計算商品之間的關(guān)聯(lián)程度(即統(tǒng)計同時買A商品和B商品的次數(shù))。
數(shù)據(jù)格式:
超市銷售清單簡化為如下格式:一行表示一個清單,每個商品采用 "," 分割,如下圖所示:
需求分析:
采用hadoop中的mapreduce對該需求進(jìn)行計算。
map函數(shù)主要拆分出關(guān)聯(lián)的商品,輸出結(jié)果為 key為商品A,value為商品B,對于第一條三條結(jié)果拆分結(jié)果如下圖所示:
這里為了統(tǒng)計出和A、B兩件商品想關(guān)聯(lián)的商品,所以商品A、B之間的關(guān)系輸出兩條結(jié)果即 A-B、B-A。
reduce函數(shù)分別對和商品A相關(guān)的商品進(jìn)行分組統(tǒng)計,即分別求value中的各個商品出現(xiàn)的次數(shù),輸出結(jié)果為key為商品A|商品B,value為該組合出現(xiàn)的次數(shù)。針對上面提到的5條記錄,對map輸出中key值為R的做下分析:
通過map函數(shù)的處理,得到如下圖所示的記錄:
reduce中對map輸出的value值進(jìn)行分組計數(shù),得到的結(jié)果如下圖所示
將商品A B作為key,組合個數(shù)作為value輸出,輸出結(jié)果如下圖所示:
對于需求的實(shí)現(xiàn)過程的分析到目前就結(jié)束了,下面就看下具體的代碼實(shí)現(xiàn)
代碼實(shí)現(xiàn):
關(guān)于代碼就不做詳細(xì)的介紹,具體參照代碼之中的注釋吧。
package com; import java.io.IOException; import java.util.HashMap; import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Test extends Configured implements Tool{ /** * map類,實(shí)現(xiàn)數(shù)據(jù)的預(yù)處理 * 輸出結(jié)果key為商品A value為關(guān)聯(lián)商品B * @author lulei */ public static class MapT extends Mapper<LongWritable, Text, Text, Text> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String line = value.toString(); if (!(line == null || "".equals(line))) { //分割商品 String []vs = line.split(","); //兩兩組合,構(gòu)成一條記錄 for (int i = 0; i < (vs.length - 1); i++) { if ("".equals(vs[i])) {//排除空記錄 continue; } for (int j = i+1; j < vs.length; j++) { if ("".equals(vs[j])) { continue; } //輸出結(jié)果 context.write(new Text(vs[i]), new Text(vs[j])); context.write(new Text(vs[j]), new Text(vs[i])); } } } } } /** * reduce類,實(shí)現(xiàn)數(shù)據(jù)的計數(shù) * 輸出結(jié)果key 為商品A|B value為該關(guān)聯(lián)次數(shù) * @author lulei */ public static class ReduceT extends Reducer<Text, Text, Text, IntWritable> { private int count; /** * 初始化 */ public void setup(Context context) { //從參數(shù)中獲取最小記錄個數(shù) String countStr = context.getConfiguration().get("count"); try { this.count = Integer.parseInt(countStr); } catch (Exception e) { this.count = 0; } } public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ String keyStr = key.toString(); HashMap<String, Integer> hashMap = new HashMap<String, Integer>(); //利用hash統(tǒng)計B商品的次數(shù) for (Text value : values) { String valueStr = value.toString(); if (hashMap.containsKey(valueStr)) { hashMap.put(valueStr, hashMap.get(valueStr) + 1); } else { hashMap.put(valueStr, 1); } } //將結(jié)果輸出 for (Entry<String, Integer> entry : hashMap.entrySet()) { if (entry.getValue() >= this.count) {//只輸出次數(shù)不小于最小值的 context.write(new Text(keyStr + "|" + entry.getKey()), new IntWritable(entry.getValue())); } } } } @Override public int run(String[] arg0) throws Exception { // TODO Auto-generated method stub Configuration conf = getConf(); conf.set("count", arg0[2]); Job job = new Job(conf); job.setJobName("jobtest"); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapperClass(MapT.class); job.setReducerClass(ReduceT.class); FileInputFormat.addInputPath(job, new Path(arg0[0])); FileOutputFormat.setOutputPath(job, new Path(arg0[1])); job.waitForCompletion(true); return job.isSuccessful() ? 0 : 1; } /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub if (args.length != 3) { System.exit(-1); } try { int res = ToolRunner.run(new Configuration(), new Test(), args); System.exit(res); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
上傳運(yùn)行:
將程序打包成jar文件,上傳到機(jī)群之中。將測試數(shù)據(jù)也上傳到HDFS分布式文件系統(tǒng)中。
命令運(yùn)行截圖如下圖所示:
運(yùn)行結(jié)束后查看相應(yīng)的HDFS文件系統(tǒng),如下圖所示:
到此一個完整的mapreduce程序就完成了,關(guān)于hadoop的學(xué)習(xí),自己還將繼續(xù)~感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!
- java結(jié)合HADOOP集群文件上傳下載
- Java訪問Hadoop分布式文件系統(tǒng)HDFS的配置說明
- Java執(zhí)行hadoop的基本操作實(shí)例代碼
- 深入淺析Java Object Serialization與 Hadoop 序列化
- hadoop中實(shí)現(xiàn)java網(wǎng)絡(luò)爬蟲(示例講解)
- Java/Web調(diào)用Hadoop進(jìn)行MapReduce示例代碼
- Hadoop運(yùn)行時遇到j(luò)ava.io.FileNotFoundException錯誤的解決方法
- hadoop運(yùn)行java程序(jar包)并運(yùn)行時動態(tài)指定參數(shù)
- java實(shí)現(xiàn)對Hadoop的操作
- 利用Java連接Hadoop進(jìn)行編程
相關(guān)文章
Java利用Socket和IO流實(shí)現(xiàn)文件的上傳與下載
本文主要介紹了Java利用Socket和IO流實(shí)現(xiàn)文件的上傳與下載,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-04-04使用自定義注解實(shí)現(xiàn)redisson分布式鎖
這篇文章主要介紹了使用自定義注解實(shí)現(xiàn)redisson分布式鎖,具有很好的參考價值,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-02-02基于SpringBoot實(shí)現(xiàn)上傳2種方法工程代碼實(shí)例
這篇文章主要介紹了基于SpringBoot實(shí)現(xiàn)上傳工程代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-08-08JAXB命名空間_動力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要為大家詳細(xì)介紹了JAXB命名空間的相關(guān)資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-08-08聊聊SpringBoot的@Scheduled的并發(fā)問題
這篇文章主要介紹了聊聊SpringBoot的@Scheduled的并發(fā)問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11Java中的字節(jié)流InputStream和OutputStream詳解
這篇文章主要介紹了Java中的字節(jié)流InputStream和OutputStream詳解,繼承自InputStream的流都是用于向程序中輸入數(shù)據(jù),且數(shù)據(jù)的單位為字節(jié)8bit,我們看到的具體的某一些管道,凡是以InputStream結(jié)尾的管道,都是以字節(jié)的形式向我們的程序輸入數(shù)據(jù),需要的朋友可以參考下2023-10-10Spring2.5.6開發(fā)環(huán)境搭建圖文教程
這篇文章主要為大家詳細(xì)介紹了Spring2.5.6開發(fā)環(huán)境搭建圖文教程,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-05-05