Java基礎之MapReduce框架總結與擴展知識點
一、MapTask工作機制
MapTask就是Map階段的job,它的數(shù)量由切片決定

二、MapTask工作流程:
1.Read階段:讀取文件,此時進行對文件數(shù)據(jù)進行切片(InputFormat進行切片),通過切片,從而確定MapTask的數(shù)量,切片中包含數(shù)據(jù)和key(偏移量)
2.Map階段:這個階段是針對數(shù)據(jù)進行map方法的計算操作,通過該方法,可以對切片中的key和value進行處理
3.Collect收集階段:在用戶編寫map()函數(shù)中,當數(shù)據(jù)處理完成后,一般會調(diào)用OutputCollector.collect()輸出結果。在該函數(shù)內(nèi)部,它會將生成的key/value分區(qū)(調(diào)用Partitioner),并寫入一個環(huán)形內(nèi)存緩沖區(qū)中。
4.Spill階段:即“溢寫”,當環(huán)形緩沖區(qū)滿后,MapReduce會將數(shù)據(jù)寫到本地磁盤上,生成一個臨時文件。需要注意的是,將數(shù)據(jù)寫入本地磁盤之前,先要對數(shù)據(jù)進行一次本地排序,并在必要時對數(shù)據(jù)進行合并、壓縮等操作。
5.Combine階段:當所有數(shù)據(jù)處理完成后,MapTask對所有臨時文件進行一次合并,以確保最終只會生成一個數(shù)據(jù)文件,這個階段默認是沒有的,一般需要我們自定義
6.當所有數(shù)據(jù)處理完后,MapTask會將所有臨時文件合并成一個大文件,并保存到文件output/file.out中,同時生成相應的索引文件output/file.out.index。
7.在進行文件合并過程中,MapTask以分區(qū)為單位進行合并。對于某個分區(qū),它將采用多輪遞歸合并的方式。每輪合并io.sort.factor(默認10)個文件,并將產(chǎn)生的文件重新加入待合并列表中,對文件排序后,重復以上過程,直到最終得到一個大文件。
8.讓每個MapTask最終只生成一個數(shù)據(jù)文件,可避免同時打開大量文件和同時讀取大量小文件產(chǎn)生的隨機讀取帶來的開銷
第四步溢寫階段詳情:
- 步驟1:利用快速排序算法對緩存區(qū)內(nèi)的數(shù)據(jù)進行排序,排序方式是,先按照分區(qū)編號Partition進行排序,然后按照key進行排序。這樣,經(jīng)過排序后,數(shù)據(jù)以分區(qū)為單位聚集在一起,且同一分區(qū)內(nèi)所有數(shù)據(jù)按照key有序。
- 步驟2:按照分區(qū)編號由小到大依次將每個分區(qū)中的數(shù)據(jù)寫入任務工作目錄下的臨時文件output/spillN.out(N表示當前溢寫次數(shù))中。如果用戶設置了Combiner,則寫入文件之前,對每個分區(qū)中的數(shù)據(jù)進行一次聚集操作。
- 步驟3:將分區(qū)數(shù)據(jù)的元信息寫到內(nèi)存索引數(shù)據(jù)結構SpillRecord中,其中每個分區(qū)的元信息包括在臨時文件中的偏移量、壓縮前數(shù)據(jù)大小和壓縮后數(shù)據(jù)大小。如果當前內(nèi)存索引大小超過1MB,則將內(nèi)存索引寫到文件output/spillN.out.index中。
三、ReduceTask工作機制
ReduceTask就是Reduce階段的job,它的數(shù)量由Map階段的分區(qū)進行決定

四、ReduceTask工作流程:
1.Copy階段:ReduceTask從各個MapTask上遠程拷貝一片數(shù)據(jù),并針對某一片數(shù)據(jù),如果其大小超過一定閾值,則寫到磁盤上,否則直接放到內(nèi)存中。
2.Merge階段:在遠程拷貝數(shù)據(jù)的同時,ReduceTask啟動了兩個后臺線程對內(nèi)存和磁盤上的文件進行合并,以防止內(nèi)存使用過多或磁盤上文件過多。
3.Sort階段:按照MapReduce語義,用戶編寫reduce()函數(shù)輸入數(shù)據(jù)是按key進行聚集的一組數(shù)據(jù)。為了將key相同的數(shù)據(jù)聚在一起,Hadoop采用了基于排序的策略。由于各個MapTask已經(jīng)實現(xiàn)對自己的處理結果進行了局部排序,因此,ReduceTask只需對所有數(shù)據(jù)進行一次歸并排序即可。
4.Reduce階段:reduce()函數(shù)將計算結果寫到HDFS上
五、數(shù)據(jù)清洗(ETL)
我們在大數(shù)據(jù)開篇概述中說過,數(shù)據(jù)是低價值的,所以我們要從海量數(shù)據(jù)中獲取到我們想要的數(shù)據(jù),首先就需要對數(shù)據(jù)進行清洗,這個過程也稱之為ETL
還記得上一章中的Join案例么,我們對pname字段的填充,也算數(shù)據(jù)清洗的一種,下面我通過一個簡單的案例來演示一下數(shù)據(jù)清洗
數(shù)據(jù)清洗案例
需求:過濾一下log日志中字段個數(shù)小于11的日志(隨便舉個栗子而已)
測試數(shù)據(jù):就拿我們這兩天學習中HadoopNodeName產(chǎn)生的日志來當測試數(shù)據(jù)吧,我將log日志信息放到我的windows中,數(shù)據(jù)位置如下
/opt/module/hadoop-3.1.3/logs/hadoop-xxx-nodemanager-hadoop102.log
編寫思路:
直接通過切片,然后判斷長度即可,因為是舉個栗子,沒有那么復雜
真正的數(shù)據(jù)清洗會使用框架來做,這個我后面會為大家?guī)硐嚓P的知識
- ETLDriver
package com.company.etl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class ETLDriver {
public static void main(String[] args) throws Exception {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(ETLDriver.class);
job.setMapperClass(ETLMapper.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job,new Path("D:\\io\\input8"));
FileOutputFormat.setOutputPath(job,new Path("D:\\io\\output88"));
job.waitForCompletion(true);
}
}
- ETLMapper
package com.company.etl;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class ETLMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//清洗(過濾)
String line = value.toString();
String[] info = line.split(" ");
//判斷
if (info.length > 11){
context.write(value,NullWritable.get());
}
}
}
六、計數(shù)器應用
- 顧名思義,計數(shù)器的作用就是用于計數(shù)的,在Hadoop中,它內(nèi)部也有一個計數(shù)器,用于監(jiān)控統(tǒng)計我們處理數(shù)據(jù)的數(shù)量
- 我們通常在MapReduce中通過上下文 context進行應用,例如在Mapper中,我通過step方法進行初始化計數(shù)器,然后在我們map方法中進行計數(shù)
七、計數(shù)器案例
在上面數(shù)據(jù)清洗的基礎上進行計數(shù)器的使用,Driver沒什么變化,只有Mapper
我們在Mapper的setup方法中,創(chuàng)建計數(shù)器的對象,然后在map方法中調(diào)用它即可
ETLMapper
package com.company.etl;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class ETLMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
private Counter sucess;
private Counter fail;
/*
創(chuàng)建計數(shù)器對象
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
/*
getCounter(String groupName, String counterName);
第一個參數(shù) :組名 隨便寫
第二個參數(shù) :計數(shù)器名 隨便寫
*/
sucess = context.getCounter("ETL", "success");
fail = context.getCounter("ETL", "fail");
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//清洗(過濾)
String line = value.toString();
String[] info = line.split(" ");
//判斷
if (info.length > 11){
context.write(value,NullWritable.get());
//統(tǒng)計
sucess.increment(1);
}else{
fail.increment(1);
}
}
}
八、MapReduce總結
好了,到這里,我們MapReduce就全部學習完畢了,接下來,我再把整個內(nèi)容串一下,還是MapReduce的那個圖

MapReduce的主要工作就是對數(shù)據(jù)進行運算、分析,它的工作流程如下:
1.我們會將HDFS中的數(shù)據(jù)通過InputFormat進行進行讀取、切片,從而計算出MapTask的數(shù)量
2.每一個MapTask中都會有Mapper類,里面的map方法就是任務的具體實現(xiàn),我們通過它,可以完成數(shù)據(jù)的key,value封裝,然后通過分區(qū)進入shuffle中來完成每個MapTask中的數(shù)據(jù)分區(qū)排序
3.通過分區(qū)來決定ReduceTask的數(shù)量,每一個ReduceTask都有一個Reducer類,里面的reduce方法是ReduceTask的具體實現(xiàn),它主要是完成最后的數(shù)據(jù)合并工作
4.當Reduce任務過重,我們可以通過Combiner合并,在Mapper階段來進行局部的數(shù)據(jù)合并,減輕Reduce的任務量,當然,前提是Combiner所做的局部合并工作不會影響最終的結果
5.當Reducer的任務完成,會將最終的key,value寫出,交給OutputFormat,用于數(shù)據(jù)的寫出,通過OutputFormat來完成HDFS的寫入操作
每一個MapTask和ReduceTask內(nèi)部都是循環(huán)進行讀取,并且它有三個方法:setup() map()/reduce() cleanup()
setup()方法是在MapTask/ReduceTask剛剛啟動時進行調(diào)用,cleanup()是在任務完成后調(diào)用
到此這篇關于Java基礎之MapReduce框架總結與擴展知識點的文章就介紹到這了,更多相關Java MapReduce框架內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
springboot @ConditionalOnMissingBean注解的作用詳解
這篇文章主要介紹了springboot @ConditionalOnMissingBean注解的作用詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-08-08
java編程創(chuàng)建型設計模式工廠方法模式示例詳解
這篇文章主要為大家介紹了java編程創(chuàng)建型設計模式之工廠方法模式的創(chuàng)建及案例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助2022-02-02
java多線程CyclicBarrier的使用案例,讓線程起步走
這篇文章主要介紹了java多線程CyclicBarrier的使用案例,讓線程起步走!具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02
基于Springboot的高校社團管理系統(tǒng)的設計與實現(xiàn)
本文將基于Springboot+Mybatis開發(fā)實現(xiàn)一個高校社團管理系統(tǒng),系統(tǒng)包含三個角色:管理員、團長、會員。文中采用的技術有Springboot、Mybatis、Jquery、AjAX、JSP等,感興趣的可以了解一下2022-07-07
Java?CompletableFuture實現(xiàn)多線程異步編排
這篇文章主要為大家介紹了Java?CompletableFuture實現(xiàn)多線程異步編排,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-09-09

