java 中自定義OutputFormat的實例詳解
java 中 自定義OutputFormat的實例詳解
實例代碼:
package com.ccse.hadoop.outputformat; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; public class MySelfOutputFormatApp { public final static String INPUT_PATH = "hdfs://chaoren1:9000/mapinput"; public final static String OUTPUT_PATH = "hdfs://chaoren1:9000/mapoutput"; public final static String OUTPUT_FILENAME = "/abc"; public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf); fileSystem.delete(new Path(OUTPUT_PATH), true); Job job = new Job(conf, MySelfOutputFormatApp.class.getSimpleName()); job.setJarByClass(MySelfOutputFormatApp.class); FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setOutputFormatClass(MyselfOutputFormat.class); job.waitForCompletion(true); } public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { private Text word = new Text(); private LongWritable writable = new LongWritable(1); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { if (value != null) { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); context.write(word, writable); } } } } public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { long sum = 0; for (LongWritable value : values) { sum += value.get(); } context.write(key, new LongWritable(sum)); } } public static class MyselfOutputFormat extends OutputFormat<Text, LongWritable> { private FSDataOutputStream outputStream = null; @Override public RecordWriter<Text, LongWritable> getRecordWriter( TaskAttemptContext context) throws IOException, InterruptedException { try { FileSystem fileSystem = FileSystem.get(new URI(MySelfOutputFormatApp.OUTPUT_PATH), context.getConfiguration()); //指定文件的輸出路徑 final Path path = new Path(MySelfOutputFormatApp.OUTPUT_PATH + MySelfOutputFormatApp.OUTPUT_FILENAME); this.outputStream = fileSystem.create(path, false); } catch (URISyntaxException e) { e.printStackTrace(); } return new MySelfRecordWriter(outputStream); } @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { } @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { return new FileOutputCommitter(new Path(MySelfOutputFormatApp.OUTPUT_PATH), context); } } public static class MySelfRecordWriter extends RecordWriter<Text, LongWritable> { private FSDataOutputStream outputStream = null; public MySelfRecordWriter(FSDataOutputStream outputStream) { this.outputStream = outputStream; } @Override public void write(Text key, LongWritable value) throws IOException, InterruptedException { this.outputStream.writeBytes(key.toString()); this.outputStream.writeBytes("\t"); this.outputStream.writeLong(value.get()); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { this.outputStream.close(); } } }
2.OutputFormat是用于處理各種輸出目的地的。
2.1 OutputFormat需要寫出去的鍵值對,是來自于Reducer類,是通過RecordWriter獲得的。
2.2 RecordWriter中的write(...)方法只有k和v,寫到哪里去哪?這要通過單獨傳入OutputStream來處理。write就是把k和v寫入到OutputStream中的。
2.3 RecordWriter類位于OutputFormat中的。因此,我們自定義的OutputFromat必須繼承OutputFormat類型。那么,流對象必須在getRecordWriter(...)方法中獲得。
以上就是java 中自定義OutputFormat的實例,如有疑問請留言或者到本站社區(qū)交流討論,感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!
相關(guān)文章
SpringBoot+vue+Axios實現(xiàn)Token令牌的詳細過程
Token是在服務(wù)端產(chǎn)生的,前端可以使用用戶名/密碼向服務(wù)端請求認證(登錄),服務(wù)端認證成功,服務(wù)端會返回?Token?給前端,Token可以使用自己的算法自定義,本文給大家介紹SpringBoot+vue+Axios實現(xiàn)Token令牌,感興趣的朋友一起看看吧2023-10-10自從在 IDEA 中用了熱部署神器 JRebel 之后,開發(fā)效率提升了 10(真棒)
在javaweb開發(fā)過程中,使用熱部署神器 JRebel可以使class類還是更新spring配置文件都能立馬見到效率,本文給大家介紹JRebel的兩種安裝方法,小編建議使用第二種方法,具體安裝步驟跟隨小編一起看看吧2021-06-06Spring Boot配置接口WebMvcConfigurer的實現(xiàn)
這篇文章主要介紹了SpringBoot配置接口WebMvcConfigurer的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-11-11解讀Spring接口方法加@Transactional失效的原因
這篇文章主要介紹了Spring接口方法加@Transactional失效的原因解讀,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-03-03