hadoop?全面解讀自定義分區(qū)
更新時間:2024年10月11日 12:01:30 作者:緣不易
Hadoop是一個由Apache基金會所開發(fā)的分布式系統(tǒng)基礎架構。用戶可以在不了解分布式底層細節(jié)的情況下,開發(fā)分布式程序。充分利用集群的威力進行高速運算和存儲
需求
將統(tǒng)計結果按照手機號,以136、137、138、139開頭的數據分別放到一個獨立的文件中,其他開頭的放到一個文件中。(分區(qū))
輸入數據
1863157985066 120.196.100.82 2481 24681 200 1363157995033 120.197.40.4 264 0 200 1373157993055 120.196.100.99 132 1512 200 1393154400022 120.197.40.4 240 0 200 1363157993044 120.196.100.99 1527 2106 200 1397157993055 120.197.40.4 4116 1432 200 1463157993055 120.196.100.99 1116 954 200 1383157995033 120.197.40.4 3156 2936 200 1363157983019 120.196.100.82 240 0 200 1383154400022 120.197.40.4 6960 690 200 1363157973098 120.197.40.4 3659 3538 200 1373157993055 120.196.100.99 1938 180 200 1363154400022 120.196.100.99 918 4938 200 1393157993055 120.197.40.4 180 180 200 1363157984040 120.197.40.4 1938 2910 200
具體實現:
第一步:自定義Mapper:
public class PhoneMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); //拿到一行數據 String[] fields = line.split(“\s+”); //切分成各個字段 String phoneNumber = fields[0]; //拿到手機號的字段 //封裝數據為key-value進行輸出 context.write(new Text(phoneNumber), value); } }
第二步:自定義Partitioner
public class PhonePartitioner extends Partitioner { @Override public int getPartition(Text key, Text value, int numPartitions) { String preNum = key.toString().substring(0, 3); // 1 獲取電話號碼的前三位 int partition = 4; switch (preNum) { case “136”: partition = 0; break; case “137”: partition = 1; break; case “138”: partition = 2; break; case “139”: partition = 3; break; default: break; } return partition; } }
第三步:自定義Reducer
public class PhoneReducer extends Reducer { int index = 0; @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { index++; context.write(new LongWritable(index), values.iterator().next()); } }
第四步:自定義Driver
public class PhoneDriver { public static void main(String[] args) throws Exception { args = new String[2]; args[0] = “src/main/resources/phonei”; args[1] = “src/main/resources/phoneo”; // 1 獲取配置信息,或者job對象實例 Configuration cfg = new Configuration(); //設置本地模式運行(即使項目類路徑下core-site.xml文件,依然采用本地模式) cfg.set("mapreduce.framework.name", "local"); cfg.set("fs.defaultFS", "file:///"); Job job = Job.getInstance(cfg); // 2 指定本程序的jar包所在的本地路徑 job.setJarByClass(PhoneDriver.class); // 3 指定本業(yè)務job要使用的mapper/Reducer業(yè)務類 job.setMapperClass(PhoneMapper.class); job.setReducerClass(PhoneReducer.class); // 4 指定mapper輸出數據的kv類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 5 指定最終輸出的數據的kv類型 job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); // 8 指定自定義數據分區(qū) job.setPartitionerClass(PhonePartitioner.class); // 9 同時指定相應數量的reduce task(必須指定) job.setNumReduceTasks(5); //----① // 6 指定job的輸入原始文件所在目錄 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7 將job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
到此這篇關于hadoop 全面解讀自定義分區(qū)的文章就介紹到這了,更多相關hadoop 自定義分區(qū)內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
淺談synchronized方法對非synchronized方法的影響
下面小編就為大家?guī)硪黄獪\談synchronized方法對非synchronized方法的影響。小編覺得挺不錯的,現在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-10-10關于SpringBoot的@ConfigurationProperties注解和松散綁定、數據校驗
這篇文章主要介紹了關于SpringBoot的@ConfigurationProperties注解和松散綁定、數據校驗,@ConfigurationProperties主要作用就是將prefix屬性指定的前綴配置項的值綁定到這個JavaBean上?,通過指定的前綴,來綁定配置文件中的配置,需要的朋友可以參考下2023-05-05Caused by: java.lang.ClassNotFoundException: org.apache.comm
這篇文章主要介紹了Caused by: java.lang.ClassNotFoundException: org.objectweb.asm.Type異常,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-07-07