hadoop之MapReduce框架原理
MapReduce框架的簡(jiǎn)單運(yùn)行機(jī)制:
MapReduce是分為兩個(gè)階段的,MapperTask階段,和ReduceTask階段。(中間有一個(gè)Shuffle階段)
Mapper階段,可以通過(guò)選擇什么方式(K,V的選擇對(duì)應(yīng)不同的方法)來(lái)讀取數(shù)據(jù),讀取后把數(shù)據(jù)交給Mapper來(lái)進(jìn)行后續(xù)的業(yè)務(wù)邏輯(用戶寫(xiě)),讓后進(jìn)入Reduce階段通過(guò)Shuffle來(lái)拉取Mapper階段的數(shù)據(jù),讓后通過(guò)OutputFormat(等方法)來(lái)寫(xiě)出(可以是ES,mysql,hbase,文件)
Mapper階段:
InputFormat數(shù)據(jù)輸入:
切片與MapTask并行度決定機(jī)制:
MapTask個(gè)數(shù),決定了并行度(相當(dāng)于在生成map集合的過(guò)程中有幾個(gè)人在干活),**(不一定越多越好,當(dāng)數(shù)據(jù)量小的時(shí)候可能開(kāi)啟的眾多MapTask的時(shí)間用一個(gè)MapTask已經(jīng)計(jì)算完成)
數(shù)據(jù)塊:Block是HDFS物理上把數(shù)據(jù)分成一塊一塊。數(shù)據(jù)塊是HDFS存儲(chǔ)數(shù)據(jù)單位。
數(shù)據(jù)切片:數(shù)據(jù)切片只是在邏輯上對(duì)輸入進(jìn)行分片,并不會(huì)在磁盤(pán)上將其切分成片進(jìn)行存儲(chǔ)。數(shù)據(jù)切片是MapReduce程序計(jì)算輸入數(shù)據(jù)的單位,一個(gè)切片會(huì)對(duì)應(yīng)啟動(dòng)一個(gè)MapTask。
job提交過(guò)程源碼解析:
因?yàn)槲覀冋业膉ob提交,所以在job提交函數(shù)哪里打個(gè)斷點(diǎn),
步入函數(shù)后
ensureState(JobState.DEFINE); 是確保你的狀態(tài)是正確的(狀態(tài)不對(duì)或者running 都會(huì)拋異常)
setUseNewAPI(); 處理Hadoop不同版本之間的API兼容
connect(); 連接,(客戶端需要與集群或者本機(jī)連接)
checkSpecs(job); 校驗(yàn) 校驗(yàn)輸出路徑是否已經(jīng)創(chuàng)建,是否有參
return submitter.submitJobInternal(Job.this, cluster); 核心代碼 步入的時(shí)候需要點(diǎn)兩下,
第一個(gè)步入是步入的參數(shù)Job 第二個(gè)才步入此方法
這個(gè)方法是提交job(在集群模式下,提交的job包含(通過(guò)客戶端方式把jar包提交給集群),在本地不需要提交jar包,jar在本地是存在的)
還會(huì)進(jìn)行切片,生成切片信息(幾個(gè)切片就有幾個(gè)MapTask)
還會(huì) 生成xml文件
綜上 job提交會(huì)交三樣?xùn)|西(jar,xml文件,切片信息---》集群模式下)
最后會(huì)刪除所有的信息文件
切片邏輯:
**(切片是每一個(gè)文件單獨(dú)切片)
在本地是32m一塊,前邊說(shuō)過(guò),默認(rèn)一塊對(duì)應(yīng)一個(gè)切片,但是有前提條件,再你減去32m的時(shí)候,余下最后一塊如果大于1.1倍就重新分配切片,但如果小于1.1,則不能更新分片
例子1:
已有一個(gè)32.1m的數(shù)據(jù) 物理分塊是(32m+0.1m)切片分布是(1個(gè)切片,因?yàn)?2.1/32=1.003125<1.1 所以使用一個(gè)切片)
例子2:
已有一個(gè)100m的數(shù)據(jù)
100-32-32=36>32(36/32=1.125>1.1 所以最后36m需要分配兩個(gè)切片)
**塊的大小沒(méi)辦法改變,但是可以調(diào)切片大?。╩axSize讓切片調(diào)?。╩inSize讓切片調(diào)大)
切片總結(jié):
(開(kāi)一個(gè)MapTask 默認(rèn)是占1g內(nèi)存+1個(gè)cpu)
1)FileInputFormat實(shí)現(xiàn)類
思考:在運(yùn)行MapReduce程序時(shí),輸入的文件格式包括:基于行的日志文件、二進(jìn)制格式文件、數(shù)據(jù)庫(kù)表等。那么,針對(duì)不同的數(shù)據(jù)類型,MapReduce是如何讀取這些數(shù)據(jù)的呢?
FileInputFormat常見(jiàn)的接口實(shí)現(xiàn)類包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定義InputFormat等。(應(yīng)用場(chǎng)景的不同選擇不同的接口實(shí)現(xiàn)類)
TextInputFormat是默認(rèn)的FileInputFormat實(shí)現(xiàn)類。按行讀取每條記錄。鍵是存儲(chǔ)該行在整個(gè)文件中的起始字節(jié)偏移量, LongWritable類型。值是這行的內(nèi)容,不包括任何行終止符(換行符和回車符),Text類型。
CombineTextInputFormat用于小文件過(guò)多的場(chǎng)景,它可以將多個(gè)小文件從邏輯上規(guī)劃到一個(gè)切片中,這樣,多個(gè)小文件就可以交給一個(gè)MapTask處理。
進(jìn)行虛擬存儲(chǔ)
(1)虛擬存儲(chǔ)過(guò)程:
將輸入目錄下所有文件大小,依次和設(shè)置的setMaxInputSplitSize(切片大小)值比較,如果不大于設(shè)置的最大值,邏輯上劃分一個(gè)塊。如果輸入文件大于設(shè)置的最大值且大于兩倍,那么以最大值切割一塊;當(dāng)剩余數(shù)據(jù)大小超過(guò)設(shè)置的最大值且不大于最大值2倍,此時(shí)將文件均分成2個(gè)虛擬存儲(chǔ)塊(防止出現(xiàn)太小切片)。
測(cè)試:
再不使用CombineTextInputFormat情況下(默認(rèn)TextInputFormat)
可以看到切片為4
添加代碼,設(shè)置實(shí)現(xiàn)類為CombineTextInputFormat 和 設(shè)置虛擬存儲(chǔ)切片大小
// 如果不設(shè)置InputFormat,它默認(rèn)用的是TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class); //虛擬存儲(chǔ)切片最大值設(shè)置4m CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
可以看到,現(xiàn)在是3個(gè)切片
我們可以通過(guò)改變虛擬切片大小來(lái)改變調(diào)用的切片的數(shù)量
綜上:影響切片的數(shù)量的因素為:(1)數(shù)據(jù)量的大?。?)切片的大小(一般會(huì)自動(dòng)調(diào)整)(3)文件格式(有些文件是不可切片的)
影響切片大小的因素: HDFS中塊的大?。ㄍㄟ^(guò)調(diào)maxsize,minsize與塊的大小進(jìn)行比較來(lái)判斷)
Shuffle階段:
shuffle階段是一個(gè)從mapper階段出來(lái)的后的階段,會(huì)寫(xiě)入(k,v)一個(gè)環(huán)形緩沖區(qū)(緩沖區(qū)分為兩半,一半存儲(chǔ)索引,一半存儲(chǔ)數(shù)據(jù),默認(rèn)100m,到達(dá)80%后會(huì)反向逆寫(xiě)(減少時(shí)間消耗,提高效率,逆寫(xiě)是因?yàn)椴恍枰却恳鐚?xiě)后在進(jìn)行寫(xiě)入操作)逆寫(xiě)入文件前會(huì)進(jìn)行分區(qū)(分區(qū)的個(gè)數(shù)與reduceTask的個(gè)數(shù)有關(guān))排序(對(duì)key進(jìn)行排序,但是存儲(chǔ)位置并不發(fā)生改變,只改變索引的位置,改變存儲(chǔ)位置消耗資源較大))寫(xiě)入文件后會(huì)進(jìn)行歸并排序(在有序的情況下,歸并是最高效的))
排序:
排序可以自定義排序,舉例全排序:
自定義了一個(gè)Bean類,bean對(duì)象做為key傳輸,需要實(shí)現(xiàn)WritableComparable接口重寫(xiě)compareTo方法,就可以實(shí)現(xiàn)排序。
Combiner合并:
并不滿足所有生產(chǎn)環(huán)境下,只有在不影響最終業(yè)務(wù)邏輯下才可以實(shí)現(xiàn)(求和就可以,算平均值就不可以)
combiner與reducetask區(qū)別如下:
ReduceTask階段:
(1)Copy階段:ReduceTask從各個(gè)MapTask上遠(yuǎn)程拷貝一片數(shù)據(jù),并針對(duì)某一片數(shù)據(jù),如果其大小超過(guò)一定閾值,則寫(xiě)到磁盤(pán)上,否則直接放到內(nèi)存中。
(2)Sort階段:在遠(yuǎn)程拷貝數(shù)據(jù)的同時(shí),ReduceTask啟動(dòng)了兩個(gè)后臺(tái)線程對(duì)內(nèi)存和磁盤(pán)上的文件進(jìn)行合并,以防止內(nèi)存使用過(guò)多或磁盤(pán)上文件過(guò)多。按照MapReduce語(yǔ)義,用戶編寫(xiě)reduce()函數(shù)輸入數(shù)據(jù)是按key進(jìn)行聚集的一組數(shù)據(jù)。為了將key相同的數(shù)據(jù)聚在一起,Hadoop采用了基于排序的策略。由于各個(gè)MapTask已經(jīng)實(shí)現(xiàn)對(duì)自己的處理結(jié)果進(jìn)行了局部排序,因此,ReduceTask只需對(duì)所有數(shù)據(jù)進(jìn)行一次歸并排序即可。
(3)Reduce階段:reduce()函數(shù)將計(jì)算結(jié)果寫(xiě)到HDFS上。
ReduceTask的個(gè)數(shù)可以手動(dòng)進(jìn)行設(shè)置,設(shè)置幾就會(huì)產(chǎn)生幾個(gè)文件(分區(qū)同上)
Reduce Join:
簡(jiǎn)述流程:
(1)自定義bean對(duì)象(序列化反序列化函數(shù)---implements Writable)
(2)寫(xiě)mapper類 先重寫(xiě)setup方法(因?yàn)楸景咐枰獌蓚€(gè)文件,初始化(讀多個(gè)文 希望先獲取到文件名稱(多文件) 一個(gè)文件一個(gè)切片 setup方法是一個(gè)優(yōu)化手段 獲取文件名稱)
(3)寫(xiě)reduce類(業(yè)務(wù)邏輯) 先創(chuàng)建一個(gè)集合(類型為bean類型)和bean對(duì)象用于存儲(chǔ)
用for循環(huán)遍歷value(key是一樣的 一樣的key才會(huì)進(jìn)入同一個(gè)reduce方法)
獲取文件名判斷寫(xiě)出不同的業(yè)務(wù)邏輯
"order"表:
先創(chuàng)建一個(gè)bean對(duì)象,用于存儲(chǔ)數(shù)據(jù),用于后續(xù)寫(xiě)入集合
用到方法 BeanUtils.copyProperties(tmpOrderBean,value); 獲取原數(shù)據(jù)
讓后加入上述創(chuàng)建的集合 orderBeans.add(tmpOrderBean);
“pd”表:
BeanUtils.copyProperties(pdBean,value);直接獲取原數(shù)據(jù)
存儲(chǔ)結(jié)束,結(jié)合階段:
使用增強(qiáng)for
orderbean.setPname(pdBean.getPname());
使用set函數(shù)直接設(shè)置集合中的pname
讓后寫(xiě)入
context.write(orderbean,NullWritable.get());
業(yè)務(wù)結(jié)束
Reduce Join的缺點(diǎn):這種方式中,合并的操作是在Reduce階段完成,Reduce端的處理壓力太大,Map節(jié)點(diǎn)的運(yùn)算負(fù)載則很低,資源利用率不高,且在Reduce階段極易產(chǎn)生數(shù)據(jù)傾斜。
Map Join:
使用場(chǎng)景
Map Join適用于一張表十分小、一張表很大的場(chǎng)景。
Map端實(shí)現(xiàn)數(shù)據(jù)合并就解決了Reduce Join的缺點(diǎn)(數(shù)據(jù)傾斜)
簡(jiǎn)述流程:
在map類中
setup方法:將較小文件讀入緩存,將數(shù)據(jù)存儲(chǔ)到全局的map集合中,將緩存中的數(shù)據(jù)全部寫(xiě)入
重寫(xiě)的map方法中:
轉(zhuǎn)換成字符串在切割,通過(guò)切割后的數(shù)組獲取map集合中的pname
讓后重新設(shè)置輸出文件的格式進(jìn)行寫(xiě)出
到此這篇關(guān)于hadoop之MapReduce框架原理的文章就介紹到這了,更多相關(guān)MapReduce框架原理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot網(wǎng)站應(yīng)用使用第三方qq登錄的實(shí)現(xiàn)過(guò)程
這篇文章主要介紹了springboot網(wǎng)站應(yīng)用使用第三方qq登錄,本文通過(guò)實(shí)例圖文相結(jié)合給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-09-09解決使用RestTemplate時(shí)報(bào)錯(cuò)RestClientException的問(wèn)題
這篇文章主要介紹了解決使用RestTemplate時(shí)報(bào)錯(cuò)RestClientException的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08SpringBoot中驗(yàn)證用戶上傳的圖片資源的方法
這篇文章主要介紹了在SpringBoot中驗(yàn)證用戶上傳的圖片資源,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-09-09mybatis引用其他文件中的sql片段的實(shí)現(xiàn)
Mybatis中也是支持引用其他Mapper文件中的SQL片段的,本文就來(lái)介紹一下如何使用,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-03-03學(xué)生視角手把手帶你寫(xiě)Java?線程池改良版
作者是一個(gè)來(lái)自河源的大三在校生,以下筆記都是作者自學(xué)之路的一些淺薄經(jīng)驗(yàn),如有錯(cuò)誤請(qǐng)指正,將來(lái)會(huì)不斷的完善筆記,幫助更多的Java愛(ài)好者入門(mén)2022-03-03