欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

windowns使用PySpark環(huán)境配置和基本操作

 更新時間:2021年05月17日 12:05:04   作者:Nick_Spider  
pyspark是Spark對Python的api接口,可以在Python環(huán)境中通過調(diào)用pyspark模塊來操作spark,這篇文章主要介紹了windowns使用PySpark環(huán)境配置和基本操作,感興趣的可以了解一下

下載依賴

首先需要下載hadoop和spark,解壓,然后設(shè)置環(huán)境變量。
hadoop清華源下載
spark清華源下載

HADOOP_HOME => /path/hadoop
SPARK_HOME => /path/spark

安裝pyspark。

pip install pyspark

基本使用

可以在shell終端,輸入pyspark,有如下回顯:

輸入以下指令進(jìn)行測試,并創(chuàng)建SparkContext,SparkContext是任何spark功能的入口點。

>>> from pyspark import SparkContext
>>> sc = SparkContext("local", "First App")

如果以上不會報錯,恭喜可以開始使用pyspark編寫代碼了。
不過,我這里使用IDE來編寫代碼,首先我們先在終端執(zhí)行以下代碼關(guān)閉SparkContext。

>>> sc.stop()

下面使用pycharm編寫代碼,如果修改了環(huán)境變量需要先重啟pycharm。
在pycharm運(yùn)行如下程序,程序會起本地模式的spark計算引擎,通過spark統(tǒng)計abc.txt文件中a和b出現(xiàn)行的數(shù)量,文件路徑需要自己指定。

from pyspark import SparkContext

sc = SparkContext("local", "First App")
logFile = "abc.txt"
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Line with a:%i,line with b:%i" % (numAs, numBs))

運(yùn)行結(jié)果如下:

20/03/11 16:15:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/03/11 16:15:58 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Line with a:3,line with b:1

這里說一下,同樣的工作使用python可以做,spark也可以做,使用spark主要是為了高效的進(jìn)行分布式計算。
戳pyspark教程
戳spark教程

RDD

RDD代表Resilient Distributed Dataset,它們是在多個節(jié)點上運(yùn)行和操作以在集群上進(jìn)行并行處理的元素,RDD是spark計算的操作對象。
一般,我們先使用數(shù)據(jù)創(chuàng)建RDD,然后對RDD進(jìn)行操作。
對RDD操作有兩種方法:
Transformation(轉(zhuǎn)換) - 這些操作應(yīng)用于RDD以創(chuàng)建新的RDD。例如filter,groupBy和map。
Action(操作) - 這些是應(yīng)用于RDD的操作,它指示Spark執(zhí)行計算并將結(jié)果發(fā)送回驅(qū)動程序,例如count,collect等。

創(chuàng)建RDD

parallelize是從列表創(chuàng)建RDD,先看一個例子:

from pyspark import SparkContext


sc = SparkContext("local", "count app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"
     ])
print(words)

結(jié)果中我們得到一個對象,就是我們列表數(shù)據(jù)的RDD對象,spark之后可以對他進(jìn)行操作。

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

Count

count方法返回RDD中的元素個數(shù)。

from pyspark import SparkContext


sc = SparkContext("local", "count app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"
     ])
print(words)

counts = words.count()
print("Number of elements in RDD -> %i" % counts)

返回結(jié)果:

Number of elements in RDD -> 8

Collect

collect返回RDD中的所有元素。

from pyspark import SparkContext


sc = SparkContext("local", "collect app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"
     ])
coll = words.collect()
print("Elements in RDD -> %s" % coll)

返回結(jié)果:

Elements in RDD -> ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

foreach

每個元素會使用foreach內(nèi)的函數(shù)進(jìn)行處理,但是不會返回任何對象。
下面的程序中,我們定義的一個累加器accumulator,用于儲存在foreach執(zhí)行過程中的值。

from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")

accum = sc.accumulator(0)
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)


def increment_counter(x):
    print(x)
    accum.add(x)
 return 0

s = rdd.foreach(increment_counter)
print(s)  # None
print("Counter value: ", accum)

返回結(jié)果:

None
Counter value:  15

filter

返回一個包含元素的新RDD,滿足過濾器的條件。

from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print("Fitered RDD -> %s" % (filtered))

 

Fitered RDD -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

也可以改寫成這樣:

from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"]
)


def g(x):
    for i in x:
        if "spark" in x:
            return i

words_filter = words.filter(g)
filtered = words_filter.collect()
print("Fitered RDD -> %s" % (filtered))

map

將函數(shù)應(yīng)用于RDD中的每個元素并返回新的RDD。

from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1, "_{}".format(x)))
mapping = words_map.collect()
print("Key value pair -> %s" % (mapping))

返回結(jié)果:

Key value pair -> [('scala', 1, '_scala'), ('java', 1, '_java'), ('hadoop', 1, '_hadoop'), ('spark', 1, '_spark'), ('akka', 1, '_akka'), ('spark vs hadoop', 1, '_spark vs hadoop'), ('pyspark', 1, '_pyspark'), ('pyspark and spark', 1, '_pyspark and spark')]

Reduce

執(zhí)行指定的可交換和關(guān)聯(lián)二元操作后,然后返回RDD中的元素。

from pyspark import SparkContext
from operator import add


sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print("Adding all the elements -> %i" % (adding))

 這里的add是python內(nèi)置的函數(shù),可以使用ide查看:

def add(a, b):
    "Same as a + b."
    return a + b

reduce會依次對元素相加,相加后的結(jié)果加上其他元素,最后返回結(jié)果(RDD中的元素)。

Adding all the elements -> 15

Join

返回RDD,包含兩者同時匹配的鍵,鍵包含對應(yīng)的所有元素。

from pyspark import SparkContext


sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4), ("python", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
print("x =>", x.collect())
print("y =>", y.collect())
joined = x.join(y)
final = joined.collect()
print( "Join RDD -> %s" % (final))

返回結(jié)果:

x => [('spark', 1), ('hadoop', 4), ('python', 4)]
y => [('spark', 2), ('hadoop', 5)]
Join RDD -> [('hadoop', (4, 5)), ('spark', (1, 2))]

到此這篇關(guān)于windowns使用PySpark環(huán)境配置和基本操作的文章就介紹到這了,更多相關(guān)PySpark環(huán)境配置 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • YOLOv5改進(jìn)教程之添加注意力機(jī)制

    YOLOv5改進(jìn)教程之添加注意力機(jī)制

    注意力機(jī)制最先被用在NLP領(lǐng)域,Attention就是為了讓模型認(rèn)識到數(shù)據(jù)中哪一部分是最重要的,為它分配更大的權(quán)重,獲得更多的注意力在一些特征上,讓模型表現(xiàn)更好,這篇文章主要給大家介紹了關(guān)于YOLOv5改進(jìn)教程之添加注意力機(jī)制的相關(guān)資料,需要的朋友可以參考下
    2022-06-06
  • 深入理解Python中的Contextlib庫

    深入理解Python中的Contextlib庫

    Python提供了一些內(nèi)建的庫以支持各種常見的編程任務(wù),Contextlib庫是其中之一,它提供了一些用于支持上下文管理協(xié)議(即with語句)的函數(shù),這篇文章將詳細(xì)介紹如何使用Contextlib庫中的功能,需要的朋友可以參考下
    2023-06-06
  • python iloc和loc切片的實現(xiàn)

    python iloc和loc切片的實現(xiàn)

    本文主要介紹了python iloc和loc切片的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2022-05-05
  • python 對給定可迭代集合統(tǒng)計出現(xiàn)頻率,并排序的方法

    python 對給定可迭代集合統(tǒng)計出現(xiàn)頻率,并排序的方法

    今天小編就為大家分享一篇python 對給定可迭代集合統(tǒng)計出現(xiàn)頻率,并排序的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-10-10
  • 詳解TensorFlow訓(xùn)練網(wǎng)絡(luò)兩種方式

    詳解TensorFlow訓(xùn)練網(wǎng)絡(luò)兩種方式

    本文主要介紹了TensorFlow訓(xùn)練網(wǎng)絡(luò)兩種方式,一種是基于tensor(array),另外一種是迭代器,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-12-12
  • Python編程中用close()方法關(guān)閉文件的教程

    Python編程中用close()方法關(guān)閉文件的教程

    這篇文章主要介紹了Python編程中用close()方法關(guān)閉文件的教程,是Python編程入門中的基礎(chǔ)知識,需要的朋友可以參考下
    2015-05-05
  • Python退出While循環(huán)的3種方法舉例詳解

    Python退出While循環(huán)的3種方法舉例詳解

    在每次循環(huán)結(jié)束后,我們需要檢查循環(huán)條件是否滿足。如果條件滿足,則繼續(xù)執(zhí)行循環(huán)體內(nèi)的代碼,否則退出循環(huán),這篇文章主要給大家介紹了關(guān)于Python退出While循環(huán)的3種方法,需要的朋友可以參考下
    2023-10-10
  • Python3 能振興 Python的原因分析

    Python3 能振興 Python的原因分析

    我從Stephen A. Goss那讀到關(guān)于了《Python 3正在毀滅Python》。這篇文章有不少精彩的論點,但我卻并不認(rèn)為Python 3是在毀滅Python,也不認(rèn)為整個局面對Python一點也不利
    2014-11-11
  • paramiko模塊安裝和使用(遠(yuǎn)程登錄服務(wù)器)

    paramiko模塊安裝和使用(遠(yuǎn)程登錄服務(wù)器)

    paramiko是用python語言寫的一個模塊,遵循SSH2協(xié)議,支持以加密和認(rèn)證的方式,進(jìn)行遠(yuǎn)程服務(wù)器的連接,下面學(xué)習(xí)一下它的使用方法
    2014-01-01
  • Python編程快速上手——Excel表格創(chuàng)建乘法表案例分析

    Python編程快速上手——Excel表格創(chuàng)建乘法表案例分析

    這篇文章主要介紹了Python Excel表格創(chuàng)建乘法表,結(jié)合具體實例形式分析了Python接受cmd命令操作Excel文件創(chuàng)建乘法表相關(guān)實現(xiàn)技巧,需要的朋友可以參考下
    2020-02-02

最新評論