教你怎么使用hadoop來提取文件中的指定內(nèi)容
一、需求
把以下txt中含“baidu”字符串的鏈接輸出到一個(gè)文件,否則輸出到另外一個(gè)文件。
二、步驟
1.LogMapper.java
package com.whj.mapreduce.outputformat; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class LogMapper extends Mapper<LongWritable,Text,Text,NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 不做任何處理 context.write(value,NullWritable.get()); } }
2.LogReducer.java
package com.whj.mapreduce.outputformat; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class LogReducer extends Reducer<Text,NullWritable,Text,NullWritable> { @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { for (NullWritable value : values) { context.write(key,NullWritable.get()); } } }
3.LogOutputFormat.java
package com.whj.mapreduce.outputformat; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class LogOutputFormat extends FileOutputFormat<Text,NullWritable> { @Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { LogRecordWriter lrw = new LogRecordWriter(job); return lrw; } }
4.LogRecordWriter.java
package com.whj.mapreduce.outputformat; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import java.io.IOException; public class LogRecordWriter extends RecordWriter<Text,NullWritable> { private FSDataOutputStream baiduOut;//ctrl+alt+f private FSDataOutputStream otherOut; public LogRecordWriter(TaskAttemptContext job) throws IOException { //創(chuàng)建兩條流 FileSystem fs = FileSystem.get(job.getConfiguration()); baiduOut = fs.create(new Path("D:\\temp\\outputformat.log")); otherOut = fs.create(new Path("D:\\temp\\other.log")); } @Override public void write(Text key, NullWritable nullWritable) throws IOException, InterruptedException { // 具體寫 String log = key.toString(); if(log.contains("baidu")){ baiduOut.writeBytes(log+"\n"); }else{ otherOut.writeBytes(log+"\n"); } } @Override public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { //關(guān)流 IOUtils.closeStream(baiduOut); IOUtils.closeStream(otherOut); } }
5.LogDriver.java
package com.whj.mapreduce.outputformat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class LogDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(LogDriver.class); job.setMapperClass(LogMapper.class); job.setReducerClass(LogReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //設(shè)置自定義的 outputformat job.setOutputFormatClass(LogOutputFormat.class); FileInputFormat.setInputPaths(job, new Path("D:\\input")); // 雖 然 我 們 自 定 義 了 outputformat , 但 是 因 為 我 們 的 outputformat 繼承自fileoutputformat //而 fileoutputformat 要輸出一個(gè)_SUCCESS 文件,所以在這還得指定一個(gè)輸出目錄 FileOutputFormat.setOutputPath(job, new Path("D:\\temp\\logoutput")); boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }
三、結(jié)果
到此這篇關(guān)于教你怎么使用hadoop來提取文件中的指定內(nèi)容的文章就介紹到這了,更多相關(guān)hadoop提取文件內(nèi)容內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
redis redisson 限流器的實(shí)例(RRateLimiter)
這篇文章主要介紹了redis redisson 限流器的實(shí)例(RRateLimiter),具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07Springboot如何實(shí)現(xiàn)自定義異常數(shù)據(jù)
這篇文章主要介紹了Springboot如何實(shí)現(xiàn)自定義異常數(shù)據(jù),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-09-09Java?Dubbo服務(wù)調(diào)用擴(kuò)展點(diǎn)Filter使用教程
Dubbo是阿里巴巴公司開源的一個(gè)高性能優(yōu)秀的服務(wù)框架,使得應(yīng)用可通過高性能的RPC實(shí)現(xiàn)服務(wù)的輸出和輸入功能,可以和Spring框架無縫集成2022-12-12SSH框架網(wǎng)上商城項(xiàng)目第19戰(zhàn)之訂單信息級(jí)聯(lián)入庫以及頁面緩存問題
這篇文章主要介紹了SSH框架網(wǎng)上商城項(xiàng)目第19戰(zhàn)之訂單信息級(jí)聯(lián)入庫以及頁面緩存問題,感興趣的小伙伴們可以參考一下2016-06-06java用兩個(gè)例子充分闡述多態(tài)的可拓展性介紹
下面小編就為大家?guī)硪黄猨ava用兩個(gè)例子充分闡述多態(tài)的可拓展性介紹。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2016-06-06Spring定時(shí)任務(wù)使用及如何使用郵件監(jiān)控服務(wù)器
這篇文章主要介紹了Spring定時(shí)任務(wù)使用及如何使用郵件監(jiān)控服務(wù)器,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-07-07