Hadoop對文本文件的快速全局排序實現(xiàn)方法及分析
一、背景
Hadoop中實現(xiàn)了用于全局排序的InputSampler類和TotalOrderPartitioner類,調用示例是org.apache.hadoop.examples.Sort。
但是當我們以Text文件作為輸入時,結果并非按Text中的string列排序,而且輸出結果是SequenceFile。
原因:
1) hadoop在處理Text文件時,key是行號LongWritable類型,InputSampler抽樣的是key,TotalOrderPartitioner也是用key去查找分區(qū)。這樣,抽樣得到的partition文件是對行號的抽樣,結果自然是根據(jù)行號來排序。
2)大數(shù)據(jù)量時,InputSampler抽樣速度會非常慢。比如,RandomSampler需要遍歷所有數(shù)據(jù),IntervalSampler需要遍歷文件數(shù)與splits數(shù)一樣。SplitSampler效率比較高,但它只抽取每個文件前面的記錄,不適合應用于文件內有序的情況。
二、功能
1. 實現(xiàn)了一種局部抽樣方法PartialSampler,適用于輸入數(shù)據(jù)各文件是獨立同分布的情況
2. 使RandomSampler、IntervalSampler、SplitSampler支持對文本的抽樣
3. 實現(xiàn)了針對Text文件string列的TotalOrderPartitioner
三、實現(xiàn)
1. PartialSampler
PartialSampler從第一份輸入數(shù)據(jù)中隨機抽取第一列文本數(shù)據(jù)。PartialSampler有兩個屬性:freq(采樣頻率),numSamples(采樣總數(shù))。
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
ArrayList<K> samples = new ArrayList<K>(numSamples);
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
LOG.debug("seed: " + seed);
// 對splits【0】抽樣
for (int i = 0; i < 1; i++) {
System.out.println("PartialSampler will getSample splits["+i+"]");
RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
Reporter.NULL);
K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
if (r.nextDouble() <= freq) {
if (samples.size() < numSamples) {
// 選擇value中的第一列抽樣
Text value0 = new Text(value.toString().split("\t")[0]);
samples.add((K) value0);
} else {
// When exceeding the maximum number of samples, replace a
// random element with this one, then adjust the frequency
// to reflect the possibility of existing elements being
// pushed out
int ind = r.nextInt(numSamples);
if (ind != numSamples) {
Text value0 = new Text(value.toString().split("\t")[0]);
samples.set(ind, (K) value0);
}
freq *= (numSamples - 1) / (double) numSamples;
}
key = reader.createKey();
}
}
reader.close();
}
return (K[])samples.toArray();
}
首先通過InputFormat的getSplits方法得到所有的輸入分區(qū);
然后掃描第一個分區(qū)中的記錄進行采樣。
記錄采樣的具體過程如下:
從指定分區(qū)中取出一條記錄,判斷得到的隨機浮點數(shù)是否小于等于采樣頻率freq
如果大于則放棄這條記錄;
如果小于,則判斷當前的采樣數(shù)是否小于最大采樣數(shù),
如果小于則這條記錄被選中,被放進采樣集合中;
否則從【0,numSamples】中選擇一個隨機數(shù),如果這個隨機數(shù)不等于最大采樣數(shù)numSamples,則用這條記錄替換掉采樣集合隨機數(shù)對應位置的記錄,同時采樣頻率freq減小變?yōu)閒req*(numSamples-1)/numSamples。
然后依次遍歷分區(qū)中的其它記錄。
note:
1)PartialSampler只適用于輸入數(shù)據(jù)各文件是獨立同分布的情況。
2)自帶的三種Sampler通過修改samples.add(key)為samples.add((K) value0); 也可以實現(xiàn)對第一列的抽樣。
2. TotalOrderPartitioner
TotalOrderPartitioner主要改進了兩點:
1)讀partition時指定keyClass為Text.class
因為partition文件中的key類型為Text
在configure函數(shù)中,修改:
//Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass(); Class<K> keyClass = (Class<K>)Text.class;
2)查找分區(qū)時,改用value查
public int getPartition(K key, V value, int numPartitions) {
Text value0 = new Text(value.toString().split("\t")[0]);
return partitions.findPartition((K) value0);
}
3. Sort
1)設置InputFormat、OutputFormat、OutputKeyClass、OutputValueClass、MapOutputKeyClass
2)初始化InputSampler對象,抽樣
3)partitionFile通過CacheFile傳給TotalOrderPartitioner,執(zhí)行MapReduce任務
Class<? extends InputFormat> inputFormatClass = TextInputFormat.class;
Class<? extends OutputFormat> outputFormatClass = TextOutputFormat.class;
Class<? extends WritableComparable> outputKeyClass = Text.class;
Class<? extends Writable> outputValueClass = Text.class;
jobConf.setMapOutputKeyClass(LongWritable.class);
// Set user-supplied (possibly default) job configs
jobConf.setNumReduceTasks(num_reduces);
jobConf.setInputFormat(inputFormatClass);
jobConf.setOutputFormat(outputFormatClass);
jobConf.setOutputKeyClass(outputKeyClass);
jobConf.setOutputValueClass(outputValueClass);
if (sampler != null) {
System.out.println("Sampling input to effect total-order sort...");
jobConf.setPartitionerClass(TotalOrderPartitioner.class);
Path inputDir = FileInputFormat.getInputPaths(jobConf)[0];
inputDir = inputDir.makeQualified(inputDir.getFileSystem(jobConf));
//Path partitionFile = new Path(inputDir, "_sortPartitioning");
TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);
InputSampler.<K,V>writePartitionFile(jobConf, sampler);
URI partitionUri = new URI(partitionFile.toString() + "#" + "_sortPartitioning");
DistributedCache.addCacheFile(partitionUri, jobConf);
DistributedCache.createSymlink(jobConf);
}
FileSystem hdfs = FileSystem.get(jobConf);
hdfs.delete(outputpath);
hdfs.close();
System.out.println("Running on " +
cluster.getTaskTrackers() +
" nodes to sort from " +
FileInputFormat.getInputPaths(jobConf)[0] + " into " +
FileOutputFormat.getOutputPath(jobConf) +
" with " + num_reduces + " reduces.");
Date startTime = new Date();
System.out.println("Job started: " + startTime);
jobResult = JobClient.runJob(jobConf);
四、執(zhí)行
usage:
hadoop jar yitengfei.jar com.yitengfei.Sort [-m <maps>] [-r <reduces>]
[-splitRandom <double pcnt> <numSamples> <maxsplits> | // Sample from random splits at random (general)
-splitSample <numSamples> <maxsplits> | // Sample from first records in splits (random data)
-splitInterval <double pcnt> <maxsplits>] // Sample from splits at intervals (sorted data)
-splitPartial <double pcnt> <numSamples> <maxsplits> | // Sample from partial splits at random (general) ]
<input> <output> <partitionfile>
Example:
hadoop jar yitengfei.jar com.yitengfei.Sort -r 10 -splitPartial 0.1 10000 10 /user/rp-rd/yitengfei/sample/input /user/rp-rd/yitengfei/sample/output /user/rp-rd/yitengfei/sample/partition
五、性能
200G輸入數(shù)據(jù),15億條url,1000個分區(qū),排序時間只用了6分鐘
總結
以上就是本文關于Hadoop對文本文件的快速全局排序實現(xiàn)方法及分析的全部內容,希望對大家有所幫助 ,感興趣的朋友可以繼續(xù)參閱本站:hadoop重新格式化HDFS步驟解析、淺談七種常見的Hadoop和Spark項目案例。如有不足之處,歡迎留言指出,感謝朋友們對本站的支持!
相關文章
搭建Consul服務發(fā)現(xiàn)與服務網(wǎng)格
這篇文章介紹了搭建Consul服務發(fā)現(xiàn)與服務網(wǎng)格的方法,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-04-04
Hadoop對文本文件的快速全局排序實現(xiàn)方法及分析
這篇文章主要介紹了Hadoop對文本文件的快速全局排序實現(xiàn)方法及分析,小編覺得挺不錯的,這里分享給大家,供需要的朋友參考。2017-10-10

