java 中自定義OutputFormat的實(shí)例詳解
java 中 自定義OutputFormat的實(shí)例詳解
實(shí)例代碼:
package com.ccse.hadoop.outputformat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
public class MySelfOutputFormatApp {
public final static String INPUT_PATH = "hdfs://chaoren1:9000/mapinput";
public final static String OUTPUT_PATH = "hdfs://chaoren1:9000/mapoutput";
public final static String OUTPUT_FILENAME = "/abc";
public static void main(String[] args) throws IOException, URISyntaxException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
fileSystem.delete(new Path(OUTPUT_PATH), true);
Job job = new Job(conf, MySelfOutputFormatApp.class.getSimpleName());
job.setJarByClass(MySelfOutputFormatApp.class);
FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(MyselfOutputFormat.class);
job.waitForCompletion(true);
}
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
private Text word = new Text();
private LongWritable writable = new LongWritable(1);
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
if (value != null) {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, writable);
}
}
}
}
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values,
Reducer<Text, LongWritable, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
long sum = 0;
for (LongWritable value : values) {
sum += value.get();
}
context.write(key, new LongWritable(sum));
}
}
public static class MyselfOutputFormat extends OutputFormat<Text, LongWritable> {
private FSDataOutputStream outputStream = null;
@Override
public RecordWriter<Text, LongWritable> getRecordWriter(
TaskAttemptContext context) throws IOException,
InterruptedException {
try {
FileSystem fileSystem = FileSystem.get(new URI(MySelfOutputFormatApp.OUTPUT_PATH), context.getConfiguration());
//指定文件的輸出路徑
final Path path = new Path(MySelfOutputFormatApp.OUTPUT_PATH
+ MySelfOutputFormatApp.OUTPUT_FILENAME);
this.outputStream = fileSystem.create(path, false);
} catch (URISyntaxException e) {
e.printStackTrace();
}
return new MySelfRecordWriter(outputStream);
}
@Override
public void checkOutputSpecs(JobContext context) throws IOException,
InterruptedException {
}
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException, InterruptedException {
return new FileOutputCommitter(new Path(MySelfOutputFormatApp.OUTPUT_PATH), context);
}
}
public static class MySelfRecordWriter extends RecordWriter<Text, LongWritable> {
private FSDataOutputStream outputStream = null;
public MySelfRecordWriter(FSDataOutputStream outputStream) {
this.outputStream = outputStream;
}
@Override
public void write(Text key, LongWritable value) throws IOException,
InterruptedException {
this.outputStream.writeBytes(key.toString());
this.outputStream.writeBytes("\t");
this.outputStream.writeLong(value.get());
}
@Override
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
this.outputStream.close();
}
}
}
2.OutputFormat是用于處理各種輸出目的地的。
2.1 OutputFormat需要寫出去的鍵值對,是來自于Reducer類,是通過RecordWriter獲得的。
2.2 RecordWriter中的write(...)方法只有k和v,寫到哪里去哪?這要通過單獨(dú)傳入OutputStream來處理。write就是把k和v寫入到OutputStream中的。
2.3 RecordWriter類位于OutputFormat中的。因此,我們自定義的OutputFromat必須繼承OutputFormat類型。那么,流對象必須在getRecordWriter(...)方法中獲得。
以上就是java 中自定義OutputFormat的實(shí)例,如有疑問請留言或者到本站社區(qū)交流討論,感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!
相關(guān)文章
SpringBoot+vue+Axios實(shí)現(xiàn)Token令牌的詳細(xì)過程
Token是在服務(wù)端產(chǎn)生的,前端可以使用用戶名/密碼向服務(wù)端請求認(rèn)證(登錄),服務(wù)端認(rèn)證成功,服務(wù)端會返回?Token?給前端,Token可以使用自己的算法自定義,本文給大家介紹SpringBoot+vue+Axios實(shí)現(xiàn)Token令牌,感興趣的朋友一起看看吧2023-10-10
自從在 IDEA 中用了熱部署神器 JRebel 之后,開發(fā)效率提升了 10(真棒)
在javaweb開發(fā)過程中,使用熱部署神器 JRebel可以使class類還是更新spring配置文件都能立馬見到效率,本文給大家介紹JRebel的兩種安裝方法,小編建議使用第二種方法,具體安裝步驟跟隨小編一起看看吧2021-06-06
Spring Boot配置接口WebMvcConfigurer的實(shí)現(xiàn)
這篇文章主要介紹了SpringBoot配置接口WebMvcConfigurer的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-11-11
Java編程之如何通過JSP實(shí)現(xiàn)頭像自定義上傳
之前做這個頭像上傳功能還是花了好多時(shí)間的,今天我將我的代碼分享給大家,下面這篇文章主要給大家介紹了關(guān)于Java編程之如何通過JSP實(shí)現(xiàn)頭像自定義上傳的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-12-12
使用Spring boot標(biāo)記一個方法過時(shí)
這篇文章主要介紹了使用Spring boot標(biāo)記一個方法過時(shí),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12
解讀Spring接口方法加@Transactional失效的原因
這篇文章主要介紹了Spring接口方法加@Transactional失效的原因解讀,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-03-03

