欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

hadoop?切片機(jī)制分析與應(yīng)用

 更新時(shí)間:2024年10月11日 11:55:48   作者:蒼鷹蛟龍  
切片這個(gè)詞對于做過python開發(fā)的同學(xué)一定不陌生,但是與hadoop中的切片有所區(qū)別,hadoop中的切片是為了優(yōu)化hadoop的job在處理過程中MapTask階段的性能達(dá)到最優(yōu)而言

一個(gè)超大文件在HDFS上存儲時(shí),是以多個(gè)Block存儲在不同的節(jié)點(diǎn)上,比如一個(gè)512M的文件,HDFS默認(rèn)一個(gè)Block為128M,那么1G的文件分成4個(gè)Block存儲在集群中4個(gè)節(jié)點(diǎn)上。

Hadoop在map階段處理上述512M的大文件時(shí)分成幾個(gè)MapTask進(jìn)行處理呢?Hadoop的MapTask并行度與數(shù)據(jù)切片有有關(guān)系,數(shù)據(jù)切片是對輸入的文件在邏輯上進(jìn)行分片,對文件切成多少份,Hadoop就會分配多少個(gè)MapTask任務(wù)進(jìn)行并行執(zhí)行該文件,原理如下圖所示。
Block與Splite區(qū)別:Block是HDFS物理上把數(shù)據(jù)分成一塊一塊;數(shù)據(jù)切片只是在邏輯上對輸入進(jìn)行分片,并不會在磁盤上將其切分成片進(jìn)行存儲。如下圖所示,一個(gè)512M的文件在HDFS上存儲時(shí),默認(rèn)一個(gè)block為128M,那么該文件需要4個(gè)block進(jìn)行物理存儲;若對該文件進(jìn)行切片,假設(shè)以100M大小進(jìn)行切片,該文件在邏輯上需要切成5片,則需要5個(gè)MapTask任務(wù)進(jìn)行處理。

在這里插入圖片描述

一、數(shù)據(jù)切片源碼詳解

  /** 
	   * Generate the list of files and make them into FileSplits.
	   * @param job the job context
	   * @throws IOException
	   */
	  public List<InputSplit> getSplits(JobContext job) throws IOException {
	    StopWatch sw = new StopWatch().start();
	    /*
	     * 	1、minSize默認(rèn)最小值為1
	     *     maxSize默認(rèn)最大值為9,223,372,036,854,775,807?
	     * */
	    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
	    long maxSize = getMaxSplitSize(job);

	    // generate splits
	    List<InputSplit> splits = new ArrayList<InputSplit>();
	    /*
	     *   2、獲取所有需要處理的文件
	     * */
	    List<FileStatus> files = listStatus(job);
	    for (FileStatus file: files) {
	      Path path = file.getPath();
	      /*
	       *   3、獲取文件的大小
	       * */
	      long length = file.getLen();
	      if (length != 0) {
	        BlockLocation[] blkLocations;
	        if (file instanceof LocatedFileStatus) {
	          /*
	           * 4、獲取文件的block,比如一個(gè)500M的文件,默認(rèn)一個(gè)Block為128M,500M的文件會分布在4個(gè)DataNode節(jié)點(diǎn)上進(jìn)行存儲
	           * */
	          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
	        } else {
	        	/*
	        	 * 5、Hadoop如不特殊指定,默認(rèn)用的HDFS文件系統(tǒng),只會走上面if分支
	        	 * */
	          FileSystem fs = path.getFileSystem(job.getConfiguration());
	          blkLocations = fs.getFileBlockLocations(file, 0, length);
	        }
	        if (isSplitable(job, path)) {
	          /*
	           * 6、獲取Block塊的大小,默認(rèn)為128M
	           * */
	          long blockSize = file.getBlockSize();
	          /*
	           * 7、計(jì)算spliteSize分片的尺寸,首先取blockSize與maxSize之間的最小值即blockSize,
	                          *         然后取blockSize與minSize之間的最大值,即為blockSize=128M,所以分片尺寸默認(rèn)為128M
	           * */
	          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

	          long bytesRemaining = length;
	          /*
	           * 8、計(jì)算分片file文件可以在邏輯上劃分為多少個(gè)數(shù)據(jù)切片,并把切片信息加入到List集合中
	           * */
	          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
	            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
	            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
	                        blkLocations[blkIndex].getHosts(),
	                        blkLocations[blkIndex].getCachedHosts()));
	            bytesRemaining -= splitSize;
	          }

	          /*
	           * 9、如果文件最后一個(gè)切片不滿128M,單獨(dú)切分到一個(gè)數(shù)據(jù)切片中
	           * */
	          if (bytesRemaining != 0) {
	            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
	            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
	                       blkLocations[blkIndex].getHosts(),
	                       blkLocations[blkIndex].getCachedHosts()));
	          }
	        } else { // not splitable
	          /*
	           * 10、如果文件不可以切分,比如壓縮文件,會創(chuàng)建一個(gè)數(shù)據(jù)切片
	           * */
	          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
	                      blkLocations[0].getCachedHosts()));
	        }
	      } else { 
	        //Create empty hosts array for zero length file
	    	/*
	    	 * 11、如果為空文件,創(chuàng)建一個(gè)空的數(shù)據(jù)切片
	    	 * */
	        splits.add(makeSplit(path, 0, length, new String[0]));
	      }
	    }
	    // Save the number of input files for metrics/loadgen
	    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
	    sw.stop();
	    if (LOG.isDebugEnabled()) {
	      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
	          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
	    }
	    return splits;
	  }

二、數(shù)據(jù)切片機(jī)制

1、TextInputFormat切片機(jī)制

切片方式:TextInputFormat是默認(rèn)的切片機(jī)制,按文件規(guī)劃進(jìn)行切分。比如切片默認(rèn)為128M,如果一個(gè)文件為200M,則會形成兩個(gè)切片,一個(gè)是128M,一個(gè)是72M,啟動兩個(gè)MapTask任務(wù)進(jìn)行處理任務(wù)。但是如果一個(gè)文件只有1M,也會單獨(dú)啟動一個(gè)MapTask執(zhí)行此任務(wù),如果是10個(gè)這樣的小文件,就會啟動10個(gè)MapTask處理小文件任務(wù)。
讀取方式:TextInputFormat是按行讀取文件的每條記錄,key代表讀取的文件行在該文件中的起始字節(jié)偏移量,key為LongWritable類型;value為讀取的行內(nèi)容,不包括任何行終止符(換行符/回車符), value為Text類型,相當(dāng)于java中的String類型。
例如

Birds of a feather flock together
Barking dogs seldom bite
Bad news has wings

用TextInputFormat按每行讀取文件時(shí),對應(yīng)的key和value分別為:

(0,Birds of a feather flock together)
(34,Barking dogs seldom bite)
(59,Bad news has wings)

Demo:下面測試案例已統(tǒng)計(jì)單詞為測試案例,處理文件為D:\tmp\word\in 目錄下的4個(gè)文件。

在這里插入圖片描述

建立對應(yīng)的Mapper類WordCountMapper

package com.lzj.hadoop.input;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * LongWritable - 表示讀取第幾行
 * Text 		-  表示讀取一行的內(nèi)容
 * Text			- 表示輸出的鍵
 * IntWritable 	- 表示輸出的鍵對應(yīng)的個(gè)數(shù)
 * */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	
	Text k = new Text();
	IntWritable v = new IntWritable(1);
	
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		//1、讀取一行內(nèi)容
		String line = value.toString();
		if(line.isEmpty()) {
			return;
		}
		//2、按空格切割讀取的單詞
		String[] words = line.split(" ");
		//3、輸出mapper處理完的內(nèi)容
		for(String word : words) {
			/*給鍵設(shè)置值*/
			k.set(word); 
			/*把mapper處理后的鍵值對寫到context中*/
			context.write(k, v);
		}
		
	}
	
}

建立對應(yīng)的Reducer類:

package com.lzj.hadoop.input;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/*
 * Text 		-  輸入的鍵(即Mapper階段輸出的鍵)
 * IntWritable 	- 輸入的值(個(gè)數(shù))(即Mapper階段輸出的值)
 * Text 		- 輸出的鍵
 * IntWritable 	- 輸出的值
 * */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
	@Override
	protected void reduce(Text text, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		//1、統(tǒng)計(jì)鍵對應(yīng)的個(gè)數(shù)
		int sum = 0;
		for(IntWritable value : values) {
			sum = sum + value.get();
		}
		//2、設(shè)置reducer的輸出
		IntWritable v = new IntWritable(sum);
		context.write(text, v);
	}
}

建立驅(qū)動類drive

/*測試TextInputFormat*/
public void testTextInputFormat() throws IOException, ClassNotFoundException, InterruptedException{
	//1、獲取job的配置信息
	Configuration conf = new Configuration();
	Job job = Job.getInstance(conf);
	//2、設(shè)置jar的加載路徑
	job.setJarByClass(WordCountDriver.class);
	//3、分別設(shè)置Mapper和Reducer類
	job.setMapperClass(WordCountMapper.class);
	job.setReducerClass(WordCountReducer.class);
	//4、設(shè)置map的輸出類型
	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(IntWritable.class);
	//5、設(shè)置最終輸出的鍵值類型
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(IntWritable.class);
	//6、設(shè)置輸入輸出路徑
	FileInputFormat.setInputPaths(job, new Path("D:/tmp/word/in"));
	FileOutputFormat.setOutputPath(job, new Path("D:/tmp/word/out"));
	//7、提交任務(wù)
	boolean flag = job.waitForCompletion(true);
	System.out.println("flag ; " + flag);
}

啟動測試,在輸出的日志信息中會有如下一行內(nèi)容:

number of splits:4

處理的4個(gè)文件1.txt、2.txt、3.txt、4.txt分別小于128M,每一個(gè)文件會被切成一個(gè)split。

2、CombineTextInputFormat切片機(jī)制

如果要處理的任務(wù)中含有很多小文件,采用默認(rèn)的TextInputFormat切片機(jī)制會啟動多個(gè)MapTask任務(wù)處理文件,浪費(fèi)資源。CombineTextInputFormat用于處理小文件過多的場景,它可以將多個(gè)小文件從邏輯上切分到一個(gè)切片中。CombineTextInputFormat在形成切片過程中分為虛擬存儲過程和切片過程兩個(gè)過程。

(1)虛擬存儲過程將輸入目錄下所有文件大小,依次和設(shè)置的setMaxInputSplitSize值比較,如果不大于設(shè)置的最大值,邏輯上劃分一個(gè)塊。如果輸入文件大于設(shè)置的最大值且大于兩倍,那么以最大值切割一塊;當(dāng)剩余數(shù)據(jù)大小超過設(shè)置的最大值且不大于最大值2倍,此時(shí)將文件均分成2個(gè)虛擬存儲塊(防止出現(xiàn)太小切片)。
例如setMaxInputSplitSize值為4M,輸入文件大小為8.02M,則先邏輯上分成一個(gè)4M。剩余的大小為4.02M,如果按照4M邏輯劃分,就會出現(xiàn)0.02M的小的虛擬存儲文件,所以將剩余的4.02M文件切分成(2.01M和2.01M)兩個(gè)文件。
(2)切片過程判斷虛擬存儲的文件大小是否大于setMaxInputSplitSize值,大于等于則單獨(dú)形成一個(gè)切片;
如果不大于則跟下一個(gè)虛擬存儲文件進(jìn)行合并,共同形成一個(gè)切片。

下面以“D:\\tmp\\word\\in”目錄下的1.txt(576K)、2.txt(1151K)、3.txt(2302K)、4.txt(4604K)為例,比如設(shè)置虛擬存儲切片setMaxInputSplitSize為2M,1.txt 大小576K小于2M,形成一個(gè)存儲塊,2.txt大小1151K也小于2M,形成一個(gè)存儲塊,3.txt大小2302K大于2M,但小于4M,形成兩個(gè)存儲塊,分別為1151K,4.txt大小4604K大于4M,形成一個(gè)2M的存儲塊后,還剩4604-1024*2=2556K,2556K大于2M,小于4M,分別形成2個(gè)1278K的存儲塊,  在存儲過程會形成6個(gè)文件塊,分別為:

576K、1151K、(1151K,1151K)、(2048K、1278K、1278K)

在切片過程中,前3個(gè)存儲塊和為576K + 1151K + 1151K = 2878K > 2M,形成一個(gè)切片;
第4和第5個(gè)存儲塊和為:1151K + 2048K = 3199K > 2M,形成一個(gè)切片;最后兩個(gè)存儲塊和為:1278K + 1278K = 2556K > 2M,形成一個(gè)切片,最終在切片過程中,4個(gè)文件形成了3個(gè)切片,啟動三個(gè)MapTask任務(wù)進(jìn)行處理文件。

Demo:采用上述D:\tmp\word\in目錄下的文件進(jìn)行測試
WordCountMapper和WordCountReducer同上例,驅(qū)動類如下

/*測試CombineTextInputFormat*/
public void testCombineTextInputFormat() throws IOException, ClassNotFoundException, InterruptedException {
	//1、獲取job的配置信息
	Configuration conf = new Configuration();
	Job job = Job.getInstance(conf);
	//2、設(shè)置jar的加載路徑
	job.setJarByClass(WordCountDriver.class);
	//3、分別設(shè)置Mapper和Reducer類
	job.setMapperClass(WordCountMapper.class);
	job.setReducerClass(WordCountReducer.class);
	//4、設(shè)置map的輸出類型
	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(IntWritable.class);
	//5、設(shè)置最終輸出的鍵值類型
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(IntWritable.class);
	//6、設(shè)置輸入輸出路徑
	FileInputFormat.setInputPaths(job, new Path("D:\\tmp\\word\\in"));
	FileOutputFormat.setOutputPath(job, new Path("D:\\tmp\\word\\out"));
	//7、設(shè)置數(shù)據(jù)切分方式
	job.setInputFormatClass(CombineTextInputFormat.class);
	CombineTextInputFormat.setMaxInputSplitSize(job, 2097152); //2M
	//8、提交任務(wù)
	boolean flag = job.waitForCompletion(true);
	System.out.println("flag ; " + flag);
}

啟動測試類,日志輸出中會有如下內(nèi)容:

number of splits:3

3、KeyValueTextInputFormat切片機(jī)制

KeyValueTextInputFormat與TextInputFormat相似,按行讀入記錄,每個(gè)文件形成一個(gè)切片,但KeyValueTextInputFormat在讀入一行后可以指定切割符,把一行內(nèi)容按切割符分割成鍵值對的形式。例如

A-this is a
B-this is b
C-this is c
C-this is c

經(jīng)過mapper階段后被切割成:

(A,this is a)
(B,this is b)
(C,this is c)
(C,this is c)

下面統(tǒng)計(jì)每行開頭為相同字母的個(gè)數(shù)。
Mapper類為:

package com.lzj.hadoop.input;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * LongWritable - 表示讀取第幾行
 * Text 		-  表示讀取一行的內(nèi)容
 * Text			- 表示輸出的鍵
 * IntWritable 	- 表示輸出的鍵對應(yīng)的個(gè)數(shù)
 * */
public class WordCountMapper extends Mapper<Text, Text, Text, LongWritable>{
	
	LongWritable v = new LongWritable(1);
	
	@Override
	protected void map(Text key, Text value, Context context)
			throws IOException, InterruptedException {
		//1、讀取一行內(nèi)容
		String line = value.toString();
		if(line.isEmpty()) {
			return;
		}
		//2、按空格切割讀取的單詞
		context.write(key, v);
		
	}
	
}

Reducer類為:

package com.lzj.hadoop.input;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/*
 * Text 		-  輸入的鍵(即Mapper階段輸出的鍵)
 * IntWritable 	- 輸入的值(個(gè)數(shù))(即Mapper階段輸出的值)
 * Text 		- 輸出的鍵
 * IntWritable 	- 輸出的值
 * */
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
	@Override
	protected void reduce(Text text, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
		//1、統(tǒng)計(jì)鍵對應(yīng)的個(gè)數(shù)
		long sum = 0;
		for(LongWritable value : values) {
			sum = sum + value.get();
		}
		//2、設(shè)置reducer的輸出
		LongWritable v = new LongWritable(sum);
		context.write(text, v);
	}
}

Driver驅(qū)動類為:

/*測試keyvaleTextInputFormat*/
public static void testkeyValeTextInputFormat() throws IOException, ClassNotFoundException, InterruptedException {
	//1、獲取job的配置信息
	Configuration conf = new Configuration();
	Job job = Job.getInstance(conf);
	//2、設(shè)置jar的加載路徑
	job.setJarByClass(WordCountDriver.class);
	//3、分別設(shè)置Mapper和Reducer類
	job.setMapperClass(WordCountMapper.class);
	job.setReducerClass(WordCountReducer.class);
	//4、設(shè)置map的輸出類型
	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(LongWritable.class);
	//5、設(shè)置最終輸出的鍵值類型
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(LongWritable.class);
	//6、設(shè)置輸入輸出路徑
	FileInputFormat.setInputPaths(job, new Path("D:/tmp/word/in1/1.txt"));
	FileOutputFormat.setOutputPath(job, new Path("D:/tmp/word/out6"));
	//7、設(shè)置數(shù)據(jù)切分方式
	job.setInputFormatClass(KeyValueTextInputFormat.class);
	conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "-");
	//8、提交任務(wù)
	boolean flag = job.waitForCompletion(true);
	System.out.println("flag ; " + flag);
}

啟動測試,輸出切片個(gè)數(shù)為1

4、NLineInputFormat切片機(jī)制

NLineInputFormat可以指定切分文件時(shí)按指定的行數(shù)進(jìn)行切分,比如文件總行數(shù)為n,切分行數(shù)為N,則切片數(shù)為:如果n/N整除,切片數(shù)為n/N;如果不能整除,切片數(shù)為(n/N + 1)。以下面測試文件為例:

There is no royal road to learning
It is never too old to learn
A man becomes learned by asking questions
Absence makes the heart grow fonder
When the cat is away, the mice will play
No cross, no crown
Ill news travels fast
He that climbs high falls heavily
From saving comes having
Experience is the mother of wisdom
East or west, home is best
Don't teach your grandmother to suck eggs
Don't trouble trouble until trouble troubles you
Doing is better than saying 
Birds of a feather flock together
Barking dogs seldom bite
Bad news has wings
As the tree, so the fruit
An idle youth, a needy age

文件共有19行,假設(shè)設(shè)置切片行數(shù)為5,即每5行形成一個(gè)切片,可以分成 19/5+1=5個(gè)切片。Mapper在讀入文件時(shí)與TextInputFormat相同,按每行讀取記錄,對應(yīng)的鍵key為該行內(nèi)容在文件中的偏移量,對應(yīng)的值value為該行具體內(nèi)容。例如

(0,There is no royal road to learning)
(35,It is never too old to learn)
(64,A man becomes learned by asking questions)
	……

統(tǒng)計(jì)該測試文件中單詞數(shù)案例如下
建立Mapper類:

package com.lzj.hadoop.input.nline;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NLineInputFormatMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

	Text k = new Text();
	LongWritable v = new LongWritable(1);
	
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		//1、獲取一行內(nèi)容
		String line = value.toString();
		//2、切割行
		String[] words = line.split(" ");
		//3、循環(huán)寫出
		for(String word : words) {
			k.set(word);
			context.write(k, v);
		}
		
	}
}

建立Reducer類:

package com.lzj.hadoop.input.nline;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class NLineInputFormatReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

	LongWritable v = new LongWritable();
	
	@Override
	protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
		long sum = 0;
		for(LongWritable value : values) {
			sum = sum + value.get();
		}
		v.set(sum);
		context.write(key, v);
	}
}

建立Driver測試類:

package com.lzj.hadoop.input.nline;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NLineInputFormatDriver {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		//1、獲取job的配置信息
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		//2、設(shè)置jar的加載路徑
		job.setJarByClass(NLineInputFormatDriver.class);
		//3、分別設(shè)置Mapper和Reducer類
		job.setMapperClass(NLineInputFormatMapper.class);
		job.setReducerClass(NLineInputFormatReducer.class);
		//4、設(shè)置map的輸出類型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);
		//5、設(shè)置最終輸出的鍵值類型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		//6、設(shè)置輸入輸出路徑
		FileInputFormat.setInputPaths(job, new Path("D:\\tmp\\word\\in2"));
		FileOutputFormat.setOutputPath(job, new Path("D:\\tmp\\word\\out7"));
		//7、設(shè)置切分方式
		job.setInputFormatClass(NLineInputFormat.class);
		NLineInputFormat.setNumLinesPerSplit(job, 5);
		//8、提交任務(wù)
		boolean flag = job.waitForCompletion(true);
		System.out.println("flag ; " + flag);
	}

}

啟動測試類,日志中會輸出切片的個(gè)數(shù):

number of splits:4

5、自定義InputFormat切片機(jī)制

除了上述hadoop自帶的切片機(jī)制,還可以自定義切片機(jī)制滿足定制開發(fā)。自定義InputFormat切片機(jī)制時(shí)需要自定義一個(gè)RecorderReader用于讀取文件,需要自定義一個(gè)InputFormat用于設(shè)置切文件輸入切分方式,然后后續(xù)開發(fā)如同上述切片機(jī)制開發(fā)一樣,創(chuàng)建Mapper、Reducer、driver類即可。
下面以將3個(gè)小文件合并成一個(gè)大文件為例
首先,定制RecordReader類

package com.lzj.hadoop.input.custom;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class CustomRecordReader extends RecordReader<Text, BytesWritable>{

	private FileSplit split;
	private Configuration conf;
	private Text key = new Text();
	private BytesWritable value = new BytesWritable();
	private Boolean isProgress = true;
	
	@Override
	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
		this.split = (FileSplit) split;
		conf = context.getConfiguration();
	}

	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		if(isProgress) {
			FSDataInputStream inputStream = null;
			try {
				/*1、獲取文件系統(tǒng)*/
				Path path = split.getPath();
				FileSystem fs = path.getFileSystem(conf);
				/*2、獲取數(shù)據(jù)輸入流*/
				inputStream = fs.open(path);
				/*3、讀取文件內(nèi)容*/
				byte[] buf = new byte[(int) split.getLength()];
				IOUtils.readFully(inputStream, buf, 0, buf.length);
				/*4、設(shè)置輸出文件內(nèi)容value*/
				value.set(buf, 0, buf.length);
				/*5、設(shè)置輸出文件的key*/
				String fileName = split.getPath().toString();
				key.set(fileName);
			} catch (Exception e) {
				e.printStackTrace();
			}finally {
				/*6、關(guān)閉數(shù)據(jù)流*/
				IOUtils.closeStream(inputStream);
			}
			isProgress = false;
			return true;
		}
		return false;
	}

	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		return key;
	}

	@Override
	public BytesWritable getCurrentValue() throws IOException, InterruptedException {
		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		return 0;
	}

	@Override
	public void close() throws IOException {
		// TODO Auto-generated method stub
		
	}

}

其次,定制FileInputFormat

package com.lzj.hadoop.input.custom;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class CustomFileInputFormat extends FileInputFormat<Text, BytesWritable>{

	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		return false;
	}
	
	@Override
	public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		CustomRecordReader recorder = new CustomRecordReader();
		recorder.initialize(split, context);
		return recorder;
	}

}

然后創(chuàng)建Mapper類

package com.lzj.hadoop.input.custom;

import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CstomMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{
	@Override
	protected void map(Text key, BytesWritable value, Mapper<Text, BytesWritable, Text, BytesWritable>.Context context)
			throws IOException, InterruptedException {
		//把文件名作為key,文件內(nèi)容作為value
		context.write(key, value);
	}
}

再然后,創(chuàng)建Reducer類:

package com.lzj.hadoop.input.custom;

import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CustomReducer extends Reducer<Text, BytesWritable, Text, BytesWritable>{

	@Override
	protected void reduce(Text key, Iterable<BytesWritable> values,
			Reducer<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException, InterruptedException {
		/*把key(文件名)+ value(文件內(nèi)容)寫入到一個(gè)文件中*/
		context.write(key, values.iterator().next());
	}
}

最后,創(chuàng)建Driver驅(qū)動類:

package com.lzj.hadoop.input.custom;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class CustomDriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		//1、獲取job的配置信息
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		//2、設(shè)置jar的加載路徑
		job.setJarByClass(CustomDriver.class);
		//3、分別設(shè)置Mapper和Reducer類
		job.setMapperClass(CstomMapper.class);
		job.setReducerClass(CustomReducer.class);
		//4、設(shè)置map的輸出類型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(BytesWritable.class);
		//5、設(shè)置最終輸出的鍵值類型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(BytesWritable.class);
		//6、設(shè)置輸入文件格式
		job.setInputFormatClass(CustomFileInputFormat.class);
		//7、設(shè)置輸出文件格式
		job.setOutputFormatClass(SequenceFileOutputFormat.class);
		//6、設(shè)置輸入輸出路徑
		FileInputFormat.setInputPaths(job, new Path("D:/tmp/word/in3"));
		FileOutputFormat.setOutputPath(job, new Path("D:/tmp/word/out7"));
		//8、提交任務(wù)
		boolean flag = job.waitForCompletion(true);
		System.out.println("flag ; " + flag);
	}
}

運(yùn)行驅(qū)動類,會在out7目錄下生成一個(gè)part-r-00000文件,打開之后,發(fā)現(xiàn)把in3目錄下的1.txt、2.txt、3.txt的文件和內(nèi)容寫入到了該文件中,以后直接讀取該文件,通過key(文件名)就可以直接獲取小文件的內(nèi)容。

到此這篇關(guān)于hadoop 切片機(jī)制分析與應(yīng)用的文章就介紹到這了,更多相關(guān)hadoop 切片機(jī)制內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論