Hadoop?MapReduce實現(xiàn)單詞計數(shù)(Word?Count)
1.Map與Reduce過程
1.1 Map過程
首先,Hadoop會把輸入數(shù)據(jù)劃分成等長的輸入分片(input split) 或分片發(fā)送到MapReduce。Hadoop為每個分片創(chuàng)建一個map任務(wù),由它來運行用戶自定義的map函數(shù)以分析每個分片中的記錄。在我們的單詞計數(shù)例子中,輸入是多個文件,一般一個文件對應(yīng)一個分片,如果文件太大則會劃分為多個分片。map函數(shù)的輸入以<key, value>
形式做為輸入,value
為文件的每一行,key
為該行在文件中的偏移量(一般我們會忽視)。這里map函數(shù)起到的作用為將每一行進行分詞為多個word
,并在context
中寫入<word, 1>
以代表該單詞出現(xiàn)一次。
map過程的示意圖如下:
mapper代碼編寫如下:
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { //每次處理一行,一個mapper里的value為一行,key為該行在文件中的偏移量 StringTokenizer iter = new StringTokenizer(value.toString()); while (iter.hasMoreTokens()) { word.set(iter.nextToken()); // 向context中寫入<word, 1> context.write(word, one); System.out.println(word); } } }
如果我們能夠并行處理分片(不一定是完全并行),且分片是小塊的數(shù)據(jù),那么處理過程將會有一個好的負載平衡。但是如果分片太小,那么管理分片與map任務(wù)創(chuàng)建將會耗費太多時間。對于大多數(shù)作業(yè),理想分片大小為一個HDFS塊的大小,默認是64MB。
map任務(wù)的執(zhí)行節(jié)點和輸入數(shù)據(jù)的存儲節(jié)點相同時,Hadoop的性能能達到最佳,這就是計算機系統(tǒng)中所謂的data locality optimization(數(shù)據(jù)局部性優(yōu)化)。而最佳分片大小與塊大小相同的原因就在于,它能夠保證一個分片存儲在單個節(jié)點上,再大就不能了。
1.2 Reduce過程
接下來我們看reducer的編寫。reduce任務(wù)的多少并不是由輸入大小來決定,而是需要人工單獨指定的(默認為1個)。和上面map不同的是,reduce任務(wù)不再具有本地讀取的優(yōu)勢————一個reduce任務(wù)的輸入往往來自于所有mapper的輸出,因此map和reduce之間的數(shù)據(jù)流被稱為 shuffle(洗牌) 。Hadoop會先按照key-value對進行排序,然后將排序好的map的輸出通過網(wǎng)絡(luò)傳輸?shù)絩educe任務(wù)運行的節(jié)點,并在那里進行合并,然后傳遞到用戶定義的reduce函數(shù)中。
reduce 函數(shù)示意圖如下:
reducer代碼編寫如下:
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{ int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
2.完整代碼
2.1 項目架構(gòu)
關(guān)于VSCode+Java+Maven+Hadoop開發(fā)環(huán)境搭建,可以參見我的博客《VSCode+Maven+Hadoop開發(fā)環(huán)境搭建》,此處不再贅述。這里展示我們的項目架構(gòu)如下:
Word-Count-Hadoop
├─ input
│ ├─ file1
│ ├─ file2
│ └─ file3
├─ output
├─ pom.xml
├─ src
│ └─ main
│ └─ java
│ └─ WordCount.java
└─ target
WordCount.java
代碼如下:
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount{ public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { //每次處理一行,一個mapper里的value為一行,key為該行在文件中的偏移量 StringTokenizer iter = new StringTokenizer(value.toString()); while (iter.hasMoreTokens()) { word.set(iter.nextToken()); // 向context中寫入<word, 1> context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{ int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word_count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); //此處的Combine操作意為即第每個mapper工作完了先局部reduce一下,最后再全局reduce job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //第0個參數(shù)是輸入目錄,第1個參數(shù)是輸出目錄 //先判斷output path是否存在,如果存在則刪除 Path path = new Path(args[1]);// FileSystem fileSystem = path.getFileSystem(conf); if (fileSystem.exists(path)) { fileSystem.delete(path, true); } //設(shè)置輸入目錄和輸出目錄 FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true)?0:1); } }
pom.xml
中記得配置Hadoop的依賴環(huán)境:
... <!-- 集中定義版本號 --> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <hadoop.version>3.3.1</hadoop.version> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <!-- 導(dǎo)入hadoop依賴環(huán)境 --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-api</artifactId> <version>${hadoop.version}</version> </dependency> </dependencies> ... </project>
此外,因為我們的程序自帶輸入?yún)?shù),我們還需要在VSCode的launch.json
中配置輸入?yún)?shù)intput
(代表輸入目錄)和output
(代表輸出目錄):
... "args": [ "input", "output" ], ...
編譯運行完畢后,可以查看output
文件夾下的part-r-00000
文件:
David 1
Goodbye 1
Hello 3
Tom 1
World 2
可見我們的程序正確地完成了單詞計數(shù)的功能。
以上就是Hadoop MapReduce實現(xiàn)單詞計數(shù)(Word Count)的詳細內(nèi)容,更多關(guān)于Hadoop MapReduce的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java 數(shù)據(jù)結(jié)構(gòu)與算法 (快速排序法)
這篇文章主要介紹了java 數(shù)據(jù)結(jié)構(gòu)與算法(快速排序法),,快速排序法是實踐中的一種快速的排序算法,在c++或?qū)ava基本類型的排序中特別有用,下面我們一起進入文章學(xué)習(xí)更詳細的內(nèi)容吧,需要的朋友可以參考下2022-02-02ThreadLocal數(shù)據(jù)存儲結(jié)構(gòu)原理解析
這篇文章主要為大家介紹了ThreadLocal數(shù)據(jù)存儲結(jié)構(gòu)原理解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-10-10spring boot動態(tài)切換數(shù)據(jù)源的實現(xiàn)
這篇文章主要介紹了spring boot動態(tài)切換數(shù)據(jù)源的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01