Implementing Word Count in Python and Java
1. Introduction
In the post "Hadoop: A MapReduce Implementation of Word Count" we learned how to implement word count with Hadoop MapReduce. Now let's see how to implement the same functionality with Spark.
2. How MapReduce Works in Spark
The Spark framework is also a MapReduce-like model: it adopts a "divide and aggregate" strategy to process data in a distributed, parallel fashion. Compared with Hadoop MapReduce, it has two distinguishing characteristics:
- It models the framework's input/output and intermediate data, abstracting them into a unified data structure called the Resilient Distributed Dataset (RDD), and builds a set of generic operations on top of this abstraction, so that users can express complex data-processing pipelines simply.
- It uses mechanisms such as in-memory data aggregation and data caching to speed up application execution, which is especially beneficial for iterative and interactive applications.
The Spark community recommends that users favor the high-level structured-data APIs (Structured APIs) such as Dataset and DataFrame over the lower-level RDD API, because the high-level APIs carry more type information (a schema), support SQL operations, and can be executed by the highly optimized Spark SQL engine. However, since the RDD API is more fundamental and better suited to illustrating the basic concepts and principles, the code below uses the RDD API throughout.
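To get a feel for what "a unified data structure with generic operations" means, here is a purely illustrative sketch in plain Python: a toy, single-machine stand-in for an RDD that supports map, flatMap, and reduceByKey. The method names mirror the Spark API, but the implementation is a teaching device, not how Spark actually works.

```python
class ToyRDD:
    """A toy, single-machine stand-in for an RDD: just a Python list plus a
    few of the generic operations Spark defines on the abstraction.
    Purely illustrative -- this is not Spark's implementation."""

    def __init__(self, records):
        self.records = list(records)

    def map(self, f):
        # Apply f to every record
        return ToyRDD(f(r) for r in self.records)

    def flatMap(self, f):
        # Apply f to every record and flatten the resulting iterables
        return ToyRDD(x for r in self.records for x in f(r))

    def reduceByKey(self, f):
        # Merge all values sharing a key with the binary function f
        merged = {}
        for k, v in self.records:
            merged[k] = f(merged[k], v) if k in merged else v
        return ToyRDD(merged.items())

    def collect(self):
        return list(self.records)

# Word count expressed through the generic operations
lines = ToyRDD(["Hello World", "Hello Tom"])
counts = (lines.flatMap(lambda s: s.split(" "))
               .map(lambda w: (w, 1))
               .reduceByKey(lambda a, b: a + b))
print(sorted(counts.collect()))  # [('Hello', 2), ('Tom', 1), ('World', 1)]
```

Note how the word-count pipeline at the bottom is exactly the shape of the Spark programs later in this article: the abstraction lets the same three-operation pipeline run unchanged whether the backing data is a local list or a distributed dataset.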
A Spark RDD/Dataset is divided into multiple partitions. Each partition of an RDD/Dataset maps to one or more data files, and Spark reads the input data into the RDD/Dataset through this mapping.
Because we use the local single-machine, multithreaded debug mode here, the default number of partitions is the number of threads the local machine uses: if the code sets local[N] (use N threads), the default is N partitions; if it sets local[*] (use as many threads as the machine has CPU cores), the default number of partitions is the number of local CPU cores. You can check the actual number of partitions by calling getNumPartitions() on the RDD object.
In the walkthrough below we assume that each file corresponds to one partition.
A schematic of Spark's map stage:
A schematic of Spark's reduce stage:
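The two stages in the schematics above can also be traced in code. Below is a conceptual plain-Python simulation (not Spark's implementation): each map task turns its partition's lines into (word, 1) pairs, the pairs are shuffled to reduce tasks by a hash of the key, and each reduce task sums the counts it receives.

```python
from collections import defaultdict

def map_stage(partitions):
    # One map task per partition: lines -> (word, 1) pairs
    return [[(w, 1) for line in part for w in line.split(" ")]
            for part in partitions]

def shuffle(map_outputs, n_reducers):
    # Route every pair to a reduce task chosen by hashing its key,
    # so all pairs with the same word land at the same reducer
    buckets = [defaultdict(list) for _ in range(n_reducers)]
    for pairs in map_outputs:
        for word, one in pairs:
            buckets[hash(word) % n_reducers][word].append(one)
    return buckets

def reduce_stage(buckets):
    # Each reduce task sums the values it received per key
    result = {}
    for bucket in buckets:
        for word, ones in bucket.items():
            result[word] = sum(ones)
    return result

partitions = [["Hello World"], ["Hello Tom"], ["Goodbye World"]]
counts = reduce_stage(shuffle(map_stage(partitions), n_reducers=2))
print(counts)  # {'Hello': 2, 'World': 2, 'Tom': 1, 'Goodbye': 1} (key order may vary)
```

The three single-line partitions here mirror the three-input-file assumption above: one map task per file/partition, with the shuffle step doing the "aggregate" half of divide-and-aggregate.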
3. Word Count in Java
The project layout is as follows:
Word-Count-Spark
├─ input
│ ├─ file1.txt
│ ├─ file2.txt
│ └─ file3.txt
├─ output
│ └─ result.txt
├─ pom.xml
├─ src
│ ├─ main
│ │ └─ java
│ │ └─ WordCount.java
│ └─ test
└─ target
WordCount.java is as follows:
package com.orion;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class WordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("Usage: WordCount <input directory> <output directory> <number of local threads>");
            System.exit(1);
        }
        String inputPath = args[0];
        String outputPath = args[1];
        int nThreads = Integer.parseInt(args[2]);

        SparkSession spark = SparkSession.builder()
                .appName("WordCount")
                .master(String.format("local[%d]", nThreads))
                .getOrCreate();

        // Read all files under the input directory as an RDD of lines
        JavaRDD<String> lines = spark.read().textFile(inputPath).javaRDD();
        // Split each line into words
        JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
        // Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
        // Sum the counts of each word
        JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);

        // Collect the results to the driver and write them to a local file
        List<Tuple2<String, Integer>> output = counts.collect();
        String filePath = Paths.get(outputPath, "result.txt").toString();
        BufferedWriter out = new BufferedWriter(new FileWriter(filePath));
        for (Tuple2<?, ?> tuple : output) {
            out.write(tuple._1() + ": " + tuple._2() + "\n");
        }
        out.close();
        spark.stop();
    }
}
The pom.xml configuration is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.WordCount</groupId>
  <artifactId>WordCount</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>WordCount</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <!-- Centralized version definitions -->
  <properties>
    <scala.version>2.12.10</scala.version>
    <scala.compat.version>2.12</scala.compat.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <project.timezone>UTC</project.timezone>
    <java.version>11</java.version>
    <scoverage.plugin.version>1.4.0</scoverage.plugin.version>
    <site.plugin.version>3.7.1</site.plugin.version>
    <scalatest.version>3.1.2</scalatest.version>
    <scalatest-maven-plugin>2.0.0</scalatest-maven-plugin>
    <scala.maven.plugin.version>4.4.0</scala.maven.plugin.version>
    <maven.compiler.plugin.version>3.8.0</maven.compiler.plugin.version>
    <maven.javadoc.plugin.version>3.2.0</maven.javadoc.plugin.version>
    <maven.source.plugin.version>3.2.1</maven.source.plugin.version>
    <maven.deploy.plugin.version>2.8.2</maven.deploy.plugin.version>
    <nexus.staging.maven.plugin.version>1.6.8</nexus.staging.maven.plugin.version>
    <maven.help.plugin.version>3.2.0</maven.help.plugin.version>
    <maven.gpg.plugin.version>1.6</maven.gpg.plugin.version>
    <maven.surefire.plugin.version>2.22.2</maven.surefire.plugin.version>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <spark.version>3.2.1</spark.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <!-- ====== SCALA ====== -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
      <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugin versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
          <configuration>
            <source>11</source>
            <target>11</target>
            <fork>true</fork>
            <!-- Machine-specific javac path; adjust to your local JDK 11 installation -->
            <executable>/Library/Java/JavaVirtualMachines/jdk-11.0.15.jdk/Contents/Home/bin/javac</executable>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
Remember to configure the program arguments input, output, and 3, which specify the input directory, the output directory, and the number of local threads respectively (in VSCode, these go in the launch.json file). After compiling and running, you can inspect result.txt in the output directory:
Tom: 1
Hello: 3
Goodbye: 1
World: 2
David: 1
The word count completed successfully.
4. Word Count in Python
First install PySpark with pip (pinned to 3.2.1, matching the Spark version used in the Java project above):
pip install pyspark==3.2.1
Note that PySpark only supports Java 8/11; do not use a newer Java version. I am using Java 11 here. Run java -version to check the Java version on your machine.
(base) orion-orion@MacBook-Pro ~ % java -version
java version "11.0.15" 2022-04-19 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.15+8-LTS-149)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.15+8-LTS-149, mixed mode)
The project layout is as follows:
Word-Count-Spark
├─ input
│ ├─ file1.txt
│ ├─ file2.txt
│ └─ file3.txt
├─ output
│ └─ result.txt
├─ src
│ └─ word_count.py
word_count.py is written as follows:
from pyspark.sql import SparkSession
import sys
import os
from operator import add

if len(sys.argv) != 4:
    print("Usage: WordCount <input directory> <output directory> <number of local threads>", file=sys.stderr)
    sys.exit(1)
input_path, output_path, n_threads = sys.argv[1], sys.argv[2], int(sys.argv[3])

spark = SparkSession.builder.appName("WordCount").master("local[%d]" % n_threads).getOrCreate()

# Read all files under the input directory as an RDD of lines
lines = spark.read.text(input_path).rdd.map(lambda r: r[0])
# Split lines into words, map each word to (word, 1), then sum counts per word
counts = lines.flatMap(lambda s: s.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(add)

# Collect the results to the driver and write them to a local file
output = counts.collect()
with open(os.path.join(output_path, "result.txt"), "wt") as f:
    for (word, count) in output:
        f.write(str(word) + ": " + str(count) + "\n")

spark.stop()
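As a sanity check on what reduceByKey(add) does: for each key it folds all of that key's values together with the given binary function, so the (word, 1) pairs collapse into per-word totals. The snippet below reproduces that per-key fold in plain Python (illustrative only, not Spark code):

```python
from functools import reduce
from operator import add
from itertools import groupby

pairs = [("Hello", 1), ("World", 1), ("Hello", 1)]
# Group the pairs by key, then fold each group's values with `add`,
# which is exactly the contract of reduceByKey(add)
grouped = groupby(sorted(pairs), key=lambda kv: kv[0])
counts = {k: reduce(add, (v for _, v in g)) for k, g in grouped}
print(counts)  # {'Hello': 2, 'World': 1}
```

Because add is associative and commutative, Spark is free to pre-aggregate within each partition before shuffling, which is why reduceByKey requires a function with those properties.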
Run it with python word_count.py input output 3; afterwards you can inspect the output file result.txt under output:
Hello: 3
World: 2
Goodbye: 1
David: 1
Tom: 1
Again, the word count completed successfully.
This concludes this article on implementing word count (Word Count) in Python and Java with Spark.