Source Code Analysis of the MapTask Shuffle Phase
1. The Collection Phase
In the Mapper, calling context.write(key, value) actually invokes the write method of the proxy NewOutputCollector:
public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException {
    output.write(key, value);
}
This in turn calls collect() on MapOutputBuffer; before collecting, the partitioner is invoked to compute the partition number of each key-value pair:
@Override
public void write(K key, V value) throws IOException, InterruptedException {
    collector.collect(key, value,
                      partitioner.getPartition(key, value, partitions));
}
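For context, the chain starts in user code. A minimal, hedged Mapper sketch (the class name and tokenizing logic are illustrative, not from the quoted source):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Minimal mapper sketch: each context.write() below goes through
// NewOutputCollector.write() and ends up in MapOutputBuffer.collect().
public class TokenMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        for (String token : line.toString().split("\\s+")) {
            word.set(token);
            context.write(word, ONE);   // -> NewOutputCollector.write(...)
        }
    }
}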
2. Creating the NewOutputCollector Object
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                   ) throws IOException, ClassNotFoundException {
    // Create the buffer object that actually collects the key-value pairs
    collector = createSortingCollector(job, reporter);
    // Get the total number of partitions (= number of reduce tasks)
    partitions = jobContext.getNumReduceTasks();
    if (partitions > 1) {
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
            ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
    } else {
        // Default case: create an anonymous inner class that assigns
        // every key-value pair to partition 0
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
            @Override
            public int getPartition(K key, V value, int numPartitions) {
                return partitions - 1;
            }
        };
    }
}
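On the driver side, the two inputs to this constructor logic — the number of reduce tasks and the partitioner class — are typically set as below. A hedged sketch; MyPartitioner is a hypothetical example class, not part of the quoted source:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartitionerSetup {
    // Hypothetical partitioner: routes keys by their first character
    public static class MyPartitioner extends Partitioner<Text, IntWritable> {
        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
            return (key.charAt(0) & Integer.MAX_VALUE) % numPartitions;
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "partitioner-demo");
        // Becomes partitions = jobContext.getNumReduceTasks() above
        job.setNumReduceTasks(3);
        // Only consulted when partitions > 1; with a single reducer the
        // anonymous partitioner that always returns 0 is used instead
        job.setPartitionerClass(MyPartitioner.class);
    }
}

With one reducer, the branch above ignores the configured class entirely; this is why a custom partitioner has no effect on single-reducer jobs.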
3. Creating the Circular Buffer Object
@SuppressWarnings("unchecked")
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
        createSortingCollector(JobConf job, TaskReporter reporter)
        throws IOException, ClassNotFoundException {
    MapOutputCollector.Context context =
        new MapOutputCollector.Context(this, job, reporter);

    // Read mapreduce.job.map.output.collector.class from the current Job's
    // configuration; fall back to MapOutputBuffer.class if it is not set
    Class<?>[] collectorClasses = job.getClasses(
        JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
    int remainingCollectors = collectorClasses.length;
    Exception lastException = null;
    for (Class clazz : collectorClasses) {
        try {
            if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
                throw new IOException("Invalid output collector class: "
                    + clazz.getName() + " (does not implement MapOutputCollector)");
            }
            Class<? extends MapOutputCollector> subclazz =
                clazz.asSubclass(MapOutputCollector.class);
            LOG.debug("Trying map output collector class: " + subclazz.getName());
            // Create the buffer object
            MapOutputCollector<KEY, VALUE> collector =
                ReflectionUtils.newInstance(subclazz, job);
            // After creating the buffer object, initialize it
            collector.init(context);
            LOG.info("Map output collector class = " + collector.getClass().getName());
            return collector;
        } catch (Exception e) {
            String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
            if (--remainingCollectors > 0) {
                msg += " (" + remainingCollectors + " more collector(s) to try)";
            }
            lastException = e;
            LOG.warn(msg, e);
        }
    }
    throw new IOException("Initialization of all the collectors failed. "
        + "Error in last collector was :" + lastException.getMessage(), lastException);
}
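As the loop above shows, the collector implementation is pluggable. A minimal sketch of selecting a custom one — the class name is hypothetical, and MapOutputBuffer remains the default when the key is unset:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CollectorConfig {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "collector-demo");
        // Same key as JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR read above;
        // a comma-separated list is tried in order until one initializes
        job.getConfiguration().set("mapreduce.job.map.output.collector.class",
                                   "com.example.MyMapOutputCollector");
    }
}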
4. Initializing MapOutputBuffer (the Circular Buffer)
@SuppressWarnings("unchecked")
public void init(MapOutputCollector.Context context
                 ) throws IOException, ClassNotFoundException {
    job = context.getJobConf();
    reporter = context.getReporter();
    mapTask = context.getMapTask();
    mapOutputFile = mapTask.getMapOutputFile();
    sortPhase = mapTask.getSortPhase();
    spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
    // Total number of partitions, determined by the number of ReduceTasks
    partitions = job.getNumReduceTasks();
    rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

    // sanity checks
    // Read mapreduce.map.sort.spill.percent from the configuration; 0.8 if unset
    final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
    // Read mapreduce.task.io.sort.mb; 100 MB if unset
    final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
    indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                       INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
    if (spillper > (float)1.0 || spillper <= (float)0.0) {
        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT
            + "\": " + spillper);
    }
    // sortmb must fit in 11 bits (0x7FF = 2047), i.e. be less than 2048 MB
    if ((sortmb & 0x7FF) != sortmb) {
        throw new IOException(
            "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
    }
    // The sorter applied to the key-value pairs before spilling: quicksort
    // by default, and it sorts only the indexes, not the data itself
    sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
        QuickSort.class, IndexedSorter.class), job);

    // buffers and accounting
    int maxMemUsage = sortmb << 20;
    maxMemUsage -= maxMemUsage % METASIZE;
    // Holds the serialized key-value bytes
    kvbuffer = new byte[maxMemUsage];
    bufvoid = kvbuffer.length;
    // Holds the key-value metadata: partition number, indexes, etc.
    kvmeta = ByteBuffer.wrap(kvbuffer)
        .order(ByteOrder.nativeOrder())
        .asIntBuffer();
    setEquator(0);
    bufstart = bufend = bufindex = equator;
    kvstart = kvend = kvindex;
    maxRec = kvmeta.capacity() / NMETA;
    softLimit = (int)(kvbuffer.length * spillper);
    bufferRemaining = softLimit;
    LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
    LOG.info("soft limit at " + softLimit);
    LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
    LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

    // k/v serialization
    // Comparator used by the quicksort; sorting is done by key only!
    comparator = job.getOutputKeyComparator();
    // Serializers for the keys and values
    keyClass = (Class<K>)job.getMapOutputKeyClass();
    valClass = (Class<V>)job.getMapOutputValueClass();
    serializationFactory = new SerializationFactory(job);
    keySerializer = serializationFactory.getSerializer(keyClass);
    keySerializer.open(bb);
    valSerializer = serializationFactory.getSerializer(valClass);
    valSerializer.open(bb);

    // output counters
    mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
    mapOutputRecordCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
    fileOutputByteCounter = reporter
        .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);

    // compression
    // Spills to disk may use a compression format; fetch the configured codec
    if (job.getCompressMapOutput()) {
        Class<? extends CompressionCodec> codecClass =
            job.getMapOutputCompressorClass(DefaultCodec.class);
        codec = ReflectionUtils.newInstance(codecClass, job);
    } else {
        codec = null;
    }

    // combiner
    // Fetch the Combiner component
    final Counters.Counter combineInputCounter =
        reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
    combinerRunner = CombinerRunner.create(job, getTaskID(),
                                           combineInputCounter,
                                           reporter, null);
    if (combinerRunner != null) {
        final Counters.Counter combineOutputCounter =
            reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
        combineCollector =
            new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
    } else {
        combineCollector = null;
    }
    spillInProgress = false;
    minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
    // Spilling runs as a separate daemon thread in the background;
    // collection and spilling are two different threads!
    spillThread.setDaemon(true);
    spillThread.setName("SpillThread");
    spillLock.lock();
    try {
        // Start the spill thread
        spillThread.start();
        while (!spillThreadRunning) {
            spillDone.await();
        }
    } catch (InterruptedException e) {
        throw new IOException("Spill thread failed to initialize", e);
    } finally {
        spillLock.unlock();
    }
    if (sortSpillException != null) {
        throw new IOException("Spill thread failed to initialize",
                              sortSpillException);
    }
}
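The thresholds read in init() correspond one-to-one to job configuration keys. A small tuning sketch using the keys quoted above — the values are illustrative only, not recommendations:

import org.apache.hadoop.conf.Configuration;

public class SpillTuning {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // sortmb: total size of kvbuffer (JobContext.IO_SORT_MB, default 100 MB)
        conf.setInt("mapreduce.task.io.sort.mb", 200);
        // spillper: soft limit that triggers the background spill
        // (JobContext.MAP_SORT_SPILL_PERCENT, default 0.8)
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.9f);
        // minSpillsForCombine: run the combiner during the final merge only
        // when at least this many spill files exist (default 3)
        conf.setInt("mapreduce.map.combine.minspills", 5);
        // Compress spilled map output (checked via job.getCompressMapOutput())
        conf.setBoolean("mapreduce.map.output.compress", true);

        System.out.println("io.sort.mb = "
            + conf.getInt("mapreduce.task.io.sort.mb", 100));
    }
}

Note that the sanity check in init() caps mapreduce.task.io.sort.mb below 2048, since the value must fit in 11 bits.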
5. Obtaining the Partitioner
The partitioner class is read from the configuration key mapreduce.job.partitioner.class; if none is specified, HashPartitioner.class is used. In other words, when reduceTask > 1 and no partitioner component has been set, HashPartitioner is the one that runs:
@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass()
        throws ClassNotFoundException {
    return (Class<? extends Partitioner<?,?>>)
        conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    /** Use {@link Object#hashCode()} to partition. */
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
Constraint on the partition number: 0 <= partition < total number of partitions (the number of reduce tasks):
if (partition < 0 || partition >= partitions) {
    throw new IOException("Illegal partition for " + key + " (" + partition + ")");
}
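To make the arithmetic concrete, here is a self-contained sketch that mimics getPartition on plain Strings (real keys are Writables, so this is only an approximation):

// Standalone demo of the HashPartitioner arithmetic shown above.
public class HashPartitionDemo {
    static int getPartition(String key, int numReduceTasks) {
        // Masking with Integer.MAX_VALUE clears the sign bit, so a negative
        // hashCode() can never produce a negative (illegal) partition number
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 3;
        for (String key : new String[] {"hadoop", "shuffle", "map", "reduce"}) {
            int p = getPartition(key, numReduceTasks);
            // Every result satisfies 0 <= p < numReduceTasks
            System.out.println(key + " -> partition " + p);
        }
    }
}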
6. The MapTask Shuffle Workflow
① In map(), context.write() is called.
② This calls collect() on MapOutputBuffer.
- The partition component (Partitioner) is invoked to compute the partition number of the current key-value pair.
③ The key-value pair is collected into MapOutputBuffer.
- If the spill threshold is exceeded, a spill thread is started in the background to perform the spill!
④ Before spilling, the key-value pairs are sorted by partition number using the quicksort algorithm, so that pairs with the same partition number are grouped together!
- The sort does not move the key-value pairs in memory; it only records a sorted index over them (see the sketch after this list)!
⑤ The spill begins: following the sorted index, the records are written to a temporary spill file.
- If no Combiner is defined, the records are spilled directly!
- If a Combiner is defined, CombinerRunner.combine() processes the key-value pairs before they are spilled!
⑥ Over the course of the task there may be several spills; each spill produces one temporary file.
⑦ Finally, a flush() is performed to spill the remaining key-value pairs.
⑧ MergeParts: the results of the multiple spills are merged into one overall file!
- Before merging into a single file, a merge sort is performed, guaranteeing that each partition of the merged file is also sorted!
- If a Combiner is defined, it runs once more, provided the number of spill files is at least 3 (minSpillsForCombine)!
- Otherwise, the segments are merged directly.
⑨ The end result is a single final file, divided by partition number into sections whose key-value pairs are already sorted, waiting for the ReduceTasks to copy the data of their respective partitions.
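The "sort the index, not the data" idea in step ④ can be sketched with plain Java arrays standing in for kvbuffer/kvmeta — a simplification of the real byte-buffer machinery, assuming Java 16+ records:

import java.util.Arrays;
import java.util.Comparator;

// Simplified illustration of step 4: the records stay where they were
// written; only an array of index entries is sorted, first by partition,
// then by key, mirroring MapOutputBuffer's metadata sort.
public class IndexedSortDemo {
    record Entry(int partition, String key, String value) {}

    public static void main(String[] args) {
        Entry[] records = {
            new Entry(1, "banana",  "v1"),
            new Entry(0, "cherry",  "v2"),
            new Entry(1, "apple",   "v3"),
            new Entry(0, "apricot", "v4"),
        };
        // The index array plays the role of kvmeta; records are never moved
        Integer[] index = {0, 1, 2, 3};
        Arrays.sort(index, Comparator
            .<Integer>comparingInt(i -> records[i].partition())
            .thenComparing(i -> records[i].key()));
        // "Spill" in index order: partitions come out grouped, keys sorted
        for (int i : index) {
            System.out.println(records[i]);
        }
    }
}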
7. Combiner
The combiner is in fact just a Reducer type:
Class<? extends Reducer<K,V,K,V>> cls =
    (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
When the Combiner runs:
MapTask:
- ① Before each spill, if a Combiner has been specified, it runs.
- ② When the spill segments are merged into one final file, the Combiner also runs, provided the number of segments is >= 3.
ReduceTask:
③ While running, a ReduceTask starts the shuffle to copy the data produced by the MapTasks!
- After being copied, the data enters the shuffle's working memory, where it is merged and sorted!
- If there is more data than memory can hold, part of it is spilled to disk!
- Whenever such a spill happens, the Combiner runs again!
① always runs; ② and ③ are conditional!
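A typical Combiner is simply a Reducer whose input and output types match, so it can safely run zero or more times. A standard word-count-style sketch (not from the quoted source):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Word-count style combiner: the same class shape as a Reducer, with
// (K,V) input types equal to (K,V) output types so it can run zero,
// one, or many times without changing the final result.
public class IntSumCombiner
        extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable sum = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)
            throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable v : values) {
            total += v.get();
        }
        sum.set(total);
        ctx.write(key, sum);
    }
}

It would be registered with job.setCombinerClass(IntSumCombiner.class). Because runs ② and ③ are conditional, a combiner must be commutative and associative, so the final result does not depend on how many times it actually ran.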
Summary
That is all for this article. I hope its content offers some reference value for your study or work. Thank you for your support of 腳本之家.