欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java大數(shù)據(jù)處理的核心技術(shù)MapReduce框架

 更新時(shí)間:2023年05月09日 09:25:43   作者:mxh_wishes  
MapReduce是一種分布式計(jì)算框架,適用于大規(guī)模的數(shù)據(jù)處理。它將大數(shù)據(jù)分成多個(gè)小數(shù)據(jù)塊,通過(guò)Map和Reduce兩個(gè)階段對(duì)數(shù)據(jù)進(jìn)行處理和分析。MapReduce框架具有可靠、高效、可擴(kuò)展等特點(diǎn),已經(jīng)成為大數(shù)據(jù)處理的核心技術(shù)

MapReduce框架

1、框架圖

Input→Mapper→shuffle→Reducer→Output

2、Input數(shù)據(jù)輸入

2.1概念

(1)數(shù)據(jù)塊(Block),物理存儲(chǔ),Block是HDFS物理上把文件分成一塊一塊。數(shù)據(jù)塊是HDFS存儲(chǔ)數(shù)據(jù)單位。

(2)數(shù)據(jù)切片,邏輯存儲(chǔ),數(shù)據(jù)切片是MapReduce程序j最小計(jì)算輸入數(shù)據(jù)的單位。一個(gè)切片會(huì)啟動(dòng)一個(gè)MapTask

2.2數(shù)據(jù)切片與MapTask并行度

(1)一個(gè)Job的Map階段并行度由客戶端在提交job時(shí)的切片數(shù)決定;

(2)每一個(gè)split切片分配一個(gè)MapTask并行實(shí)例片

(3)切片是針對(duì)每一個(gè)文件單獨(dú)切片

(4)默認(rèn)情況下,切片大小等于Block Size塊大小

MapTask數(shù)據(jù)=輸入文件切片數(shù)據(jù)

2.3切片過(guò)程

(1)程序先找到數(shù)據(jù)存儲(chǔ)目錄

(2)開(kāi)始遍歷處理目錄下的每一個(gè)文件

A、按每個(gè)文件進(jìn)行切片

B、判斷文件是否可以切片(snappy、Gzip壓縮不能切)

(3)遍歷第一個(gè)文件

獲取文件大小→計(jì)算切片大小→開(kāi)始切片→將切片信息寫入切片規(guī)劃文件中→提交切片規(guī)劃文件到y(tǒng)arn

A、獲取文件大小:fs.size(文件)

B、計(jì)算切片大小:設(shè)置minsize、maxsize、blocksize

mapreduce.input.fileinputformat.split.minsize=1 默認(rèn)值為1
mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue 默認(rèn)值Long.MAXValue

計(jì)算公式 :computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))

最大取最小,最小取最大。因此切片大小默認(rèn)與 HDFS 的 block 保持一致。

maxsize(切片最大值): 參數(shù)如果調(diào)到比 blocksize 小,則會(huì)讓切片變小,而且就等于配置的這個(gè)參數(shù)的值。minsize(切片最小值): 參數(shù)調(diào)的比 blockSize 大,則可以讓切片變得比 blocksize 還大。

C、開(kāi)始切片:getSplit()

每次切片時(shí),都要判斷剩下的是否大于塊的1.1倍,不大于1.1倍就切分成一塊切片

D、將切片信息寫入切片規(guī)劃文件中:job.split

記錄起始位置、長(zhǎng)度、所在切點(diǎn)列表等

E、提交切片規(guī)劃文件到y(tǒng)arn

yarn上MRAppMaster根據(jù)切片規(guī)劃計(jì)算MapTask數(shù)

三個(gè)文件:切片規(guī)則文件(job.split)、參數(shù)配置文件(job.xml)、程序jar包

2.4類圖

2.5TextInputFormat

(1)是FileInputFormat默認(rèn)的實(shí)現(xiàn)類

(2)按行讀取每條記錄,Key為該行在整個(gè)文件的超始字節(jié)偏移量,LongWritable型。Value為行內(nèi)容,不包括任何終止符(換行符、回車符),Text型。

2.6CombineTextInputFormat

(1)應(yīng)用場(chǎng)景:用于小文件過(guò)多的場(chǎng)景,將多個(gè)小文件從邏輯上規(guī)劃到一個(gè)切片中, 這樣多個(gè)小文件交給一個(gè)MapTask處理;

(2)虛擬存儲(chǔ)切片最大值默認(rèn)4M,最好根據(jù)實(shí)際的小文件大小來(lái)設(shè)置

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

2.7Read階段

MapTask通過(guò)InputFormat獲得RecordReader,從輸入InputSplit中解析出一個(gè)個(gè)Key/Value

3、Map階段

將解析出來(lái)的Key/Value交給用戶編寫的map()函數(shù)處理,并產(chǎn)生一系列新的Key/Value

4、Collect收集階段

(1)map()函數(shù)中,當(dāng)數(shù)據(jù)處理完成后,一般會(huì)調(diào)用OutputCollector.collect()輸出結(jié)果

(2)在該函數(shù)內(nèi)部,它會(huì)將生成的Key/Value分區(qū)(調(diào)用Partitioner),并寫入一個(gè)環(huán)形內(nèi)存緩沖區(qū)中

5、Shuffle階段

(1)map方法之后 ,reduce方法之前的數(shù)據(jù)處理過(guò)程稱之為Shuffle;

(2)環(huán)形內(nèi)存緩沖區(qū)

(3)Partition分區(qū)-默認(rèn)分區(qū)

A、根據(jù)需求按照條件輸出到不同分區(qū)

B、默認(rèn)分區(qū):根據(jù)key的hashcode對(duì)ReduceTask數(shù)理取模

C、默認(rèn)的ReduceTask的數(shù)量為1,對(duì)應(yīng)參數(shù)mapreduce.job.reduces

(4)Partition分區(qū)-自定義Partitioner

A、自定義類繼承Partitioner<key,value>,重寫getPartition()方法

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text text, FlowBean flowBean, int i) {
        // 獲取手機(jī)號(hào)前三位prePhone
        String phone = text.toString();
        String prePhone = phone.substring(0, 3);
        //定義一個(gè)分區(qū)號(hào)變量partition,根據(jù)prePhone 設(shè)置分區(qū)號(hào)
        int partition;
        if ("136".equals(prePhone)) {
            partition = 0;
        } else if ("137".equals(prePhone)) {
            partition = 1;
        } else if ("138".equals(prePhone)) {
            partition = 2;
        } else if ("139".equals(prePhone)) {
            partition = 3;
        } else {
            partition = 4;
        }
        //最后返回分區(qū)號(hào)partition
        return partition;
    }
}

B、在job驅(qū)動(dòng)中,設(shè)置自定義partitioner,job.setPartitionerClass(自定義分區(qū)類.class)

C、自定義Partition后,要根據(jù)自定義Partitioner的邏輯設(shè)置相應(yīng)的數(shù)量的ReduceTask:job.setNumReduceTasks(數(shù)量)

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1.獲取job對(duì)象
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 2.關(guān)聯(lián)本Driver類
        job.setJarByClass(FlowDriver.class);
        // 3.關(guān)聯(lián)Mapper和Reducer類
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        // 4.設(shè)置Map端輸出KV類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 5.設(shè)置最終輸出KV類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 6.設(shè)置程序的輸入和輸出路徑
        FileInputFormat.setInputPaths(job, new Path("C:\\install\\temp\\input\\input02\\phone_data.txt"));
        FileOutputFormat.setOutputPath(job, new Path("C:\\install\\temp\\output\\output06"));
        // 8.指定自定義分區(qū)器
        job.setPartitionerClass(ProvincePartitioner.class);
        // 9.同時(shí)也指定相應(yīng)數(shù)量的ReduceTask--對(duì)應(yīng)的參數(shù)mapreduce.job.reduces,默認(rèn)為1
        job.setNumReduceTasks(5);
        // 7.提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

(5)Partition分區(qū)總結(jié)

A、如果ReduceTask數(shù)量 > getPartition()結(jié)果數(shù),則會(huì)多產(chǎn)生幾個(gè)空的輸出文件

B、如果 1 <ReduceTask數(shù)量 <getPartition()結(jié)果數(shù),則有一部分分區(qū)數(shù)據(jù)無(wú)處安放,會(huì)異常

C、如果ReduceTask數(shù)量=1,則不管MapTask輸出多少個(gè)分區(qū)文件,最終結(jié)果只有一個(gè)ReduceTask,只會(huì)產(chǎn)生一個(gè)結(jié)果文件。(分區(qū)數(shù)不大于1,不會(huì)走默認(rèn)hash分區(qū)器和自定義分區(qū)器,直接返回)

D、分區(qū)號(hào)必須從0開(kāi)始,逐一累加

(6)排序

A、排序是MapReduce框架中最重要的操作之一

B、MapTask和ReduceTask均會(huì)對(duì)數(shù)據(jù)按key進(jìn)行排序,該 操作屬于Hadoop的默認(rèn)行為 。任務(wù)應(yīng)用程序中的數(shù)據(jù)均會(huì)被排序,而不管邏輯上是否需要。

C、默認(rèn)排序是按照字典順序排序,排序的方法為快速排序

D、排序分類:部分排序、全排序、輔助排序、二次排序

(7)溢寫

A、當(dāng)環(huán)形緩沖區(qū)滿后,MapReduce會(huì)將數(shù)據(jù)寫到本地磁盤上,生成一個(gè)臨時(shí)文件

(8)Combiner

A、Combiner是MR程序中Mapper和Reducer之外的一種組件

B、Combiner的父類是Reducer

C、Combiner與Reducer區(qū)別:在于運(yùn)行的位置 ,Combiner是在每一個(gè)MapTask所在節(jié)點(diǎn)運(yùn)行,即在分區(qū)、排序后準(zhǔn)備溢寫前可以進(jìn)行combiner。Reducer是接收全局所有MapTask輸出結(jié)果。

D、Combiner的意義是對(duì)每一個(gè)MapTask的輸出進(jìn)行局部匯總,以減少網(wǎng)絡(luò)傳輸量

E、Combiner應(yīng)用前提是不影響最終的業(yè)務(wù)邏輯

public class WordCountCombiner extends Reducer<Text, IntWritable, Text,IntWritable> {
    private IntWritable outV = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        context.write(key,outV);
    }
}
public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 一獲取二關(guān)聯(lián)三設(shè)置一提交
        // 1.獲取配置信息及Job對(duì)象
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 2.關(guān)聯(lián)本Driver程序的類
        job.setJarByClass(WordCountDriver.class);
        // 3.關(guān)聯(lián)Mapper和Reducer的業(yè)務(wù)類
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4.設(shè)置Mapper輸出的KV類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5.設(shè)置最終輸出的KV類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6.設(shè)置輸入和輸出路徑
        //FileInputFormat.setInputPaths(job, new Path(args[0]));
        //FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileInputFormat.setInputPaths(job, new Path("C:\\install\\temp\\input\\hadoop.txt"));
        FileOutputFormat.setOutputPath(job, new Path("C:\\install\\temp\\output\\output01-2"));
        // 8.設(shè)置Combiner類--方式一
        //job.setCombinerClass(WordCountCombiner.class);
        // 方式二:其新建的WordCountCombiner的reduce方法處理與正常的WordCountReducer中的reduce方法處理邏輯是一樣
        // 因此可以直接用此類作為combiner類
        job.setCombinerClass(WordCountReducer.class);
        // 9.設(shè)置ReduceTasks的數(shù)量--這樣就沒(méi)有reduce階段,就不會(huì)有shuffle,Combiner也就沒(méi)有用,直接由map輸出,
        // 文件名為part-m-00000,就是不part-r-00000,兩者結(jié)果是不一樣的
        // 即如果沒(méi)有reduce階段,即使設(shè)置了combiner也不起作用
        // job.setNumReduceTasks(0);
        // 7.提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1); // 0-正常退出 非0(1)異常終止(結(jié)束)
    }
}

(9)Meger

A、MapTask以分區(qū)為單位進(jìn)行合并,對(duì)所有臨時(shí)文件合并成一個(gè)大文件(output/file.out),同時(shí)生成相應(yīng)索引文件(output/file.out.index)

B、對(duì)某個(gè)分區(qū)采用多輪遞歸合并的方式,每次合并默認(rèn)10個(gè)文件,每個(gè)MapTask最終得到一個(gè)大文件

6、ReduceTask

(1)Copy階段

ReduceTask從各個(gè)MapTask上遠(yuǎn)程拷貝一片數(shù)據(jù),如大小超過(guò)閥值,則寫到磁盤上,否則直接放在內(nèi)存中

(2)Sort階段

由于各個(gè)MapTask已經(jīng)實(shí)現(xiàn)了對(duì)自己處理結(jié)果進(jìn)行了局部排序,因此ReduceTask只需要對(duì)所有數(shù)據(jù)進(jìn)行一次歸并排序即可

(3)Reducer階段

reduce()函數(shù)將計(jì)算結(jié)果寫到HDFS上

(4)其他

A、ReduceTask數(shù)量默認(rèn)是1,可手動(dòng)設(shè)置job.setNumReduceTasks(數(shù)量)

B、ReduceTask=0,表示沒(méi)有reduce階段,輸出文件個(gè)數(shù)和Map個(gè)數(shù)一致

C、如果數(shù)據(jù)分布不均勻,就會(huì)在reduce階段產(chǎn)生數(shù)據(jù)傾斜

D、ReduceTask數(shù)量并不能任意設(shè)置,要考慮業(yè)務(wù)邏輯需求,具體多少個(gè)ReduceTask,需要根據(jù)集群性能確定

E、如果分區(qū)數(shù)不是1,但ReduceTask為1,不執(zhí)行分區(qū)過(guò)程(執(zhí)行分區(qū)的前提是判斷ReduceNum個(gè)數(shù)是否大于1)

到此這篇關(guān)于Java大數(shù)據(jù)處理的核心技術(shù)MapReduce框架的文章就介紹到這了,更多相關(guān)Java MapReduce框架內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java基于Scanner對(duì)象的簡(jiǎn)單輸入計(jì)算功能示例

    Java基于Scanner對(duì)象的簡(jiǎn)單輸入計(jì)算功能示例

    這篇文章主要介紹了Java基于Scanner對(duì)象的簡(jiǎn)單輸入計(jì)算功能,結(jié)合實(shí)例形式分析了Java使用Scanner對(duì)象獲取用戶輸入半徑值計(jì)算圓形面積功能,需要的朋友可以參考下
    2018-01-01
  • Intellij?IDEA如何修改配置文件位置

    Intellij?IDEA如何修改配置文件位置

    這篇文章主要介紹了Intellij?IDEA--修改配置文件位置,文章末尾給大家介紹了Intellij?IDEA--宏的用法記錄操作過(guò)程,對(duì)此文感興趣的朋友跟隨小編一起看看吧
    2022-08-08
  • Java查看本機(jī)端口是否被占用源碼

    Java查看本機(jī)端口是否被占用源碼

    這篇文章主要介紹了Java查看本機(jī)端口是否被占用的主要原理,并結(jié)合具體實(shí)例給出了操作方法,需要的朋友可以參考下
    2017-09-09
  • springboot中報(bào)錯(cuò)Invalid character found in the request的解決

    springboot中報(bào)錯(cuò)Invalid character found in 

    這篇文章主要介紹了springboot中報(bào)錯(cuò)Invalid character found in the request的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-09-09
  • Java線程并發(fā)中常見(jiàn)的鎖機(jī)制詳細(xì)介紹

    Java線程并發(fā)中常見(jiàn)的鎖機(jī)制詳細(xì)介紹

    越來(lái)越多的互聯(lián)網(wǎng)企業(yè)面臨著用戶量膨脹而帶來(lái)的并發(fā)安全問(wèn)題。接下來(lái)通過(guò)本文給大家介紹Java線程并發(fā)中常見(jiàn)的鎖機(jī)制,感興趣的朋友一起看看吧
    2016-05-05
  • Springboot集成GraphicsMagick

    Springboot集成GraphicsMagick

    本文主要是教大家如何將GraphicsMagick命令行工具集成到Springboot項(xiàng)目中,便可以使用Java進(jìn)行圖片處理相關(guān)開(kāi)發(fā)。
    2021-05-05
  • Mybatis-Plus實(shí)現(xiàn)自定義SQL具體方法

    Mybatis-Plus實(shí)現(xiàn)自定義SQL具體方法

    Mybatis-Plus是Mybatis的一個(gè)增強(qiáng)工具,它可以優(yōu)化我們的開(kāi)發(fā)效率,這篇文章主要介紹了Mybatis-Plus實(shí)現(xiàn)自定義SQL,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2023-08-08
  • Springboot整合MongoDB的Docker開(kāi)發(fā)教程全解

    Springboot整合MongoDB的Docker開(kāi)發(fā)教程全解

    這篇文章主要介紹了Springboot整合MongoDB的Docker開(kāi)發(fā),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值 ,需要的朋友可以參考下
    2020-07-07
  • zookeeper的Leader選舉機(jī)制源碼解析

    zookeeper的Leader選舉機(jī)制源碼解析

    這篇文章主要為大家介紹了zookeeper的Leader選舉源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-03-03
  • Java switch多值匹配操作詳解

    Java switch多值匹配操作詳解

    這篇文章主要介紹了Java switch多值匹配操作詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-01-01

最新評(píng)論