欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

MapTask階段shuffle源碼分析

 更新時間:2019年01月10日 09:57:37   作者:qq_43193797  
今天小編就為大家分享一篇關于MapTask階段shuffle源碼分析,小編覺得內容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧

1. 收集階段

Mapper中,調用context.write(key,value)實際是調用代理NewOutPutCollectorwirte方法

public void write(KEYOUT key, VALUEOUT value
          ) throws IOException, InterruptedException {
  output.write(key, value);
 }

實際調用的是MapOutPutBuffercollect(),在進行收集前,調用partitioner來計算每個key-value的分區(qū)號

@Override
  public void write(K key, V value) throws IOException, InterruptedException {
   collector.collect(key, value,
            partitioner.getPartition(key, value, partitions));
  }

2. NewOutPutCollector對象的創(chuàng)建

@SuppressWarnings("unchecked")
  NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
            JobConf job,
            TaskUmbilicalProtocol umbilical,
            TaskReporter reporter
            ) throws IOException, ClassNotFoundException {
  // 創(chuàng)建實際用來收集key-value的緩存區(qū)對象
   collector = createSortingCollector(job, reporter);
  // 獲取總的分區(qū)個數(shù)
   partitions = jobContext.getNumReduceTasks();
   if (partitions > 1) {
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
     ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
   } else {
    // 默認情況,直接創(chuàng)建一個匿名內部類,所有的key-value都分配到0號分區(qū)
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
     @Override
     public int getPartition(K key, V value, int numPartitions) {
      return partitions - 1;
     }
    };
   }
  }

3. 創(chuàng)建環(huán)形緩沖區(qū)對象

@SuppressWarnings("unchecked")
 private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
     createSortingCollector(JobConf job, TaskReporter reporter)
  throws IOException, ClassNotFoundException {
  MapOutputCollector.Context context =
   new MapOutputCollector.Context(this, job, reporter);
  // 從當前Job的配置中,獲取mapreduce.job.map.output.collector.class,如果沒有設置,使用MapOutputBuffer.class
  Class<?>[] collectorClasses = job.getClasses(
   JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
  int remainingCollectors = collectorClasses.length;
  Exception lastException = null;
  for (Class clazz : collectorClasses) {
   try {
    if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
     throw new IOException("Invalid output collector class: " + clazz.getName() +
      " (does not implement MapOutputCollector)");
    }
    Class<? extends MapOutputCollector> subclazz =
     clazz.asSubclass(MapOutputCollector.class);
    LOG.debug("Trying map output collector class: " + subclazz.getName());
   // 創(chuàng)建緩沖區(qū)對象
    MapOutputCollector<KEY, VALUE> collector =
     ReflectionUtils.newInstance(subclazz, job);
   // 創(chuàng)建完緩沖區(qū)對象后,執(zhí)行初始化
    collector.init(context);
    LOG.info("Map output collector class = " + collector.getClass().getName());
    return collector;
   } catch (Exception e) {
    String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
    if (--remainingCollectors > 0) {
     msg += " (" + remainingCollectors + " more collector(s) to try)";
    }
    lastException = e;
    LOG.warn(msg, e);
   }
  }
  throw new IOException("Initialization of all the collectors failed. " +
   "Error in last collector was :" + lastException.getMessage(), lastException);
 }

3. MapOutPutBuffer的初始化   環(huán)形緩沖區(qū)對象

@SuppressWarnings("unchecked")
  public void init(MapOutputCollector.Context context
          ) throws IOException, ClassNotFoundException {
   job = context.getJobConf();
   reporter = context.getReporter();
   mapTask = context.getMapTask();
   mapOutputFile = mapTask.getMapOutputFile();
   sortPhase = mapTask.getSortPhase();
   spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
   // 獲取分區(qū)總個數(shù),取決于ReduceTask的數(shù)量
   partitions = job.getNumReduceTasks();
   rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
   //sanity checks
   // 從當前配置中,獲取mapreduce.map.sort.spill.percent,如果沒有設置,就是0.8
   final float spillper =
    job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
   // 獲取mapreduce.task.io.sort.mb,如果沒設置,就是100MB
   final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
   indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                     INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
   if (spillper > (float)1.0 || spillper <= (float)0.0) {
    throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
      "\": " + spillper);
   }
   if ((sortmb & 0x7FF) != sortmb) {
    throw new IOException(
      "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
   }
// 在溢寫前,對key-value排序,采用的排序器,使用快速排序,只排索引
   sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
      QuickSort.class, IndexedSorter.class), job);
   // buffers and accounting
   int maxMemUsage = sortmb << 20;
   maxMemUsage -= maxMemUsage % METASIZE;
   // 存放key-value
   kvbuffer = new byte[maxMemUsage];
   bufvoid = kvbuffer.length;
  // 存儲key-value的屬性信息,分區(qū)號,索引等
   kvmeta = ByteBuffer.wrap(kvbuffer)
     .order(ByteOrder.nativeOrder())
     .asIntBuffer();
   setEquator(0);
   bufstart = bufend = bufindex = equator;
   kvstart = kvend = kvindex;
   maxRec = kvmeta.capacity() / NMETA;
   softLimit = (int)(kvbuffer.length * spillper);
   bufferRemaining = softLimit;
   LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
   LOG.info("soft limit at " + softLimit);
   LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
   LOG.info("kvstart = " + kvstart + "; length = " + maxRec);
   // k/v serialization
    // 獲取快速排序的Key的比較器,排序只按照key進行排序!
   comparator = job.getOutputKeyComparator();
  // 獲取key-value的序列化器
   keyClass = (Class<K>)job.getMapOutputKeyClass();
   valClass = (Class<V>)job.getMapOutputValueClass();
   serializationFactory = new SerializationFactory(job);
   keySerializer = serializationFactory.getSerializer(keyClass);
   keySerializer.open(bb);
   valSerializer = serializationFactory.getSerializer(valClass);
   valSerializer.open(bb);
   // output counters
   mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
   mapOutputRecordCounter =
    reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
   fileOutputByteCounter = reporter
     .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
   // 溢寫到磁盤,可以使用一個壓縮格式! 獲取指定的壓縮編解碼器
   // compression
   if (job.getCompressMapOutput()) {
    Class<? extends CompressionCodec> codecClass =
     job.getMapOutputCompressorClass(DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, job);
   } else {
    codec = null;
   }
   // 獲取Combiner組件
   // combiner
   final Counters.Counter combineInputCounter =
    reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
   combinerRunner = CombinerRunner.create(job, getTaskID(),
                       combineInputCounter,
                       reporter, null);
   if (combinerRunner != null) {
    final Counters.Counter combineOutputCounter =
     reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
    combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
   } else {
    combineCollector = null;
   }
   spillInProgress = false;
   minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
   // 設置溢寫線程在后臺運行,溢寫是在后臺運行另外一個溢寫線程!和收集是兩個線程!
   spillThread.setDaemon(true);
   spillThread.setName("SpillThread");
   spillLock.lock();
   try {
   // 啟動線程
    spillThread.start();
    while (!spillThreadRunning) {
     spillDone.await();
    }
   } catch (InterruptedException e) {
    throw new IOException("Spill thread failed to initialize", e);
   } finally {
    spillLock.unlock();
   }
   if (sortSpillException != null) {
    throw new IOException("Spill thread failed to initialize",
      sortSpillException);
   }
  }

4. Paritionner的獲取

從配置中讀取mapreduce.job.partitioner.class,如果沒有指定,采用HashPartitioner.class

如果reduceTask > 1, 還沒有設置分區(qū)組件,使用HashPartitioner

@SuppressWarnings("unchecked")
 public Class<? extends Partitioner<?,?>> getPartitionerClass()
   throws ClassNotFoundException {
  return (Class<? extends Partitioner<?,?>>)
   conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
 }
public class HashPartitioner<K, V> extends Partitioner<K, V> {
 /** Use {@link Object#hashCode()} to partition. **/
 public int getPartition(K key, V value,
             int numReduceTasks) {
  return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
 }
}

分區(qū)號的限制:0 <= 分區(qū)號 < 總的分區(qū)數(shù)(reduceTask的個數(shù))

if (partition < 0 || partition >= partitions) {
    throw new IOException("Illegal partition for " + key + " (" +
      partition + ")");
   }

5.MapTask shuffle的流程

              ①在map()調用context.write()

              ②調用MapoutPutBuffer的collect()

  •                             調用分區(qū)組件Partitionner計算當前這組key-value的分區(qū)號

              ③將當前key-value收集到MapOutPutBuffer中

  •                             如果超過溢寫的閥值,在后臺啟動溢寫線程,來進行溢寫!

              ④溢寫前,先根據(jù)分區(qū)號,將相同分區(qū)號的key-value,采用快速排序算法,進行排序!

  •                             排序并不在內存中移動key-value,而是記錄排序后key-value的有序索引!

              ⑤ 開始溢寫,按照排序后有序的索引,將文件寫入到一個臨時的溢寫文件中

  •                             如果沒有定義Combiner,直接溢寫!
  •                             如果定義了Combiner,使用CombinerRunner.conbine()對key-value處理后再次溢寫!

              ⑥多次溢寫后,每次溢寫都會產生一個臨時文件

              ⑦最后,執(zhí)行一次flush(),將剩余的key-value進行溢寫

              ⑧MergeParts: 將多次溢寫的結果,保存為一個總的文件!

  •                      在合并為一個總的文件前,會執(zhí)行歸并排序,保證合并后的文件,各個分區(qū)也是有序的!
  •                      如果定義了Conbiner,Conbiner會再次運行(前提是溢寫的文件個數(shù)大于3)!
  •                      否則,就直接溢寫!

              ⑨最終保證生成一個最終的文件,這個文件根據(jù)總區(qū)號,分為若干部分,每個部分的key-value都已經(jīng)排好序,等待ReduceTask來拷貝相應分區(qū)的數(shù)據(jù)

6. Combiner

combiner其實就是Reducer類型:

Class<? extends Reducer<K,V,K,V>> cls =
    (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();

Combiner的運行時機:

MapTask:

  •               ①每次溢寫前,如果指定了Combiner,會運行
  •               ②將多個溢寫片段,進行合并為一個最終的文件時,也會運行Combiner,前提是片段數(shù)>=3

ReduceTask:

              ③reduceTask在運行時,需要啟動shuffle進程拷貝MapTask產生的數(shù)據(jù)!

  •                      數(shù)據(jù)在copy后,進入shuffle工作的內存,在內存中進行merge和sort!
  •                      數(shù)據(jù)過多,內部不夠,將部分數(shù)據(jù)溢寫在磁盤!
  •                      如果有溢寫的過程,那么combiner會再次運行!

①一定會運行,②,③需要條件!

總結

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對腳本之家的支持。如果你想了解更多相關內容請查看下面相關鏈接

相關文章

  • Tomcat?8.5?+mysql?5.7+jdk1.8開發(fā)JavaSE的金牌榜小項目

    Tomcat?8.5?+mysql?5.7+jdk1.8開發(fā)JavaSE的金牌榜小項目

    這篇文章主要介紹了Tomcat?8.5?+mysql?5.7+jdk1.8開發(fā)JavaSE的金牌榜小項目,本文通過圖文實例相結合給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-05-05
  • Java泛型通配符的使用詳解

    Java泛型通配符的使用詳解

    本文主要介紹了Java泛型通配符的使用詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-01-01
  • 詳解如何給SpringBoot部署的jar包瘦身

    詳解如何給SpringBoot部署的jar包瘦身

    這篇文章主要介紹了如何給SpringBoot部署的jar包瘦身,如今迭代發(fā)布是常有的事情,每次都上傳一個如此龐大的文件,會浪費很多時間,接下來小編就以一個小項目為例,來演示如何給jar包瘦身,需要的朋友可以參考下
    2023-07-07
  • spring?boot微服務場景下apollo加載過程解析

    spring?boot微服務場景下apollo加載過程解析

    apollo?是一個開源的配置中心項目,功能很強大,apollo?本身的配置項并不復雜,但是因為配置的路徑特別多,非常容易搞混了,?所以本文試圖聚焦?spring-boot?的場景,在?spring-boot?微服務場景下,搞清楚?apollo-client的加載過程
    2022-02-02
  • java線程中start和run的區(qū)別詳解

    java線程中start和run的區(qū)別詳解

    這篇文章主要介紹了java線程中start和run的區(qū)別詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-10-10
  • IntelliJ IDEA自定義代碼提示模板Live Templates的圖文教程

    IntelliJ IDEA自定義代碼提示模板Live Templates的圖文教程

    這篇文章主要介紹了IntelliJ IDEA自定義代碼提示模板Live Templates,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-03-03
  • java連接SQL?Server數(shù)據(jù)庫的超詳細教程

    java連接SQL?Server數(shù)據(jù)庫的超詳細教程

    最近在java連接SQL數(shù)據(jù)庫時會出現(xiàn)一些問題,所以這篇文章主要給大家介紹了關于java連接SQL?Server數(shù)據(jù)庫的超詳細教程,文中通過圖文介紹的非常詳細,需要的朋友可以參考下
    2022-06-06
  • 基于html5+java實現(xiàn)大文件上傳實例代碼

    基于html5+java實現(xiàn)大文件上傳實例代碼

    本文通過一段實例代碼給大家介紹基于html5+java實現(xiàn)大文件上傳,涉及到html5 java 文件上傳相關知識,感興趣的朋友一起學習吧
    2016-01-01
  • Spring MVC數(shù)據(jù)綁定概述及原理詳解

    Spring MVC數(shù)據(jù)綁定概述及原理詳解

    這篇文章主要介紹了Spring MVC數(shù)據(jù)綁定概述及原理詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-06-06
  • Java SimpleDateFormat中英文時間格式化轉換詳解

    Java SimpleDateFormat中英文時間格式化轉換詳解

    這篇文章主要為大家詳細介紹了Java SimpleDateFormat中英文時間格式化轉換,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-12-12

最新評論