hadoop之MapReduce框架原理
MapReduce框架的簡單運行機制:
MapReduce是分為兩個階段的,MapperTask階段,和ReduceTask階段。(中間有一個Shuffle階段)
Mapper階段,可以通過選擇什么方式(K,V的選擇對應不同的方法)來讀取數(shù)據(jù),讀取后把數(shù)據(jù)交給Mapper來進行后續(xù)的業(yè)務邏輯(用戶寫),讓后進入Reduce階段通過Shuffle來拉取Mapper階段的數(shù)據(jù),讓后通過OutputFormat(等方法)來寫出(可以是ES,mysql,hbase,文件)
Mapper階段:
InputFormat數(shù)據(jù)輸入:
切片與MapTask并行度決定機制:
MapTask個數(shù),決定了并行度(相當于在生成map集合的過程中有幾個人在干活),**(不一定越多越好,當數(shù)據(jù)量小的時候可能開啟的眾多MapTask的時間用一個MapTask已經(jīng)計算完成)
數(shù)據(jù)塊:Block是HDFS物理上把數(shù)據(jù)分成一塊一塊。數(shù)據(jù)塊是HDFS存儲數(shù)據(jù)單位。
數(shù)據(jù)切片:數(shù)據(jù)切片只是在邏輯上對輸入進行分片,并不會在磁盤上將其切分成片進行存儲。數(shù)據(jù)切片是MapReduce程序計算輸入數(shù)據(jù)的單位,一個切片會對應啟動一個MapTask。

job提交過程源碼解析:
因為我們找的job提交,所以在job提交函數(shù)哪里打個斷點,
步入函數(shù)后
ensureState(JobState.DEFINE); 是確保你的狀態(tài)是正確的(狀態(tài)不對或者running 都會拋異常)
setUseNewAPI(); 處理Hadoop不同版本之間的API兼容
connect(); 連接,(客戶端需要與集群或者本機連接)
checkSpecs(job); 校驗 校驗輸出路徑是否已經(jīng)創(chuàng)建,是否有參
return submitter.submitJobInternal(Job.this, cluster); 核心代碼 步入的時候需要點兩下,
第一個步入是步入的參數(shù)Job 第二個才步入此方法
這個方法是提交job(在集群模式下,提交的job包含(通過客戶端方式把jar包提交給集群),在本地不需要提交jar包,jar在本地是存在的)
還會進行切片,生成切片信息(幾個切片就有幾個MapTask)
還會 生成xml文件
綜上 job提交會交三樣東西(jar,xml文件,切片信息---》集群模式下)
最后會刪除所有的信息文件
切片邏輯:
**(切片是每一個文件單獨切片)
在本地是32m一塊,前邊說過,默認一塊對應一個切片,但是有前提條件,再你減去32m的時候,余下最后一塊如果大于1.1倍就重新分配切片,但如果小于1.1,則不能更新分片
例子1:
已有一個32.1m的數(shù)據(jù) 物理分塊是(32m+0.1m)切片分布是(1個切片,因為32.1/32=1.003125<1.1 所以使用一個切片)
例子2:
已有一個100m的數(shù)據(jù)
100-32-32=36>32(36/32=1.125>1.1 所以最后36m需要分配兩個切片)
**塊的大小沒辦法改變,但是可以調(diào)切片大?。╩axSize讓切片調(diào)?。╩inSize讓切片調(diào)大)

切片總結(jié):


(開一個MapTask 默認是占1g內(nèi)存+1個cpu)

1)FileInputFormat實現(xiàn)類
思考:在運行MapReduce程序時,輸入的文件格式包括:基于行的日志文件、二進制格式文件、數(shù)據(jù)庫表等。那么,針對不同的數(shù)據(jù)類型,MapReduce是如何讀取這些數(shù)據(jù)的呢?
FileInputFormat常見的接口實現(xiàn)類包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定義InputFormat等。(應用場景的不同選擇不同的接口實現(xiàn)類)
TextInputFormat是默認的FileInputFormat實現(xiàn)類。按行讀取每條記錄。鍵是存儲該行在整個文件中的起始字節(jié)偏移量, LongWritable類型。值是這行的內(nèi)容,不包括任何行終止符(換行符和回車符),Text類型。
CombineTextInputFormat用于小文件過多的場景,它可以將多個小文件從邏輯上規(guī)劃到一個切片中,這樣,多個小文件就可以交給一個MapTask處理。
進行虛擬存儲
(1)虛擬存儲過程:
將輸入目錄下所有文件大小,依次和設置的setMaxInputSplitSize(切片大小)值比較,如果不大于設置的最大值,邏輯上劃分一個塊。如果輸入文件大于設置的最大值且大于兩倍,那么以最大值切割一塊;當剩余數(shù)據(jù)大小超過設置的最大值且不大于最大值2倍,此時將文件均分成2個虛擬存儲塊(防止出現(xiàn)太小切片)。

測試:
再不使用CombineTextInputFormat情況下(默認TextInputFormat)

可以看到切片為4
添加代碼,設置實現(xiàn)類為CombineTextInputFormat 和 設置虛擬存儲切片大小
// 如果不設置InputFormat,它默認用的是TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class); //虛擬存儲切片最大值設置4m CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

可以看到,現(xiàn)在是3個切片
我們可以通過改變虛擬切片大小來改變調(diào)用的切片的數(shù)量
綜上:影響切片的數(shù)量的因素為:(1)數(shù)據(jù)量的大小(2)切片的大?。ㄒ话銜詣诱{(diào)整)(3)文件格式(有些文件是不可切片的)
影響切片大小的因素: HDFS中塊的大小(通過調(diào)maxsize,minsize與塊的大小進行比較來判斷)
Shuffle階段:
shuffle階段是一個從mapper階段出來的后的階段,會寫入(k,v)一個環(huán)形緩沖區(qū)(緩沖區(qū)分為兩半,一半存儲索引,一半存儲數(shù)據(jù),默認100m,到達80%后會反向逆寫(減少時間消耗,提高效率,逆寫是因為不需要等待全部溢寫后在進行寫入操作)逆寫入文件前會進行分區(qū)(分區(qū)的個數(shù)與reduceTask的個數(shù)有關)排序(對key進行排序,但是存儲位置并不發(fā)生改變,只改變索引的位置,改變存儲位置消耗資源較大))寫入文件后會進行歸并排序(在有序的情況下,歸并是最高效的))
排序:
排序可以自定義排序,舉例全排序:
自定義了一個Bean類,bean對象做為key傳輸,需要實現(xiàn)WritableComparable接口重寫compareTo方法,就可以實現(xiàn)排序。
Combiner合并:
并不滿足所有生產(chǎn)環(huán)境下,只有在不影響最終業(yè)務邏輯下才可以實現(xiàn)(求和就可以,算平均值就不可以)
combiner與reducetask區(qū)別如下:

ReduceTask階段:
(1)Copy階段:ReduceTask從各個MapTask上遠程拷貝一片數(shù)據(jù),并針對某一片數(shù)據(jù),如果其大小超過一定閾值,則寫到磁盤上,否則直接放到內(nèi)存中。
(2)Sort階段:在遠程拷貝數(shù)據(jù)的同時,ReduceTask啟動了兩個后臺線程對內(nèi)存和磁盤上的文件進行合并,以防止內(nèi)存使用過多或磁盤上文件過多。按照MapReduce語義,用戶編寫reduce()函數(shù)輸入數(shù)據(jù)是按key進行聚集的一組數(shù)據(jù)。為了將key相同的數(shù)據(jù)聚在一起,Hadoop采用了基于排序的策略。由于各個MapTask已經(jīng)實現(xiàn)對自己的處理結(jié)果進行了局部排序,因此,ReduceTask只需對所有數(shù)據(jù)進行一次歸并排序即可。
(3)Reduce階段:reduce()函數(shù)將計算結(jié)果寫到HDFS上。
ReduceTask的個數(shù)可以手動進行設置,設置幾就會產(chǎn)生幾個文件(分區(qū)同上)
Reduce Join:
簡述流程:
(1)自定義bean對象(序列化反序列化函數(shù)---implements Writable)
(2)寫mapper類 先重寫setup方法(因為本案例需要兩個文件,初始化(讀多個文 希望先獲取到文件名稱(多文件) 一個文件一個切片 setup方法是一個優(yōu)化手段 獲取文件名稱)
(3)寫reduce類(業(yè)務邏輯) 先創(chuàng)建一個集合(類型為bean類型)和bean對象用于存儲
用for循環(huán)遍歷value(key是一樣的 一樣的key才會進入同一個reduce方法)
獲取文件名判斷寫出不同的業(yè)務邏輯
"order"表:
先創(chuàng)建一個bean對象,用于存儲數(shù)據(jù),用于后續(xù)寫入集合
用到方法 BeanUtils.copyProperties(tmpOrderBean,value); 獲取原數(shù)據(jù)
讓后加入上述創(chuàng)建的集合 orderBeans.add(tmpOrderBean);
“pd”表:
BeanUtils.copyProperties(pdBean,value);直接獲取原數(shù)據(jù)
存儲結(jié)束,結(jié)合階段:
使用增強for
orderbean.setPname(pdBean.getPname());
使用set函數(shù)直接設置集合中的pname
讓后寫入
context.write(orderbean,NullWritable.get());
業(yè)務結(jié)束
Reduce Join的缺點:這種方式中,合并的操作是在Reduce階段完成,Reduce端的處理壓力太大,Map節(jié)點的運算負載則很低,資源利用率不高,且在Reduce階段極易產(chǎn)生數(shù)據(jù)傾斜。
Map Join:
使用場景
Map Join適用于一張表十分小、一張表很大的場景。
Map端實現(xiàn)數(shù)據(jù)合并就解決了Reduce Join的缺點(數(shù)據(jù)傾斜)
簡述流程:
在map類中
setup方法:將較小文件讀入緩存,將數(shù)據(jù)存儲到全局的map集合中,將緩存中的數(shù)據(jù)全部寫入
重寫的map方法中:
轉(zhuǎn)換成字符串在切割,通過切割后的數(shù)組獲取map集合中的pname
讓后重新設置輸出文件的格式進行寫出
到此這篇關于hadoop之MapReduce框架原理的文章就介紹到這了,更多相關MapReduce框架原理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
springboot網(wǎng)站應用使用第三方qq登錄的實現(xiàn)過程
這篇文章主要介紹了springboot網(wǎng)站應用使用第三方qq登錄,本文通過實例圖文相結(jié)合給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-09-09
解決使用RestTemplate時報錯RestClientException的問題
這篇文章主要介紹了解決使用RestTemplate時報錯RestClientException的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08

