欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

PySpark中RDD的數(shù)據(jù)輸出問(wèn)題詳解

 更新時(shí)間:2023年01月15日 09:46:02   作者:陽(yáng)862  
RDD是 Spark 中最基礎(chǔ)的抽象,它表示了一個(gè)可以并行操作的、不可變得、被分區(qū)了的元素集合,這篇文章主要介紹了PySpark中RDD的數(shù)據(jù)輸出詳解,需要的朋友可以參考下

RDD概念

RDD(resilient distributed dataset ,彈性分布式數(shù)據(jù)集),是 Spark 中最基礎(chǔ)的抽象。它表示了一個(gè)可以并行操作的、不可變得、被分區(qū)了的元素集合。用戶不需要關(guān)心底層復(fù)雜的抽象處理,直接使用方便的算子處理和計(jì)算就可以了。

RDD的特點(diǎn)

1) . 分布式 RDD是一個(gè)抽象的概念,RDD在spark driver中,通過(guò)RDD來(lái)引用數(shù)據(jù),數(shù)據(jù)真正存儲(chǔ)在節(jié)點(diǎn)機(jī)的partition上。

2). 只讀 在Spark中RDD一旦生成了,就不能修改。 那么為什么要設(shè)置為只讀,設(shè)置為只讀的話,因?yàn)椴淮嬖谛薷模l(fā)的吞吐量就上來(lái)了。

3). 血緣關(guān)系 我們需要對(duì)RDD進(jìn)行一系列的操作,因?yàn)镽DD是只讀的,我們只能不斷的生產(chǎn)新的RDD,這樣,新的RDD與原來(lái)的RDD就會(huì)存在一些血緣關(guān)系。

Spark會(huì)記錄這些血緣關(guān)系,在后期的容錯(cuò)上會(huì)有很大的益處。

4). 緩存 當(dāng)一個(gè) RDD 需要被重復(fù)使用時(shí),或者當(dāng)任務(wù)失敗重新計(jì)算的時(shí)候,這時(shí)如果將 RDD 緩存起來(lái),就可以避免重新計(jì)算,保證程序運(yùn)行的性能。

一. 回顧

數(shù)據(jù)輸入:

  • sc.parallelize
  • sc.textFile

數(shù)據(jù)計(jì)算:

  • rdd.map
  • rdd.flatMap
  • rdd.reduceByKey
  • .…

二.輸出為python對(duì)象

數(shù)據(jù)輸出可用的方法是很多的,這里簡(jiǎn)單介紹常會(huì)用到的4個(gè)

  • collect:將RDD內(nèi)容轉(zhuǎn)換為list
  • reduce:對(duì)RDD內(nèi)容進(jìn)行自定義聚合
  • take:取出RDD的前N個(gè)元素組成list
  • count:統(tǒng)計(jì)RDD元素個(gè)數(shù)

collect算子

功能:將RDD各個(gè)分區(qū)內(nèi)的數(shù)據(jù),統(tǒng)一收集到Driver中,形成一個(gè)List對(duì)象
用法:
rdd.collect()
返回值是一個(gè)list

演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#準(zhǔn)備一個(gè)RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,輸出RDD為list對(duì)象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())

結(jié)果是

 單獨(dú)輸出rdd,輸出的是rdd的類名而非內(nèi)容

reduce算子

功能:對(duì)RDD數(shù)據(jù)集按照你傳入的邏輯進(jìn)行聚合

語(yǔ)法:

代碼

 返回值等于計(jì)算函數(shù)的返回值

 演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#準(zhǔn)備一個(gè)RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,輸出RDD為list對(duì)象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的類型是:",type(rdd.collect()))
#reduce算子,對(duì)RDD進(jìn)行兩兩聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)

結(jié)果是

 take算子

功能:取RDD的前N個(gè)元素,組合成list返回給你
用法:

 演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#準(zhǔn)備一個(gè)RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,輸出RDD為list對(duì)象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的類型是:",type(rdd.collect()))
#reduce算子,對(duì)RDD進(jìn)行兩兩聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)
#take算子,取出RDD前n個(gè)元素,組成list返回
take_list=rdd.take(3)
print(take_list)

結(jié)果是

 count算子

功能:計(jì)算RDD有多少條數(shù)據(jù),返回值是一個(gè)數(shù)字
用法:

 演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#準(zhǔn)備一個(gè)RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,輸出RDD為list對(duì)象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的類型是:",type(rdd.collect()))
#reduce算子,對(duì)RDD進(jìn)行兩兩聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)
#take算子,取出RDD前n個(gè)元素,組成list返回
take_list=rdd.take(3)
print(take_list)
#count算子,統(tǒng)計(jì)rdd中有多少條數(shù)據(jù),返回值為數(shù)字
num_count=rdd.count()
print(num_count)
#關(guān)閉鏈接
sc.stop()

結(jié)果是

小結(jié)

1.Spark的編程流程就是:

  • 將數(shù)據(jù)加載為RDD(數(shù)據(jù)輸入)對(duì)RDD進(jìn)行計(jì)算(數(shù)據(jù)計(jì)算)
  • 將RDD轉(zhuǎn)換為Python對(duì)象(數(shù)據(jù)輸出)

 2.數(shù)據(jù)輸出的方法

  • collect:將RDD內(nèi)容轉(zhuǎn)換為list
  • reduce:對(duì)RDD內(nèi)容進(jìn)行自定義聚合
  • take:取出RDD的前N個(gè)元素組成list
  • count:統(tǒng)計(jì)RDD元素個(gè)數(shù)

數(shù)據(jù)輸出可用的方法是很多的,這里只是簡(jiǎn)單介紹4個(gè)

三.輸出到文件中

savaAsTextFile算子

功能:將RDD的數(shù)據(jù)寫(xiě)入文本文件中支持本地寫(xiě)出, hdfs等文件系統(tǒng).
代碼:

 演示

 這是因?yàn)檫@個(gè)方法本質(zhì)上依賴大數(shù)據(jù)的Hadoop框架,需要配置Hadoop 依賴.

配置Hadoop依賴

調(diào)用保存文件的算子,需要配置Hadoop依賴。

  • 下載Hadoop安裝包解壓到電腦任意位置
  • 在Python代碼中使用os模塊配置: os.environ['HADOOP_HOME']='HADOOP解壓文件夾路徑′。
  • 下載winutils.exe,并放入Hadoop解壓文件夾的bin目錄內(nèi)
  • 下載hadoop.dll,并放入:C:/Windows/System32文件夾內(nèi)

配置完成之后,執(zhí)行下面的代碼

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#準(zhǔn)備rdd
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)])
rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]])
#輸出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")

結(jié)果是

 輸出的文件夾中有這么8文件,是因?yàn)镽DD被默認(rèn)為分成8個(gè)分區(qū)
SaveAsTextFile算子輸出文件的個(gè)數(shù)是根據(jù)RDD的分區(qū)來(lái)決定的,有多少分區(qū)就會(huì)輸出多少個(gè)文件,RDD在本電腦中默認(rèn)是8(該電腦CPU核心數(shù)是8核)

 打開(kāi)設(shè)備管理器就可以查看處理器個(gè)數(shù),這里是有8個(gè)邏輯CPU
或者打開(kāi)任務(wù)管理器就可以看到是4核8個(gè)邏輯CPU

 修改rdd分區(qū)為1個(gè)

方式1, SparkConf對(duì)象設(shè)置屬性全局并行度為1:

 方式2,創(chuàng)建RDD的時(shí)候設(shè)置( parallelize方法傳入numSlices參數(shù)為1)

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#rdd分區(qū)設(shè)置為1
conf.set("spark.default.parallelism","1")
sc=SparkContext(conf=conf)
 
#準(zhǔn)備rdd
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)])
rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]])
#輸出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")

結(jié)果是

 小結(jié)

1.RDD輸出到文件的方法

  • rdd.saveAsTextFile(路徑)
  • 輸出的結(jié)果是一個(gè)文件夾
  • 有幾個(gè)分區(qū)就輸出多少個(gè)結(jié)果文件

2.如何修改RDD分區(qū)

  • SparkConf對(duì)象設(shè)置conf.set("spark.default.parallelism", "7")
  • 創(chuàng)建RDD的時(shí)候,sc.parallelize方法傳入numSlices參數(shù)為1

四.練習(xí)案例

需求: 

讀取文件轉(zhuǎn)換成RDD,并完成:

  • 打印輸出:熱門(mén)搜索時(shí)間段(小時(shí)精度)Top3
  • 打印輸出:熱門(mén)搜索詞Top3
  • 打印輸出:統(tǒng)計(jì)黑馬程序員關(guān)鍵字在哪個(gè)時(shí)段被搜索最多
  • 將數(shù)據(jù)轉(zhuǎn)換為JSON格式,寫(xiě)出為文件

代碼

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#rdd分區(qū)設(shè)置為1
conf.set("spark.default.parallelism","1")
sc=SparkContext(conf=conf)
 
rdd=sc.textFile("D:/search_log.txt")
#需求1 打印輸出:熱門(mén)搜索時(shí)間段(小時(shí)精度)Top3
# 取出全部的時(shí)間并轉(zhuǎn)換為小時(shí)
# 轉(zhuǎn)換為(小時(shí),1)的二元元組
# Key分組聚合Value
# 排序(降序)
# 取前3
result1=rdd.map(lambda x:x.split("\t")).\
    map(lambda x:x[0][:2]).\
    map(lambda x:(x,1)).\
    reduceByKey(lambda x,y:x+y).\
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
    take(3)#上面用的‘/'是換行的意思,當(dāng)一行代碼太長(zhǎng)時(shí)就可以這樣用
print(result1)
#需求2 打印輸出:熱門(mén)搜索詞Top3
# 取出全部的搜索詞
# (詞,1)二元元組
# 分組聚合
# 排序
# Top3
result2=rdd.map(lambda x:x.split("\t")).\
    map(lambda x:x[2])\
    .map(lambda x:(x,1)).\
    reduceByKey(lambda x,y:x+y).\
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
    take(3)
print(result2)
#需求3 打印輸出:統(tǒng)計(jì)黑馬程序員關(guān)鍵字在哪個(gè)時(shí)段被搜索最多
result3=rdd.map(lambda x:x.split("\t")).\
    filter((lambda x:x[2]=="黑馬程序員")).\
    map(lambda x:(x[0][:2],1)).\
    reduceByKey(lambda x,y:x+y).\
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
    take(3)
print(result3)
#需求4 將數(shù)據(jù)轉(zhuǎn)換為JSON格式,寫(xiě)出為文件
rdd.map(lambda x:x.split("\t")).\
    map(lambda x:{"time":x[0],"id":x[1],"key":x[2],"num1":x[3],"num2":x[4],"url":x[5]})\
    .saveAsTextFile("D:/out_json")

結(jié)果是

到此這篇關(guān)于PySpark中RDD的數(shù)據(jù)輸出詳解的文章就介紹到這了,更多相關(guān)PySpark RDD數(shù)據(jù)輸出內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Python實(shí)現(xiàn)簡(jiǎn)易信息分類存儲(chǔ)軟件

    Python實(shí)現(xiàn)簡(jiǎn)易信息分類存儲(chǔ)軟件

    這篇文章主要介紹的是通過(guò)Python制作一個(gè)簡(jiǎn)易的文件分類存儲(chǔ)文件,可以實(shí)現(xiàn)信息的增刪改查以及內(nèi)容的導(dǎo)出和回復(fù),文中的示例代碼對(duì)我們的學(xué)習(xí)有一定的價(jià)值,感興趣的同學(xué)可以了解一下
    2021-12-12
  • Python常用內(nèi)置函數(shù)和關(guān)鍵字使用詳解

    Python常用內(nèi)置函數(shù)和關(guān)鍵字使用詳解

    在Python中有許許多多的內(nèi)置函數(shù)和關(guān)鍵字,它們是我們?nèi)粘V薪?jīng)??梢允褂玫牡降囊恍┗A(chǔ)的工具,可以方便我們的工作。本文將詳細(xì)講解他們的使用方法,需要的可以參考一下
    2022-05-05
  • python?super()函數(shù)的詳解

    python?super()函數(shù)的詳解

    這篇文章主要為大家介紹了python?super()函數(shù),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助
    2021-11-11
  • Python實(shí)現(xiàn)解析參數(shù)的三種方法詳解

    Python實(shí)現(xiàn)解析參數(shù)的三種方法詳解

    這篇文章主要介紹了python解析參數(shù)的三種方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2022-07-07
  • python將excel轉(zhuǎn)換為csv的代碼方法總結(jié)

    python將excel轉(zhuǎn)換為csv的代碼方法總結(jié)

    在本篇文章里小編給大家分享了關(guān)于python如何將excel轉(zhuǎn)換為csv的實(shí)例方法和代碼內(nèi)容,需要的朋友們學(xué)習(xí)下。
    2019-07-07
  • Python 將json序列化后的字符串轉(zhuǎn)換成字典(推薦)

    Python 將json序列化后的字符串轉(zhuǎn)換成字典(推薦)

    這篇文章主要介紹了Python 將json序列化后的字符串轉(zhuǎn)換成字典,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-01-01
  • Python可視化Tkinter進(jìn)階grid布局詳情

    Python可視化Tkinter進(jìn)階grid布局詳情

    這篇文章主要介紹了Python可視化Tkinter進(jìn)階grid布局詳情,文章圍繞主題展開(kāi)詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下
    2022-07-07
  • Python 恐龍跑跑小游戲?qū)崿F(xiàn)流程

    Python 恐龍跑跑小游戲?qū)崿F(xiàn)流程

    大家好,本篇文章主要講的是用python實(shí)現(xiàn)谷歌小恐龍小游戲,看看這是你斷網(wǎng)時(shí)的樣子么,感興趣的同學(xué)趕快來(lái)看一看吧,對(duì)你有幫助的話記得收藏一下
    2022-02-02
  • Python生成rsa密鑰對(duì)操作示例

    Python生成rsa密鑰對(duì)操作示例

    這篇文章主要介紹了Python生成rsa密鑰對(duì)操作,涉及Python rsa加密與密鑰生成相關(guān)操作技巧,需要的朋友可以參考下
    2019-04-04
  • python閉包的實(shí)例詳解

    python閉包的實(shí)例詳解

    在本篇文章里小編給大家整理的是一篇關(guān)于python閉包的實(shí)例詳解內(nèi)容,有興趣的朋友們可以學(xué)習(xí)下。
    2021-10-10

最新評(píng)論