詳解HDFS多文件Join操作的實例
詳解HDFS多文件Join操作的實例
最近在做HDFS文件處理之時,遇到了多文件Join操作,其中包括:All Join以及常用的Left Join操作,
下面是個簡單的例子;采用兩個表來做left join其中數(shù)據(jù)結(jié)構(gòu)如下:
A 文件:
a|1b|2|c
B文件:
a|b|1|2|c
即:A文件中的第一、二列與B文件中的第一、三列對應(yīng);類似數(shù)據(jù)庫中Table的主鍵/外鍵
代碼如下:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import cn.eshore.traffic.hadoop.util.CommUtil;
import cn.eshore.traffic.hadoop.util.StringUtil;
/**
* @ClassName: DataJoin
* @Description: HDFS JOIN操作
* @author hadoop
* @date 2012-12-18 下午5:51:32
*/
public class InstallJoin extends Configured implements Tool {
private String static enSplitCode = "\\|";
private String static splitCode = "|";
// 自定義Reducer
public static class ReduceClass extends DataJoinReducerBase {
@Override
protected TaggedMapOutput combine(Object[] tags, Object[] values) {
String joinedStr = "";
//該段判斷用戶生成Left join限制【其中tags表示文件的路徑,install表示文件名稱前綴】
//去掉則為All Join
if (tags.length == 1 && tags[0].toString().contains("install")) {
return null;
}
Map<String, String> map = new HashMap<String, String>();
for (int i = 0; i < values.length; i++) {
TaggedWritable tw = (TaggedWritable) values[i];
String line = ((Text) tw.getData()).toString();
String[] tokens = line.split(enSplitCode, 8);
String groupValue = tokens[6];
String type = tokens[7];
map.put(type, groupValue);
}
joinedStr += StringUtil.getCount(map.get("7"))+"|"+StringUtil.getCount(map.get("30"));
TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
retv.setTag((Text) tags[0]);
return retv;
}
}
// 自定義Mapper
public static class MapClass extends DataJoinMapperBase {
//自定義Key【類似數(shù)據(jù)庫中的主鍵/外鍵】
@Override
protected Text generateGroupKey(TaggedMapOutput aRecord) {
String line = ((Text) aRecord.getData()).toString();
String[] tokens = line.split(CommUtil.enSplitCode);
String key = "";
String type = tokens[7];
//由于不同文件中的Key所在列有可能不同,所以需要動態(tài)生成Key,其中type為不同文件中的數(shù)據(jù)標識;如:A文件最后一列為a用于表示此數(shù)據(jù)為A文件數(shù)據(jù)
if ("7".equals(type)) {
key = tokens[0]+"|"+tokens[1];
}else if ("30".equals(type)) {
key = tokens[0]+"|"+tokens[2];
}
return new Text(key);
}
@Override
protected Text generateInputTag(String inputFile) {
return new Text(inputFile);
}
@Override
protected TaggedMapOutput generateTaggedMapOutput(Object value) {
TaggedWritable retv = new TaggedWritable((Text) value);
retv.setTag(this.inputTag);
return retv;
}
}
public static class TaggedWritable extends TaggedMapOutput {
private Writable data;
// 自定義
public TaggedWritable() {
this.tag = new Text("");
}
public TaggedWritable(Writable data) {
this.tag = new Text("");
this.data = data;
}
@Override
public Writable getData() {
return data;
}
@Override
public void write(DataOutput out) throws IOException {
this.tag.write(out);
out.writeUTF(this.data.getClass().getName());
this.data.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
this.tag.readFields(in);
String dataClz = in.readUTF();
if (this.data == null
|| !this.data.getClass().getName().equals(dataClz)) {
try {
this.data = (Writable) ReflectionUtils.newInstance(
Class.forName(dataClz), null);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
this.data.readFields(in);
}
}
/**
* job運行
*/
@Override
public int run(String[] paths) throws Exception {
int no = 0;
try {
Configuration conf = getConf();
JobConf job = new JobConf(conf, InstallJoin.class);
FileInputFormat.setInputPaths(job, new Path(paths[0]));
FileOutputFormat.setOutputPath(job, new Path(paths[1]));
job.setJobName("join_data_test");
job.setMapperClass(MapClass.class);
job.setReducerClass(ReduceClass.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TaggedWritable.class);
job.set("mapred.textoutputformat.separator", CommUtil.splitCode);
JobClient.runJob(job);
no = 1;
} catch (Exception e) {
throw new Exception();
}
return no;
}
//測試
public static void main(String[] args) {
String[] paths = {
"hdfs://master...:9000/home/hadoop/traffic/join/newtype",
"hdfs://master...:9000/home/hadoop/traffic/join/newtype/output" }
int res = 0;
try {
res = ToolRunner.run(new Configuration(), new InstallJoin(), paths);
} catch (Exception e) {
e.printStackTrace();
}
System.exit(res);
}
}
如有疑問請留言或者到本站社區(qū)交流討論,感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!
相關(guān)文章
圖文講解IDEA中根據(jù)數(shù)據(jù)庫自動生成實體類
這篇文章主要以圖文講解IDEA中根據(jù)數(shù)據(jù)庫自動生成實體類,本文主要以Mysql數(shù)據(jù)庫為例,應(yīng)該會對大家有所幫助,如果有錯誤的地方,還望指正2023-03-03
關(guān)于Spring事務(wù)隔離、傳播屬性與@Transactional注解
這篇文章主要介紹了關(guān)于事務(wù)隔離、Spring傳播屬性與@Transactional注解,如果一組處理步驟或者全部發(fā)生或者一步也不執(zhí)行,我們稱該組處理步驟為一個事務(wù),需要的朋友可以參考下2023-05-05
Mybatis以main方法形式調(diào)用dao層執(zhí)行代碼實例
這篇文章主要介紹了Mybatis以main方法形式調(diào)用dao層執(zhí)行代碼實例,MyBatis 是一款優(yōu)秀的持久層框架,MyBatis 免除了幾乎所有的 JDBC 代碼以及設(shè)置參數(shù)和獲取結(jié)果集的工作,需要的朋友可以參考下2023-08-08
springboot學習筆記之 profile多環(huán)境配置切換的實現(xiàn)方式
這篇文章主要介紹了springboot profile多環(huán)境配置切換的實現(xiàn)方式,本文給大家介紹的非常詳細,具有一定的參考借鑒價值 ,需要的朋友可以參考下2019-07-07

