欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java大數(shù)據(jù)處理的核心技術(shù)MapReduce框架

 更新時間:2023年05月09日 09:25:43   作者:mxh_wishes  
MapReduce是一種分布式計算框架,適用于大規(guī)模的數(shù)據(jù)處理。它將大數(shù)據(jù)分成多個小數(shù)據(jù)塊,通過Map和Reduce兩個階段對數(shù)據(jù)進(jìn)行處理和分析。MapReduce框架具有可靠、高效、可擴(kuò)展等特點,已經(jīng)成為大數(shù)據(jù)處理的核心技術(shù)

MapReduce框架

1、框架圖

Input→Mapper→shuffle→Reducer→Output

2、Input數(shù)據(jù)輸入

2.1概念

(1)數(shù)據(jù)塊(Block),物理存儲,Block是HDFS物理上把文件分成一塊一塊。數(shù)據(jù)塊是HDFS存儲數(shù)據(jù)單位。

(2)數(shù)據(jù)切片,邏輯存儲,數(shù)據(jù)切片是MapReduce程序j最小計算輸入數(shù)據(jù)的單位。一個切片會啟動一個MapTask

2.2數(shù)據(jù)切片與MapTask并行度

(1)一個Job的Map階段并行度由客戶端在提交job時的切片數(shù)決定;

(2)每一個split切片分配一個MapTask并行實例片

(3)切片是針對每一個文件單獨切片

(4)默認(rèn)情況下,切片大小等于Block Size塊大小

MapTask數(shù)據(jù)=輸入文件切片數(shù)據(jù)

2.3切片過程

(1)程序先找到數(shù)據(jù)存儲目錄

(2)開始遍歷處理目錄下的每一個文件

A、按每個文件進(jìn)行切片

B、判斷文件是否可以切片(snappy、Gzip壓縮不能切)

(3)遍歷第一個文件

獲取文件大小→計算切片大小→開始切片→將切片信息寫入切片規(guī)劃文件中→提交切片規(guī)劃文件到y(tǒng)arn

A、獲取文件大?。篺s.size(文件)

B、計算切片大?。涸O(shè)置minsize、maxsize、blocksize

mapreduce.input.fileinputformat.split.minsize=1 默認(rèn)值為1
mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue 默認(rèn)值Long.MAXValue

計算公式 :computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))

最大取最小,最小取最大。因此切片大小默認(rèn)與 HDFS 的 block 保持一致。

maxsize(切片最大值): 參數(shù)如果調(diào)到比 blocksize 小,則會讓切片變小,而且就等于配置的這個參數(shù)的值。minsize(切片最小值): 參數(shù)調(diào)的比 blockSize 大,則可以讓切片變得比 blocksize 還大。

C、開始切片:getSplit()

每次切片時,都要判斷剩下的是否大于塊的1.1倍,不大于1.1倍就切分成一塊切片

D、將切片信息寫入切片規(guī)劃文件中:job.split

記錄起始位置、長度、所在切點列表等

E、提交切片規(guī)劃文件到y(tǒng)arn

yarn上MRAppMaster根據(jù)切片規(guī)劃計算MapTask數(shù)

三個文件:切片規(guī)則文件(job.split)、參數(shù)配置文件(job.xml)、程序jar包

2.4類圖

2.5TextInputFormat

(1)是FileInputFormat默認(rèn)的實現(xiàn)類

(2)按行讀取每條記錄,Key為該行在整個文件的超始字節(jié)偏移量,LongWritable型。Value為行內(nèi)容,不包括任何終止符(換行符、回車符),Text型。

2.6CombineTextInputFormat

(1)應(yīng)用場景:用于小文件過多的場景,將多個小文件從邏輯上規(guī)劃到一個切片中, 這樣多個小文件交給一個MapTask處理;

(2)虛擬存儲切片最大值默認(rèn)4M,最好根據(jù)實際的小文件大小來設(shè)置

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

2.7Read階段

MapTask通過InputFormat獲得RecordReader,從輸入InputSplit中解析出一個個Key/Value

3、Map階段

將解析出來的Key/Value交給用戶編寫的map()函數(shù)處理,并產(chǎn)生一系列新的Key/Value

4、Collect收集階段

(1)map()函數(shù)中,當(dāng)數(shù)據(jù)處理完成后,一般會調(diào)用OutputCollector.collect()輸出結(jié)果

(2)在該函數(shù)內(nèi)部,它會將生成的Key/Value分區(qū)(調(diào)用Partitioner),并寫入一個環(huán)形內(nèi)存緩沖區(qū)中

5、Shuffle階段

(1)map方法之后 ,reduce方法之前的數(shù)據(jù)處理過程稱之為Shuffle;

(2)環(huán)形內(nèi)存緩沖區(qū)

(3)Partition分區(qū)-默認(rèn)分區(qū)

A、根據(jù)需求按照條件輸出到不同分區(qū)

B、默認(rèn)分區(qū):根據(jù)key的hashcode對ReduceTask數(shù)理取模

C、默認(rèn)的ReduceTask的數(shù)量為1,對應(yīng)參數(shù)mapreduce.job.reduces

(4)Partition分區(qū)-自定義Partitioner

A、自定義類繼承Partitioner<key,value>,重寫getPartition()方法

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text text, FlowBean flowBean, int i) {
        // 獲取手機(jī)號前三位prePhone
        String phone = text.toString();
        String prePhone = phone.substring(0, 3);
        //定義一個分區(qū)號變量partition,根據(jù)prePhone 設(shè)置分區(qū)號
        int partition;
        if ("136".equals(prePhone)) {
            partition = 0;
        } else if ("137".equals(prePhone)) {
            partition = 1;
        } else if ("138".equals(prePhone)) {
            partition = 2;
        } else if ("139".equals(prePhone)) {
            partition = 3;
        } else {
            partition = 4;
        }
        //最后返回分區(qū)號partition
        return partition;
    }
}

B、在job驅(qū)動中,設(shè)置自定義partitioner,job.setPartitionerClass(自定義分區(qū)類.class)

C、自定義Partition后,要根據(jù)自定義Partitioner的邏輯設(shè)置相應(yīng)的數(shù)量的ReduceTask:job.setNumReduceTasks(數(shù)量)

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1.獲取job對象
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 2.關(guān)聯(lián)本Driver類
        job.setJarByClass(FlowDriver.class);
        // 3.關(guān)聯(lián)Mapper和Reducer類
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        // 4.設(shè)置Map端輸出KV類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 5.設(shè)置最終輸出KV類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 6.設(shè)置程序的輸入和輸出路徑
        FileInputFormat.setInputPaths(job, new Path("C:\\install\\temp\\input\\input02\\phone_data.txt"));
        FileOutputFormat.setOutputPath(job, new Path("C:\\install\\temp\\output\\output06"));
        // 8.指定自定義分區(qū)器
        job.setPartitionerClass(ProvincePartitioner.class);
        // 9.同時也指定相應(yīng)數(shù)量的ReduceTask--對應(yīng)的參數(shù)mapreduce.job.reduces,默認(rèn)為1
        job.setNumReduceTasks(5);
        // 7.提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

(5)Partition分區(qū)總結(jié)

A、如果ReduceTask數(shù)量 > getPartition()結(jié)果數(shù),則會多產(chǎn)生幾個空的輸出文件

B、如果 1 <ReduceTask數(shù)量 <getPartition()結(jié)果數(shù),則有一部分分區(qū)數(shù)據(jù)無處安放,會異常

C、如果ReduceTask數(shù)量=1,則不管MapTask輸出多少個分區(qū)文件,最終結(jié)果只有一個ReduceTask,只會產(chǎn)生一個結(jié)果文件。(分區(qū)數(shù)不大于1,不會走默認(rèn)hash分區(qū)器和自定義分區(qū)器,直接返回)

D、分區(qū)號必須從0開始,逐一累加

(6)排序

A、排序是MapReduce框架中最重要的操作之一

B、MapTask和ReduceTask均會對數(shù)據(jù)按key進(jìn)行排序,該 操作屬于Hadoop的默認(rèn)行為 。任務(wù)應(yīng)用程序中的數(shù)據(jù)均會被排序,而不管邏輯上是否需要。

C、默認(rèn)排序是按照字典順序排序,排序的方法為快速排序

D、排序分類:部分排序、全排序、輔助排序、二次排序

(7)溢寫

A、當(dāng)環(huán)形緩沖區(qū)滿后,MapReduce會將數(shù)據(jù)寫到本地磁盤上,生成一個臨時文件

(8)Combiner

A、Combiner是MR程序中Mapper和Reducer之外的一種組件

B、Combiner的父類是Reducer

C、Combiner與Reducer區(qū)別:在于運行的位置 ,Combiner是在每一個MapTask所在節(jié)點運行,即在分區(qū)、排序后準(zhǔn)備溢寫前可以進(jìn)行combiner。Reducer是接收全局所有MapTask輸出結(jié)果。

D、Combiner的意義是對每一個MapTask的輸出進(jìn)行局部匯總,以減少網(wǎng)絡(luò)傳輸量

E、Combiner應(yīng)用前提是不影響最終的業(yè)務(wù)邏輯

public class WordCountCombiner extends Reducer<Text, IntWritable, Text,IntWritable> {
    private IntWritable outV = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        context.write(key,outV);
    }
}
public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 一獲取二關(guān)聯(lián)三設(shè)置一提交
        // 1.獲取配置信息及Job對象
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 2.關(guān)聯(lián)本Driver程序的類
        job.setJarByClass(WordCountDriver.class);
        // 3.關(guān)聯(lián)Mapper和Reducer的業(yè)務(wù)類
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4.設(shè)置Mapper輸出的KV類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5.設(shè)置最終輸出的KV類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6.設(shè)置輸入和輸出路徑
        //FileInputFormat.setInputPaths(job, new Path(args[0]));
        //FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileInputFormat.setInputPaths(job, new Path("C:\\install\\temp\\input\\hadoop.txt"));
        FileOutputFormat.setOutputPath(job, new Path("C:\\install\\temp\\output\\output01-2"));
        // 8.設(shè)置Combiner類--方式一
        //job.setCombinerClass(WordCountCombiner.class);
        // 方式二:其新建的WordCountCombiner的reduce方法處理與正常的WordCountReducer中的reduce方法處理邏輯是一樣
        // 因此可以直接用此類作為combiner類
        job.setCombinerClass(WordCountReducer.class);
        // 9.設(shè)置ReduceTasks的數(shù)量--這樣就沒有reduce階段,就不會有shuffle,Combiner也就沒有用,直接由map輸出,
        // 文件名為part-m-00000,就是不part-r-00000,兩者結(jié)果是不一樣的
        // 即如果沒有reduce階段,即使設(shè)置了combiner也不起作用
        // job.setNumReduceTasks(0);
        // 7.提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1); // 0-正常退出 非0(1)異常終止(結(jié)束)
    }
}

(9)Meger

A、MapTask以分區(qū)為單位進(jìn)行合并,對所有臨時文件合并成一個大文件(output/file.out),同時生成相應(yīng)索引文件(output/file.out.index)

B、對某個分區(qū)采用多輪遞歸合并的方式,每次合并默認(rèn)10個文件,每個MapTask最終得到一個大文件

6、ReduceTask

(1)Copy階段

ReduceTask從各個MapTask上遠(yuǎn)程拷貝一片數(shù)據(jù),如大小超過閥值,則寫到磁盤上,否則直接放在內(nèi)存中

(2)Sort階段

由于各個MapTask已經(jīng)實現(xiàn)了對自己處理結(jié)果進(jìn)行了局部排序,因此ReduceTask只需要對所有數(shù)據(jù)進(jìn)行一次歸并排序即可

(3)Reducer階段

reduce()函數(shù)將計算結(jié)果寫到HDFS上

(4)其他

A、ReduceTask數(shù)量默認(rèn)是1,可手動設(shè)置job.setNumReduceTasks(數(shù)量)

B、ReduceTask=0,表示沒有reduce階段,輸出文件個數(shù)和Map個數(shù)一致

C、如果數(shù)據(jù)分布不均勻,就會在reduce階段產(chǎn)生數(shù)據(jù)傾斜

D、ReduceTask數(shù)量并不能任意設(shè)置,要考慮業(yè)務(wù)邏輯需求,具體多少個ReduceTask,需要根據(jù)集群性能確定

E、如果分區(qū)數(shù)不是1,但ReduceTask為1,不執(zhí)行分區(qū)過程(執(zhí)行分區(qū)的前提是判斷ReduceNum個數(shù)是否大于1)

到此這篇關(guān)于Java大數(shù)據(jù)處理的核心技術(shù)MapReduce框架的文章就介紹到這了,更多相關(guān)Java MapReduce框架內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java基于Scanner對象的簡單輸入計算功能示例

    Java基于Scanner對象的簡單輸入計算功能示例

    這篇文章主要介紹了Java基于Scanner對象的簡單輸入計算功能,結(jié)合實例形式分析了Java使用Scanner對象獲取用戶輸入半徑值計算圓形面積功能,需要的朋友可以參考下
    2018-01-01
  • Intellij?IDEA如何修改配置文件位置

    Intellij?IDEA如何修改配置文件位置

    這篇文章主要介紹了Intellij?IDEA--修改配置文件位置,文章末尾給大家介紹了Intellij?IDEA--宏的用法記錄操作過程,對此文感興趣的朋友跟隨小編一起看看吧
    2022-08-08
  • Java查看本機(jī)端口是否被占用源碼

    Java查看本機(jī)端口是否被占用源碼

    這篇文章主要介紹了Java查看本機(jī)端口是否被占用的主要原理,并結(jié)合具體實例給出了操作方法,需要的朋友可以參考下
    2017-09-09
  • springboot中報錯Invalid character found in the request的解決

    springboot中報錯Invalid character found in 

    這篇文章主要介紹了springboot中報錯Invalid character found in the request的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-09-09
  • Java線程并發(fā)中常見的鎖機(jī)制詳細(xì)介紹

    Java線程并發(fā)中常見的鎖機(jī)制詳細(xì)介紹

    越來越多的互聯(lián)網(wǎng)企業(yè)面臨著用戶量膨脹而帶來的并發(fā)安全問題。接下來通過本文給大家介紹Java線程并發(fā)中常見的鎖機(jī)制,感興趣的朋友一起看看吧
    2016-05-05
  • Springboot集成GraphicsMagick

    Springboot集成GraphicsMagick

    本文主要是教大家如何將GraphicsMagick命令行工具集成到Springboot項目中,便可以使用Java進(jìn)行圖片處理相關(guān)開發(fā)。
    2021-05-05
  • Mybatis-Plus實現(xiàn)自定義SQL具體方法

    Mybatis-Plus實現(xiàn)自定義SQL具體方法

    Mybatis-Plus是Mybatis的一個增強工具,它可以優(yōu)化我們的開發(fā)效率,這篇文章主要介紹了Mybatis-Plus實現(xiàn)自定義SQL,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2023-08-08
  • Springboot整合MongoDB的Docker開發(fā)教程全解

    Springboot整合MongoDB的Docker開發(fā)教程全解

    這篇文章主要介紹了Springboot整合MongoDB的Docker開發(fā),本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值 ,需要的朋友可以參考下
    2020-07-07
  • zookeeper的Leader選舉機(jī)制源碼解析

    zookeeper的Leader選舉機(jī)制源碼解析

    這篇文章主要為大家介紹了zookeeper的Leader選舉源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-03-03
  • Java switch多值匹配操作詳解

    Java switch多值匹配操作詳解

    這篇文章主要介紹了Java switch多值匹配操作詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-01-01

最新評論