Java大數(shù)據(jù)處理的核心技術(shù)MapReduce框架
MapReduce框架
1、框架圖
Input→Mapper→shuffle→Reducer→Output
2、Input數(shù)據(jù)輸入
2.1概念
(1)數(shù)據(jù)塊(Block),物理存儲,Block是HDFS物理上把文件分成一塊一塊。數(shù)據(jù)塊是HDFS存儲數(shù)據(jù)單位。
(2)數(shù)據(jù)切片,邏輯存儲,數(shù)據(jù)切片是MapReduce程序j最小計算輸入數(shù)據(jù)的單位。一個切片會啟動一個MapTask
2.2數(shù)據(jù)切片與MapTask并行度
(1)一個Job的Map階段并行度由客戶端在提交job時的切片數(shù)決定;
(2)每一個split切片分配一個MapTask并行實例片
(3)切片是針對每一個文件單獨切片
(4)默認(rèn)情況下,切片大小等于Block Size塊大小
MapTask數(shù)據(jù)=輸入文件切片數(shù)據(jù)
2.3切片過程
(1)程序先找到數(shù)據(jù)存儲目錄
(2)開始遍歷處理目錄下的每一個文件
A、按每個文件進(jìn)行切片
B、判斷文件是否可以切片(snappy、Gzip壓縮不能切)
(3)遍歷第一個文件
獲取文件大小→計算切片大小→開始切片→將切片信息寫入切片規(guī)劃文件中→提交切片規(guī)劃文件到y(tǒng)arn
A、獲取文件大?。篺s.size(文件)
B、計算切片大?。涸O(shè)置minsize、maxsize、blocksize
mapreduce.input.fileinputformat.split.minsize=1 默認(rèn)值為1 mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue 默認(rèn)值Long.MAXValue
計算公式 :computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))
最大取最小,最小取最大。因此切片大小默認(rèn)與 HDFS 的 block 保持一致。
maxsize(切片最大值): 參數(shù)如果調(diào)到比 blocksize 小,則會讓切片變小,而且就等于配置的這個參數(shù)的值。minsize(切片最小值): 參數(shù)調(diào)的比 blockSize 大,則可以讓切片變得比 blocksize 還大。
C、開始切片:getSplit()
每次切片時,都要判斷剩下的是否大于塊的1.1倍,不大于1.1倍就切分成一塊切片
D、將切片信息寫入切片規(guī)劃文件中:job.split
記錄起始位置、長度、所在切點列表等
E、提交切片規(guī)劃文件到y(tǒng)arn
yarn上MRAppMaster根據(jù)切片規(guī)劃計算MapTask數(shù)
三個文件:切片規(guī)則文件(job.split)、參數(shù)配置文件(job.xml)、程序jar包
2.4類圖
2.5TextInputFormat
(1)是FileInputFormat默認(rèn)的實現(xiàn)類
(2)按行讀取每條記錄,Key為該行在整個文件的超始字節(jié)偏移量,LongWritable型。Value為行內(nèi)容,不包括任何終止符(換行符、回車符),Text型。
2.6CombineTextInputFormat
(1)應(yīng)用場景:用于小文件過多的場景,將多個小文件從邏輯上規(guī)劃到一個切片中, 這樣多個小文件交給一個MapTask處理;
(2)虛擬存儲切片最大值默認(rèn)4M,最好根據(jù)實際的小文件大小來設(shè)置
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
2.7Read階段
MapTask通過InputFormat獲得RecordReader,從輸入InputSplit中解析出一個個Key/Value
3、Map階段
將解析出來的Key/Value交給用戶編寫的map()函數(shù)處理,并產(chǎn)生一系列新的Key/Value
4、Collect收集階段
(1)map()函數(shù)中,當(dāng)數(shù)據(jù)處理完成后,一般會調(diào)用OutputCollector.collect()輸出結(jié)果
(2)在該函數(shù)內(nèi)部,它會將生成的Key/Value分區(qū)(調(diào)用Partitioner),并寫入一個環(huán)形內(nèi)存緩沖區(qū)中
5、Shuffle階段
(1)map方法之后 ,reduce方法之前的數(shù)據(jù)處理過程稱之為Shuffle;
(2)環(huán)形內(nèi)存緩沖區(qū)
(3)Partition分區(qū)-默認(rèn)分區(qū)
A、根據(jù)需求按照條件輸出到不同分區(qū)
B、默認(rèn)分區(qū):根據(jù)key的hashcode對ReduceTask數(shù)理取模
C、默認(rèn)的ReduceTask的數(shù)量為1,對應(yīng)參數(shù)mapreduce.job.reduces
(4)Partition分區(qū)-自定義Partitioner
A、自定義類繼承Partitioner<key,value>,重寫getPartition()方法
public class ProvincePartitioner extends Partitioner<Text, FlowBean> { @Override public int getPartition(Text text, FlowBean flowBean, int i) { // 獲取手機(jī)號前三位prePhone String phone = text.toString(); String prePhone = phone.substring(0, 3); //定義一個分區(qū)號變量partition,根據(jù)prePhone 設(shè)置分區(qū)號 int partition; if ("136".equals(prePhone)) { partition = 0; } else if ("137".equals(prePhone)) { partition = 1; } else if ("138".equals(prePhone)) { partition = 2; } else if ("139".equals(prePhone)) { partition = 3; } else { partition = 4; } //最后返回分區(qū)號partition return partition; } }
B、在job驅(qū)動中,設(shè)置自定義partitioner,job.setPartitionerClass(自定義分區(qū)類.class)
C、自定義Partition后,要根據(jù)自定義Partitioner的邏輯設(shè)置相應(yīng)的數(shù)量的ReduceTask:job.setNumReduceTasks(數(shù)量)
public class FlowDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1.獲取job對象 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2.關(guān)聯(lián)本Driver類 job.setJarByClass(FlowDriver.class); // 3.關(guān)聯(lián)Mapper和Reducer類 job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReducer.class); // 4.設(shè)置Map端輸出KV類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); // 5.設(shè)置最終輸出KV類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 6.設(shè)置程序的輸入和輸出路徑 FileInputFormat.setInputPaths(job, new Path("C:\\install\\temp\\input\\input02\\phone_data.txt")); FileOutputFormat.setOutputPath(job, new Path("C:\\install\\temp\\output\\output06")); // 8.指定自定義分區(qū)器 job.setPartitionerClass(ProvincePartitioner.class); // 9.同時也指定相應(yīng)數(shù)量的ReduceTask--對應(yīng)的參數(shù)mapreduce.job.reduces,默認(rèn)為1 job.setNumReduceTasks(5); // 7.提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
(5)Partition分區(qū)總結(jié)
A、如果ReduceTask數(shù)量 > getPartition()結(jié)果數(shù),則會多產(chǎn)生幾個空的輸出文件
B、如果 1 <ReduceTask數(shù)量 <getPartition()結(jié)果數(shù),則有一部分分區(qū)數(shù)據(jù)無處安放,會異常
C、如果ReduceTask數(shù)量=1,則不管MapTask輸出多少個分區(qū)文件,最終結(jié)果只有一個ReduceTask,只會產(chǎn)生一個結(jié)果文件。(分區(qū)數(shù)不大于1,不會走默認(rèn)hash分區(qū)器和自定義分區(qū)器,直接返回)
D、分區(qū)號必須從0開始,逐一累加
(6)排序
A、排序是MapReduce框架中最重要的操作之一
B、MapTask和ReduceTask均會對數(shù)據(jù)按key進(jìn)行排序,該 操作屬于Hadoop的默認(rèn)行為 。任務(wù)應(yīng)用程序中的數(shù)據(jù)均會被排序,而不管邏輯上是否需要。
C、默認(rèn)排序是按照字典順序排序,排序的方法為快速排序
D、排序分類:部分排序、全排序、輔助排序、二次排序
(7)溢寫
A、當(dāng)環(huán)形緩沖區(qū)滿后,MapReduce會將數(shù)據(jù)寫到本地磁盤上,生成一個臨時文件
(8)Combiner
A、Combiner是MR程序中Mapper和Reducer之外的一種組件
B、Combiner的父類是Reducer
C、Combiner與Reducer區(qū)別:在于運行的位置 ,Combiner是在每一個MapTask所在節(jié)點運行,即在分區(qū)、排序后準(zhǔn)備溢寫前可以進(jìn)行combiner。Reducer是接收全局所有MapTask輸出結(jié)果。
D、Combiner的意義是對每一個MapTask的輸出進(jìn)行局部匯總,以減少網(wǎng)絡(luò)傳輸量
E、Combiner應(yīng)用前提是不影響最終的業(yè)務(wù)邏輯
public class WordCountCombiner extends Reducer<Text, IntWritable, Text,IntWritable> { private IntWritable outV = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } outV.set(sum); context.write(key,outV); } }
public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 一獲取二關(guān)聯(lián)三設(shè)置一提交 // 1.獲取配置信息及Job對象 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2.關(guān)聯(lián)本Driver程序的類 job.setJarByClass(WordCountDriver.class); // 3.關(guān)聯(lián)Mapper和Reducer的業(yè)務(wù)類 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 4.設(shè)置Mapper輸出的KV類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5.設(shè)置最終輸出的KV類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 6.設(shè)置輸入和輸出路徑 //FileInputFormat.setInputPaths(job, new Path(args[0])); //FileOutputFormat.setOutputPath(job, new Path(args[1])); FileInputFormat.setInputPaths(job, new Path("C:\\install\\temp\\input\\hadoop.txt")); FileOutputFormat.setOutputPath(job, new Path("C:\\install\\temp\\output\\output01-2")); // 8.設(shè)置Combiner類--方式一 //job.setCombinerClass(WordCountCombiner.class); // 方式二:其新建的WordCountCombiner的reduce方法處理與正常的WordCountReducer中的reduce方法處理邏輯是一樣 // 因此可以直接用此類作為combiner類 job.setCombinerClass(WordCountReducer.class); // 9.設(shè)置ReduceTasks的數(shù)量--這樣就沒有reduce階段,就不會有shuffle,Combiner也就沒有用,直接由map輸出, // 文件名為part-m-00000,就是不part-r-00000,兩者結(jié)果是不一樣的 // 即如果沒有reduce階段,即使設(shè)置了combiner也不起作用 // job.setNumReduceTasks(0); // 7.提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); // 0-正常退出 非0(1)異常終止(結(jié)束) } }
(9)Meger
A、MapTask以分區(qū)為單位進(jìn)行合并,對所有臨時文件合并成一個大文件(output/file.out),同時生成相應(yīng)索引文件(output/file.out.index)
B、對某個分區(qū)采用多輪遞歸合并的方式,每次合并默認(rèn)10個文件,每個MapTask最終得到一個大文件
6、ReduceTask
(1)Copy階段
ReduceTask從各個MapTask上遠(yuǎn)程拷貝一片數(shù)據(jù),如大小超過閥值,則寫到磁盤上,否則直接放在內(nèi)存中
(2)Sort階段
由于各個MapTask已經(jīng)實現(xiàn)了對自己處理結(jié)果進(jìn)行了局部排序,因此ReduceTask只需要對所有數(shù)據(jù)進(jìn)行一次歸并排序即可
(3)Reducer階段
reduce()函數(shù)將計算結(jié)果寫到HDFS上
(4)其他
A、ReduceTask數(shù)量默認(rèn)是1,可手動設(shè)置job.setNumReduceTasks(數(shù)量)
B、ReduceTask=0,表示沒有reduce階段,輸出文件個數(shù)和Map個數(shù)一致
C、如果數(shù)據(jù)分布不均勻,就會在reduce階段產(chǎn)生數(shù)據(jù)傾斜
D、ReduceTask數(shù)量并不能任意設(shè)置,要考慮業(yè)務(wù)邏輯需求,具體多少個ReduceTask,需要根據(jù)集群性能確定
E、如果分區(qū)數(shù)不是1,但ReduceTask為1,不執(zhí)行分區(qū)過程(執(zhí)行分區(qū)的前提是判斷ReduceNum個數(shù)是否大于1)
到此這篇關(guān)于Java大數(shù)據(jù)處理的核心技術(shù)MapReduce框架的文章就介紹到這了,更多相關(guān)Java MapReduce框架內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot中報錯Invalid character found in
這篇文章主要介紹了springboot中報錯Invalid character found in the request的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-09-09Java線程并發(fā)中常見的鎖機(jī)制詳細(xì)介紹
越來越多的互聯(lián)網(wǎng)企業(yè)面臨著用戶量膨脹而帶來的并發(fā)安全問題。接下來通過本文給大家介紹Java線程并發(fā)中常見的鎖機(jī)制,感興趣的朋友一起看看吧2016-05-05Mybatis-Plus實現(xiàn)自定義SQL具體方法
Mybatis-Plus是Mybatis的一個增強工具,它可以優(yōu)化我們的開發(fā)效率,這篇文章主要介紹了Mybatis-Plus實現(xiàn)自定義SQL,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-08-08Springboot整合MongoDB的Docker開發(fā)教程全解
這篇文章主要介紹了Springboot整合MongoDB的Docker開發(fā),本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值 ,需要的朋友可以參考下2020-07-07