詳解HDFS多文件Join操作的實(shí)例
詳解HDFS多文件Join操作的實(shí)例
最近在做HDFS文件處理之時(shí),遇到了多文件Join操作,其中包括:All Join以及常用的Left Join操作,
下面是個(gè)簡單的例子;采用兩個(gè)表來做left join其中數(shù)據(jù)結(jié)構(gòu)如下:
A 文件:
a|1b|2|c
B文件:
a|b|1|2|c
即:A文件中的第一、二列與B文件中的第一、三列對(duì)應(yīng);類似數(shù)據(jù)庫中Table的主鍵/外鍵
代碼如下:
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase; import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase; import org.apache.hadoop.contrib.utils.join.TaggedMapOutput; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import cn.eshore.traffic.hadoop.util.CommUtil; import cn.eshore.traffic.hadoop.util.StringUtil; /** * @ClassName: DataJoin * @Description: HDFS JOIN操作 * @author hadoop * @date 2012-12-18 下午5:51:32 */ public class InstallJoin extends Configured implements Tool { private String static enSplitCode = "\\|"; private String static splitCode = "|"; // 自定義Reducer public static class ReduceClass extends DataJoinReducerBase { @Override protected TaggedMapOutput combine(Object[] tags, Object[] values) { String joinedStr = ""; //該段判斷用戶生成Left join限制【其中tags表示文件的路徑,install表示文件名稱前綴】 //去掉則為All Join if (tags.length == 1 && tags[0].toString().contains("install")) { return null; } Map<String, String> map = new HashMap<String, String>(); for (int i = 0; i < values.length; i++) { TaggedWritable tw = (TaggedWritable) values[i]; String line = ((Text) tw.getData()).toString(); String[] tokens = line.split(enSplitCode, 8); String groupValue = tokens[6]; String type = tokens[7]; map.put(type, groupValue); } joinedStr += StringUtil.getCount(map.get("7"))+"|"+StringUtil.getCount(map.get("30")); TaggedWritable retv = new TaggedWritable(new Text(joinedStr)); retv.setTag((Text) tags[0]); return retv; } } // 自定義Mapper public static class MapClass extends DataJoinMapperBase { //自定義Key【類似數(shù)據(jù)庫中的主鍵/外鍵】 @Override protected Text generateGroupKey(TaggedMapOutput aRecord) { String line = ((Text) aRecord.getData()).toString(); String[] tokens = line.split(CommUtil.enSplitCode); String key = ""; String type = tokens[7]; //由于不同文件中的Key所在列有可能不同,所以需要?jiǎng)討B(tài)生成Key,其中type為不同文件中的數(shù)據(jù)標(biāo)識(shí);如:A文件最后一列為a用于表示此數(shù)據(jù)為A文件數(shù)據(jù) if ("7".equals(type)) { key = tokens[0]+"|"+tokens[1]; }else if ("30".equals(type)) { key = tokens[0]+"|"+tokens[2]; } return new Text(key); } @Override protected Text generateInputTag(String inputFile) { return new Text(inputFile); } @Override protected TaggedMapOutput generateTaggedMapOutput(Object value) { TaggedWritable retv = new TaggedWritable((Text) value); retv.setTag(this.inputTag); return retv; } } public static class TaggedWritable extends TaggedMapOutput { private Writable data; // 自定義 public TaggedWritable() { this.tag = new Text(""); } public TaggedWritable(Writable data) { this.tag = new Text(""); this.data = data; } @Override public Writable getData() { return data; } @Override public void write(DataOutput out) throws IOException { this.tag.write(out); out.writeUTF(this.data.getClass().getName()); this.data.write(out); } @Override public void readFields(DataInput in) throws IOException { this.tag.readFields(in); String dataClz = in.readUTF(); if (this.data == null || !this.data.getClass().getName().equals(dataClz)) { try { this.data = (Writable) ReflectionUtils.newInstance( Class.forName(dataClz), null); } catch (ClassNotFoundException e) { e.printStackTrace(); } } this.data.readFields(in); } } /** * job運(yùn)行 */ @Override public int run(String[] paths) throws Exception { int no = 0; try { Configuration conf = getConf(); JobConf job = new JobConf(conf, InstallJoin.class); FileInputFormat.setInputPaths(job, new Path(paths[0])); FileOutputFormat.setOutputPath(job, new Path(paths[1])); job.setJobName("join_data_test"); job.setMapperClass(MapClass.class); job.setReducerClass(ReduceClass.class); job.setInputFormat(TextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(TaggedWritable.class); job.set("mapred.textoutputformat.separator", CommUtil.splitCode); JobClient.runJob(job); no = 1; } catch (Exception e) { throw new Exception(); } return no; } //測試 public static void main(String[] args) { String[] paths = { "hdfs://master...:9000/home/hadoop/traffic/join/newtype", "hdfs://master...:9000/home/hadoop/traffic/join/newtype/output" } int res = 0; try { res = ToolRunner.run(new Configuration(), new InstallJoin(), paths); } catch (Exception e) { e.printStackTrace(); } System.exit(res); } }
如有疑問請(qǐng)留言或者到本站社區(qū)交流討論,感謝閱讀,希望能幫助到大家,謝謝大家對(duì)本站的支持!
相關(guān)文章
圖文講解IDEA中根據(jù)數(shù)據(jù)庫自動(dòng)生成實(shí)體類
這篇文章主要以圖文講解IDEA中根據(jù)數(shù)據(jù)庫自動(dòng)生成實(shí)體類,本文主要以Mysql數(shù)據(jù)庫為例,應(yīng)該會(huì)對(duì)大家有所幫助,如果有錯(cuò)誤的地方,還望指正2023-03-03Tomcat多war包部署實(shí)戰(zhàn)示例及注意事項(xiàng)
多服務(wù)部署在一個(gè)tomcat中,服務(wù)之間互相調(diào)用,下面這篇文章主要給大家介紹了關(guān)于Tomcat多war包部署實(shí)戰(zhàn)示例及注意事項(xiàng)的相關(guān)資料,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-12-12關(guān)于Spring事務(wù)隔離、傳播屬性與@Transactional注解
這篇文章主要介紹了關(guān)于事務(wù)隔離、Spring傳播屬性與@Transactional注解,如果一組處理步驟或者全部發(fā)生或者一步也不執(zhí)行,我們稱該組處理步驟為一個(gè)事務(wù),需要的朋友可以參考下2023-05-05Mybatis以main方法形式調(diào)用dao層執(zhí)行代碼實(shí)例
這篇文章主要介紹了Mybatis以main方法形式調(diào)用dao層執(zhí)行代碼實(shí)例,MyBatis 是一款優(yōu)秀的持久層框架,MyBatis 免除了幾乎所有的 JDBC 代碼以及設(shè)置參數(shù)和獲取結(jié)果集的工作,需要的朋友可以參考下2023-08-08springboot學(xué)習(xí)筆記之 profile多環(huán)境配置切換的實(shí)現(xiàn)方式
這篇文章主要介紹了springboot profile多環(huán)境配置切換的實(shí)現(xiàn)方式,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值 ,需要的朋友可以參考下2019-07-07servlet上傳文件實(shí)現(xiàn)代碼詳解(四)
這篇文章主要為大家詳細(xì)介紹了servlet上傳文件的實(shí)現(xiàn)代碼,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-09-09