java使用hadoop實(shí)現(xiàn)關(guān)聯(lián)商品統(tǒng)計(jì)
最近幾天一直在看Hadoop相關(guān)的書籍,目前稍微有點(diǎn)感覺,自己就仿照著WordCount程序自己編寫了一個統(tǒng)計(jì)關(guān)聯(lián)商品。
需求描述:
根據(jù)超市的銷售清單,計(jì)算商品之間的關(guān)聯(lián)程度(即統(tǒng)計(jì)同時(shí)買A商品和B商品的次數(shù))。
數(shù)據(jù)格式:
超市銷售清單簡化為如下格式:一行表示一個清單,每個商品采用 "," 分割,如下圖所示:

需求分析:
采用hadoop中的mapreduce對該需求進(jìn)行計(jì)算。
map函數(shù)主要拆分出關(guān)聯(lián)的商品,輸出結(jié)果為 key為商品A,value為商品B,對于第一條三條結(jié)果拆分結(jié)果如下圖所示:

這里為了統(tǒng)計(jì)出和A、B兩件商品想關(guān)聯(lián)的商品,所以商品A、B之間的關(guān)系輸出兩條結(jié)果即 A-B、B-A。
reduce函數(shù)分別對和商品A相關(guān)的商品進(jìn)行分組統(tǒng)計(jì),即分別求value中的各個商品出現(xiàn)的次數(shù),輸出結(jié)果為key為商品A|商品B,value為該組合出現(xiàn)的次數(shù)。針對上面提到的5條記錄,對map輸出中key值為R的做下分析:
通過map函數(shù)的處理,得到如下圖所示的記錄:

reduce中對map輸出的value值進(jìn)行分組計(jì)數(shù),得到的結(jié)果如下圖所示

將商品A B作為key,組合個數(shù)作為value輸出,輸出結(jié)果如下圖所示:

對于需求的實(shí)現(xiàn)過程的分析到目前就結(jié)束了,下面就看下具體的代碼實(shí)現(xiàn)
代碼實(shí)現(xiàn):
關(guān)于代碼就不做詳細(xì)的介紹,具體參照代碼之中的注釋吧。
package com;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Test extends Configured implements Tool{
/**
* map類,實(shí)現(xiàn)數(shù)據(jù)的預(yù)處理
* 輸出結(jié)果key為商品A value為關(guān)聯(lián)商品B
* @author lulei
*/
public static class MapT extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
String line = value.toString();
if (!(line == null || "".equals(line))) {
//分割商品
String []vs = line.split(",");
//兩兩組合,構(gòu)成一條記錄
for (int i = 0; i < (vs.length - 1); i++) {
if ("".equals(vs[i])) {//排除空記錄
continue;
}
for (int j = i+1; j < vs.length; j++) {
if ("".equals(vs[j])) {
continue;
}
//輸出結(jié)果
context.write(new Text(vs[i]), new Text(vs[j]));
context.write(new Text(vs[j]), new Text(vs[i]));
}
}
}
}
}
/**
* reduce類,實(shí)現(xiàn)數(shù)據(jù)的計(jì)數(shù)
* 輸出結(jié)果key 為商品A|B value為該關(guān)聯(lián)次數(shù)
* @author lulei
*/
public static class ReduceT extends Reducer<Text, Text, Text, IntWritable> {
private int count;
/**
* 初始化
*/
public void setup(Context context) {
//從參數(shù)中獲取最小記錄個數(shù)
String countStr = context.getConfiguration().get("count");
try {
this.count = Integer.parseInt(countStr);
} catch (Exception e) {
this.count = 0;
}
}
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
String keyStr = key.toString();
HashMap<String, Integer> hashMap = new HashMap<String, Integer>();
//利用hash統(tǒng)計(jì)B商品的次數(shù)
for (Text value : values) {
String valueStr = value.toString();
if (hashMap.containsKey(valueStr)) {
hashMap.put(valueStr, hashMap.get(valueStr) + 1);
} else {
hashMap.put(valueStr, 1);
}
}
//將結(jié)果輸出
for (Entry<String, Integer> entry : hashMap.entrySet()) {
if (entry.getValue() >= this.count) {//只輸出次數(shù)不小于最小值的
context.write(new Text(keyStr + "|" + entry.getKey()), new IntWritable(entry.getValue()));
}
}
}
}
@Override
public int run(String[] arg0) throws Exception {
// TODO Auto-generated method stub
Configuration conf = getConf();
conf.set("count", arg0[2]);
Job job = new Job(conf);
job.setJobName("jobtest");
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(MapT.class);
job.setReducerClass(ReduceT.class);
FileInputFormat.addInputPath(job, new Path(arg0[0]));
FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
if (args.length != 3) {
System.exit(-1);
}
try {
int res = ToolRunner.run(new Configuration(), new Test(), args);
System.exit(res);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
上傳運(yùn)行:
將程序打包成jar文件,上傳到機(jī)群之中。將測試數(shù)據(jù)也上傳到HDFS分布式文件系統(tǒng)中。
命令運(yùn)行截圖如下圖所示:

運(yùn)行結(jié)束后查看相應(yīng)的HDFS文件系統(tǒng),如下圖所示:

到此一個完整的mapreduce程序就完成了,關(guān)于hadoop的學(xué)習(xí),自己還將繼續(xù)~感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!
- java結(jié)合HADOOP集群文件上傳下載
- Java訪問Hadoop分布式文件系統(tǒng)HDFS的配置說明
- Java執(zhí)行hadoop的基本操作實(shí)例代碼
- 深入淺析Java Object Serialization與 Hadoop 序列化
- hadoop中實(shí)現(xiàn)java網(wǎng)絡(luò)爬蟲(示例講解)
- Java/Web調(diào)用Hadoop進(jìn)行MapReduce示例代碼
- Hadoop運(yùn)行時(shí)遇到j(luò)ava.io.FileNotFoundException錯誤的解決方法
- hadoop運(yùn)行java程序(jar包)并運(yùn)行時(shí)動態(tài)指定參數(shù)
- java實(shí)現(xiàn)對Hadoop的操作
- 利用Java連接Hadoop進(jìn)行編程
相關(guān)文章
Java利用Socket和IO流實(shí)現(xiàn)文件的上傳與下載
本文主要介紹了Java利用Socket和IO流實(shí)現(xiàn)文件的上傳與下載,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-04-04
使用自定義注解實(shí)現(xiàn)redisson分布式鎖
這篇文章主要介紹了使用自定義注解實(shí)現(xiàn)redisson分布式鎖,具有很好的參考價(jià)值,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-02-02
基于SpringBoot實(shí)現(xiàn)上傳2種方法工程代碼實(shí)例
這篇文章主要介紹了基于SpringBoot實(shí)現(xiàn)上傳工程代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-08-08
JAXB命名空間_動力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要為大家詳細(xì)介紹了JAXB命名空間的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-08-08
聊聊SpringBoot的@Scheduled的并發(fā)問題
這篇文章主要介紹了聊聊SpringBoot的@Scheduled的并發(fā)問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11
Java中的字節(jié)流InputStream和OutputStream詳解
這篇文章主要介紹了Java中的字節(jié)流InputStream和OutputStream詳解,繼承自InputStream的流都是用于向程序中輸入數(shù)據(jù),且數(shù)據(jù)的單位為字節(jié)8bit,我們看到的具體的某一些管道,凡是以InputStream結(jié)尾的管道,都是以字節(jié)的形式向我們的程序輸入數(shù)據(jù),需要的朋友可以參考下2023-10-10
Spring2.5.6開發(fā)環(huán)境搭建圖文教程
這篇文章主要為大家詳細(xì)介紹了Spring2.5.6開發(fā)環(huán)境搭建圖文教程,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-05-05

