欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

基于Python和Java實(shí)現(xiàn)單詞計(jì)數(shù)(Word Count)

 更新時(shí)間:2023年05月22日 11:49:18   作者:orion-orion  
Spark框架也是MapReduce-like模型,采用“分治-聚合”策略來對數(shù)據(jù)分布進(jìn)行分布并行處理,本文就來利用Spark實(shí)現(xiàn)單詞統(tǒng)計(jì)的功能,需要的可以參考一下

1 導(dǎo)引

我們在博客《Hadoop: 單詞計(jì)數(shù)(Word Count)的MapReduce實(shí)現(xiàn) 》中學(xué)習(xí)了如何用Hadoop-MapReduce實(shí)現(xiàn)單詞計(jì)數(shù),現(xiàn)在我們來看如何用Spark來實(shí)現(xiàn)同樣的功能。

2. Spark的MapReudce原理

Spark框架也是MapReduce-like模型,采用“分治-聚合”策略來對數(shù)據(jù)分布進(jìn)行分布并行處理。不過該框架相比Hadoop-MapReduce,具有以下兩個特點(diǎn):

  • 對大數(shù)據(jù)處理框架的輸入/輸出,中間數(shù)據(jù)進(jìn)行建模,將這些數(shù)據(jù)抽象為統(tǒng)一的數(shù)據(jù)結(jié)構(gòu)命名為彈性分布式數(shù)據(jù)集(Resilient Distributed Dataset),并在此數(shù)據(jù)結(jié)構(gòu)上構(gòu)建了一系列通用的數(shù)據(jù)操作,使得用戶可以簡單地實(shí)現(xiàn)復(fù)雜的數(shù)據(jù)處理流程。
  • 采用了基于內(nèi)存的數(shù)據(jù)聚合、數(shù)據(jù)緩存等機(jī)制來加速應(yīng)用執(zhí)行尤其適用于迭代和交互式應(yīng)用。

Spark社區(qū)推薦用戶使用Dataset、DataFrame等面向結(jié)構(gòu)化數(shù)據(jù)的高層API(Structured API)來替代底層的RDD API,因?yàn)檫@些高層API含有更多的數(shù)據(jù)類型信息(Schema),支持SQL操作,并且可以利用經(jīng)過高度優(yōu)化的Spark SQL引擎來執(zhí)行。不過,由于RDD API更基礎(chǔ),更適合用來展示基本概念和原理,后面我們的代碼都使用RDD API。

Spark的RDD/dataset分為多個分區(qū)。RDD/Dataset的每一個分區(qū)都映射一個或多個數(shù)據(jù)文件, Spark通過該映射讀取數(shù)據(jù)輸入到RDD/dataset中。

因?yàn)槲覀冞@里采用的本地單機(jī)多線程調(diào)試模式,默認(rèn)分區(qū)數(shù)即為本地機(jī)器使用的線程數(shù),若在代碼中設(shè)置了local[N](使用N個線程),則默認(rèn)為N個分區(qū);若設(shè)為local[*](使用本地CPU核數(shù)個線程),則默認(rèn)分區(qū)數(shù)為本地CPU核數(shù)。大家可以通過調(diào)用RDD對象的getNumPartitions()查看實(shí)際分區(qū)個數(shù)。

我們下面的流程描述中,假設(shè)每個文件對應(yīng)一個分區(qū)。

Spark的Map示意圖如下:

Spark的Reduce示意圖如下:

3. Word Count的Java實(shí)現(xiàn)

項(xiàng)目架構(gòu)如下圖:

Word-Count-Spark
├─ input
│  ├─ file1.txt
│  ├─ file2.txt
│  └─ file3.txt
├─ output
│  └─ result.txt
├─ pom.xml
├─ src
│  ├─ main
│  │  └─ java
│  │     └─ WordCount.java
│  └─ test
└─ target

WordCount.java文件如下:

package com.orion;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.io.*;
import java.nio.file.*;

public class WordCount {
	private static Pattern SPACE = Pattern.compile(" ");

	public static void main(String[] args) throws Exception {
		if (args.length != 3) {
			System.err.println("Usage: WordCount <intput directory> <output directory> <number of local threads>");
			System.exit(1);
		}
                String input_path = args[0];
                String output_path = args[1];
		int n_threads = Integer.parseInt(args[2]);

		SparkSession spark = SparkSession.builder()
			.appName("WordCount")
			.master(String.format("local[%d]", n_threads))
			.getOrCreate();

		JavaRDD<String> lines = spark.read().textFile(input_path).javaRDD();

		JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
		JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
		JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);

		List<Tuple2<String, Integer>> output = counts.collect();

                String filePath = Paths.get(output_path, "result.txt").toString();
                BufferedWriter out = new BufferedWriter(new FileWriter(filePath));
		for (Tuple2<?, ?> tuple : output) {
			out.write(tuple._1() + ": " + tuple._2() + "\n");
		}
		out.close();
                spark.stop();
	}
}

pom.xml文件配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.WordCount</groupId>
  <artifactId>WordCount</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>WordCount</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>
  <!-- 集中定義版本號 -->
  <properties>
    <scala.version>2.12.10</scala.version>
    <scala.compat.version>2.12</scala.compat.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <project.timezone>UTC</project.timezone>
    <java.version>11</java.version>
    <scoverage.plugin.version>1.4.0</scoverage.plugin.version>
    <site.plugin.version>3.7.1</site.plugin.version>
    <scalatest.version>3.1.2</scalatest.version>
    <scalatest-maven-plugin>2.0.0</scalatest-maven-plugin>
    <scala.maven.plugin.version>4.4.0</scala.maven.plugin.version>
    <maven.compiler.plugin.version>3.8.0</maven.compiler.plugin.version>
    <maven.javadoc.plugin.version>3.2.0</maven.javadoc.plugin.version>
    <maven.source.plugin.version>3.2.1</maven.source.plugin.version>
    <maven.deploy.plugin.version>2.8.2</maven.deploy.plugin.version>
    <nexus.staging.maven.plugin.version>1.6.8</nexus.staging.maven.plugin.version>
    <maven.help.plugin.version>3.2.0</maven.help.plugin.version>
    <maven.gpg.plugin.version>1.6</maven.gpg.plugin.version>
    <maven.surefire.plugin.version>2.22.2</maven.surefire.plugin.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <spark.version>3.2.1</spark.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <!--======SCALA======-->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency> <!-- Spark dependency -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
  </dependencies>
  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
          <configuration>
              <source>11</source>
              <target>11</target>
              <fork>true</fork>
              <executable>/Library/Java/JavaVirtualMachines/jdk-11.0.15.jdk/Contents/Home/bin/javac</executable>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

記得配置輸入?yún)?shù)input、output、3分別代表輸入目錄、輸出目錄和使用本地線程數(shù)(在VSCode中在launch.json文件中配置)。編譯運(yùn)行后可在output目錄下查看result.txt

Tom: 1
Hello: 3
Goodbye: 1
World: 2
David: 1

可見成功完成了單詞計(jì)數(shù)功能。

4. Word Count的Python實(shí)現(xiàn)

先使用pip按照pyspark==3.8.2

pip install pyspark==3.8.2

注意PySpark只支持Java 8/11,請勿使用更高級的版本。這里我使用的是Java 11。運(yùn)行java -version可查看本機(jī)Java版本。

(base) orion-orion@MacBook-Pro ~ % java -version
java version "11.0.15" 2022-04-19 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.15+8-LTS-149)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.15+8-LTS-149, mixed mode)

項(xiàng)目架構(gòu)如下:

Word-Count-Spark
├─ input
│  ├─ file1.txt
│  ├─ file2.txt
│  └─ file3.txt
├─ output
│  └─ result.txt
├─ src
│  └─ word_count.py

word_count.py編寫如下:

from pyspark.sql import SparkSession
import sys
import os
from operator import add

if len(sys.argv) != 4:
    print("Usage: WordCount <intput directory> <output directory> <number of local threads>", file=sys.stderr)
    exit(1)
     
input_path, output_path, n_threads = sys.argv[1], sys.argv[2], int(sys.argv[3])

spark = SparkSession.builder.appName("WordCount").master("local[%d]" % n_threads).getOrCreate()

lines = spark.read.text(input_path).rdd.map(lambda r: r[0])

counts = lines.flatMap(lambda s: s.split(" "))\
    .map(lambda word: (word, 1))\
    .reduceByKey(add)

output = counts.collect()

with open(os.path.join(output_path, "result.txt"), "wt") as f:
    for (word, count) in output:
        f.write(str(word) +": " + str(count) + "\n")

spark.stop()

使用python word_count.py input output 3運(yùn)行后,可在output中查看對應(yīng)的輸出文件result.txt

Hello: 3
World: 2
Goodbye: 1
David: 1
Tom: 1

可見成功完成了單詞計(jì)數(shù)功能。

到此這篇關(guān)于基于Python和Java實(shí)現(xiàn)單詞計(jì)數(shù)(Word Count)的文章就介紹到這了,更多相關(guān)Python Java單詞計(jì)數(shù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 死鎖問題詳解

    死鎖問題詳解

    本文詳細(xì)介紹了死鎖,例如死鎖的概念、產(chǎn)生死鎖的條件、如何預(yù)防死鎖等等,有需要的朋友可以自行參考本篇文章,希望對你有所幫助
    2021-08-08
  • win10環(huán)境安裝kettle與linux環(huán)境安裝kettle的詳細(xì)過程

    win10環(huán)境安裝kettle與linux環(huán)境安裝kettle的詳細(xì)過程

    kettle是一款免費(fèi)開源的、可視化的、國際上比較流行的、功能強(qiáng)大的ETL必備工具,在ETL這一方面做的還不錯,下面介紹一下基于win10操作系統(tǒng)安裝kettle和linux操作系統(tǒng)安裝kettle的詳細(xì)過程,感興趣的朋友跟隨小編一起看看吧
    2022-11-11
  • vscode?ssh安裝librosa處理音頻的解決方法

    vscode?ssh安裝librosa處理音頻的解決方法

    這篇文章主要介紹了vscode?ssh安裝librosa處理音頻的解決方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-12-12
  • http請求405錯誤方法不被允許的解決 (Method not allowed)

    http請求405錯誤方法不被允許的解決 (Method not allowed)

    這篇文章主要介紹了http請求405錯誤方法不被允許的解決 (Method not allowed),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-12-12
  • 詳解git submodule HEAD detached 的問題

    詳解git submodule HEAD detached 的問題

    這篇文章主要介紹了詳解git submodule HEAD detached 的問題,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-08-08
  • Markdown語法備忘

    Markdown語法備忘

    Markdown 是一種輕量級標(biāo)記語言,它允許人們使用易讀易寫的純文本格式編寫文檔,然后轉(zhuǎn)換成格式豐富的HTML頁面
    2014-10-10
  • 聯(lián)邦學(xué)習(xí)論文解讀分散數(shù)據(jù)的深層網(wǎng)絡(luò)通信

    聯(lián)邦學(xué)習(xí)論文解讀分散數(shù)據(jù)的深層網(wǎng)絡(luò)通信

    這篇文章主要為大家介紹了論文解讀分散數(shù)據(jù)的深層網(wǎng)絡(luò)通信有效學(xué)習(xí),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-05-05
  • Vscode編輯器的巧妙用法(快速格式化代碼的方法)

    Vscode編輯器的巧妙用法(快速格式化代碼的方法)

    今天小編給大家分享一款超好用的格式化神器,Vscode編輯器是一款很好用的編輯器,學(xué)會這個神器可以省去很多麻煩不需要手動一點(diǎn)點(diǎn)縮進(jìn),對Vscode編輯器代碼格式化感興趣的朋友一起看看吧
    2021-05-05
  • 解決Git?Bash中文亂碼的問題

    解決Git?Bash中文亂碼的問題

    這篇文章介紹了解決Git?Bash中文亂碼的的方法,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2022-04-04
  • bs架構(gòu)和cs架構(gòu)的區(qū)別_動力節(jié)點(diǎn)Java學(xué)院整理

    bs架構(gòu)和cs架構(gòu)的區(qū)別_動力節(jié)點(diǎn)Java學(xué)院整理

    這篇文章主要介紹了bs架構(gòu)和cs架構(gòu)的區(qū)別,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-07-07

最新評論