PySpark中RDD的數(shù)據(jù)輸出問(wèn)題詳解
RDD概念
RDD(resilient distributed dataset ,彈性分布式數(shù)據(jù)集),是 Spark 中最基礎(chǔ)的抽象。它表示了一個(gè)可以并行操作的、不可變得、被分區(qū)了的元素集合。用戶不需要關(guān)心底層復(fù)雜的抽象處理,直接使用方便的算子處理和計(jì)算就可以了。
RDD的特點(diǎn)
1) . 分布式 RDD是一個(gè)抽象的概念,RDD在spark driver中,通過(guò)RDD來(lái)引用數(shù)據(jù),數(shù)據(jù)真正存儲(chǔ)在節(jié)點(diǎn)機(jī)的partition上。
2). 只讀 在Spark中RDD一旦生成了,就不能修改。 那么為什么要設(shè)置為只讀,設(shè)置為只讀的話,因?yàn)椴淮嬖谛薷模l(fā)的吞吐量就上來(lái)了。
3). 血緣關(guān)系 我們需要對(duì)RDD進(jìn)行一系列的操作,因?yàn)镽DD是只讀的,我們只能不斷的生產(chǎn)新的RDD,這樣,新的RDD與原來(lái)的RDD就會(huì)存在一些血緣關(guān)系。
Spark會(huì)記錄這些血緣關(guān)系,在后期的容錯(cuò)上會(huì)有很大的益處。
4). 緩存 當(dāng)一個(gè) RDD 需要被重復(fù)使用時(shí),或者當(dāng)任務(wù)失敗重新計(jì)算的時(shí)候,這時(shí)如果將 RDD 緩存起來(lái),就可以避免重新計(jì)算,保證程序運(yùn)行的性能。
一. 回顧
數(shù)據(jù)輸入:
- sc.parallelize
- sc.textFile
數(shù)據(jù)計(jì)算:
- rdd.map
- rdd.flatMap
- rdd.reduceByKey
- .…
二.輸出為python對(duì)象
數(shù)據(jù)輸出可用的方法是很多的,這里簡(jiǎn)單介紹常會(huì)用到的4個(gè)
- collect:將RDD內(nèi)容轉(zhuǎn)換為list
- reduce:對(duì)RDD內(nèi)容進(jìn)行自定義聚合
- take:取出RDD的前N個(gè)元素組成list
- count:統(tǒng)計(jì)RDD元素個(gè)數(shù)
collect算子
功能:將RDD各個(gè)分區(qū)內(nèi)的數(shù)據(jù),統(tǒng)一收集到Driver中,形成一個(gè)List對(duì)象
用法:
rdd.collect()
返回值是一個(gè)list
演示
from pyspark import SparkContext,SparkConf import os os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe" conf=SparkConf().setMaster("local[*]").setAppName("test_spark") sc=SparkContext(conf=conf) #準(zhǔn)備一個(gè)RDD rdd=sc.parallelize([1,2,3,4,5]) #collect算子,輸出RDD為list對(duì)象 print("rdd是:",rdd) print("rdd.collect是:",rdd.collect())
結(jié)果是
單獨(dú)輸出rdd,輸出的是rdd的類名而非內(nèi)容
reduce算子
功能:對(duì)RDD數(shù)據(jù)集按照你傳入的邏輯進(jìn)行聚合
語(yǔ)法:
代碼
返回值等于計(jì)算函數(shù)的返回值
演示
from pyspark import SparkContext,SparkConf import os os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe" conf=SparkConf().setMaster("local[*]").setAppName("test_spark") sc=SparkContext(conf=conf) #準(zhǔn)備一個(gè)RDD rdd=sc.parallelize([1,2,3,4,5]) #collect算子,輸出RDD為list對(duì)象 print("rdd是:",rdd) print("rdd.collect是:",rdd.collect()) print("rdd.collect的類型是:",type(rdd.collect())) #reduce算子,對(duì)RDD進(jìn)行兩兩聚合 num=rdd.reduce(lambda x,y:x+y) print(num)
結(jié)果是
take算子
功能:取RDD的前N個(gè)元素,組合成list返回給你
用法:
演示
from pyspark import SparkContext,SparkConf import os os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe" conf=SparkConf().setMaster("local[*]").setAppName("test_spark") sc=SparkContext(conf=conf) #準(zhǔn)備一個(gè)RDD rdd=sc.parallelize([1,2,3,4,5]) #collect算子,輸出RDD為list對(duì)象 print("rdd是:",rdd) print("rdd.collect是:",rdd.collect()) print("rdd.collect的類型是:",type(rdd.collect())) #reduce算子,對(duì)RDD進(jìn)行兩兩聚合 num=rdd.reduce(lambda x,y:x+y) print(num) #take算子,取出RDD前n個(gè)元素,組成list返回 take_list=rdd.take(3) print(take_list)
結(jié)果是
count算子
功能:計(jì)算RDD有多少條數(shù)據(jù),返回值是一個(gè)數(shù)字
用法:
演示
from pyspark import SparkContext,SparkConf import os os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe" conf=SparkConf().setMaster("local[*]").setAppName("test_spark") sc=SparkContext(conf=conf) #準(zhǔn)備一個(gè)RDD rdd=sc.parallelize([1,2,3,4,5]) #collect算子,輸出RDD為list對(duì)象 print("rdd是:",rdd) print("rdd.collect是:",rdd.collect()) print("rdd.collect的類型是:",type(rdd.collect())) #reduce算子,對(duì)RDD進(jìn)行兩兩聚合 num=rdd.reduce(lambda x,y:x+y) print(num) #take算子,取出RDD前n個(gè)元素,組成list返回 take_list=rdd.take(3) print(take_list) #count算子,統(tǒng)計(jì)rdd中有多少條數(shù)據(jù),返回值為數(shù)字 num_count=rdd.count() print(num_count) #關(guān)閉鏈接 sc.stop()
結(jié)果是
小結(jié)
1.Spark的編程流程就是:
- 將數(shù)據(jù)加載為RDD(數(shù)據(jù)輸入)對(duì)RDD進(jìn)行計(jì)算(數(shù)據(jù)計(jì)算)
- 將RDD轉(zhuǎn)換為Python對(duì)象(數(shù)據(jù)輸出)
2.數(shù)據(jù)輸出的方法
- collect:將RDD內(nèi)容轉(zhuǎn)換為list
- reduce:對(duì)RDD內(nèi)容進(jìn)行自定義聚合
- take:取出RDD的前N個(gè)元素組成list
- count:統(tǒng)計(jì)RDD元素個(gè)數(shù)
數(shù)據(jù)輸出可用的方法是很多的,這里只是簡(jiǎn)單介紹4個(gè)
三.輸出到文件中
savaAsTextFile算子
功能:將RDD的數(shù)據(jù)寫(xiě)入文本文件中支持本地寫(xiě)出, hdfs等文件系統(tǒng).
代碼:
演示
這是因?yàn)檫@個(gè)方法本質(zhì)上依賴大數(shù)據(jù)的Hadoop框架,需要配置Hadoop 依賴.
配置Hadoop依賴
調(diào)用保存文件的算子,需要配置Hadoop依賴。
- 下載Hadoop安裝包解壓到電腦任意位置
- 在Python代碼中使用os模塊配置: os.environ['HADOOP_HOME']='HADOOP解壓文件夾路徑′。
- 下載winutils.exe,并放入Hadoop解壓文件夾的bin目錄內(nèi)
- 下載hadoop.dll,并放入:C:/Windows/System32文件夾內(nèi)
配置完成之后,執(zhí)行下面的代碼
from pyspark import SparkConf,SparkContext import os os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe" os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0' conf=SparkConf().setMaster("local[*]").setAppName("test_spark") sc=SparkContext(conf=conf) #準(zhǔn)備rdd rdd1=sc.parallelize([1,2,3,4,5]) rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)]) rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]]) #輸出到文件中 rdd1.saveAsTextFile("D:/output1") rdd2.saveAsTextFile("D:/output2") rdd3.saveAsTextFile("D:/output3")
結(jié)果是
輸出的文件夾中有這么8文件,是因?yàn)镽DD被默認(rèn)為分成8個(gè)分區(qū)
SaveAsTextFile算子輸出文件的個(gè)數(shù)是根據(jù)RDD的分區(qū)來(lái)決定的,有多少分區(qū)就會(huì)輸出多少個(gè)文件,RDD在本電腦中默認(rèn)是8(該電腦CPU核心數(shù)是8核)
打開(kāi)設(shè)備管理器就可以查看處理器個(gè)數(shù),這里是有8個(gè)邏輯CPU
或者打開(kāi)任務(wù)管理器就可以看到是4核8個(gè)邏輯CPU
修改rdd分區(qū)為1個(gè)
方式1, SparkConf對(duì)象設(shè)置屬性全局并行度為1:
方式2,創(chuàng)建RDD的時(shí)候設(shè)置( parallelize方法傳入numSlices參數(shù)為1)
from pyspark import SparkConf,SparkContext import os os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe" os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0' conf=SparkConf().setMaster("local[*]").setAppName("test_spark") #rdd分區(qū)設(shè)置為1 conf.set("spark.default.parallelism","1") sc=SparkContext(conf=conf) #準(zhǔn)備rdd rdd1=sc.parallelize([1,2,3,4,5]) rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)]) rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]]) #輸出到文件中 rdd1.saveAsTextFile("D:/output1") rdd2.saveAsTextFile("D:/output2") rdd3.saveAsTextFile("D:/output3")
結(jié)果是
小結(jié)
1.RDD輸出到文件的方法
- rdd.saveAsTextFile(路徑)
- 輸出的結(jié)果是一個(gè)文件夾
- 有幾個(gè)分區(qū)就輸出多少個(gè)結(jié)果文件
2.如何修改RDD分區(qū)
- SparkConf對(duì)象設(shè)置conf.set("spark.default.parallelism", "7")
- 創(chuàng)建RDD的時(shí)候,sc.parallelize方法傳入numSlices參數(shù)為1
四.練習(xí)案例
需求:
讀取文件轉(zhuǎn)換成RDD,并完成:
- 打印輸出:熱門(mén)搜索時(shí)間段(小時(shí)精度)Top3
- 打印輸出:熱門(mén)搜索詞Top3
- 打印輸出:統(tǒng)計(jì)黑馬程序員關(guān)鍵字在哪個(gè)時(shí)段被搜索最多
- 將數(shù)據(jù)轉(zhuǎn)換為JSON格式,寫(xiě)出為文件
代碼
from pyspark import SparkConf,SparkContext import os os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe" os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0' conf=SparkConf().setMaster("local[*]").setAppName("test_spark") #rdd分區(qū)設(shè)置為1 conf.set("spark.default.parallelism","1") sc=SparkContext(conf=conf) rdd=sc.textFile("D:/search_log.txt") #需求1 打印輸出:熱門(mén)搜索時(shí)間段(小時(shí)精度)Top3 # 取出全部的時(shí)間并轉(zhuǎn)換為小時(shí) # 轉(zhuǎn)換為(小時(shí),1)的二元元組 # Key分組聚合Value # 排序(降序) # 取前3 result1=rdd.map(lambda x:x.split("\t")).\ map(lambda x:x[0][:2]).\ map(lambda x:(x,1)).\ reduceByKey(lambda x,y:x+y).\ sortBy(lambda x:x[1],ascending=False,numPartitions=1).\ take(3)#上面用的‘/'是換行的意思,當(dāng)一行代碼太長(zhǎng)時(shí)就可以這樣用 print(result1) #需求2 打印輸出:熱門(mén)搜索詞Top3 # 取出全部的搜索詞 # (詞,1)二元元組 # 分組聚合 # 排序 # Top3 result2=rdd.map(lambda x:x.split("\t")).\ map(lambda x:x[2])\ .map(lambda x:(x,1)).\ reduceByKey(lambda x,y:x+y).\ sortBy(lambda x:x[1],ascending=False,numPartitions=1).\ take(3) print(result2) #需求3 打印輸出:統(tǒng)計(jì)黑馬程序員關(guān)鍵字在哪個(gè)時(shí)段被搜索最多 result3=rdd.map(lambda x:x.split("\t")).\ filter((lambda x:x[2]=="黑馬程序員")).\ map(lambda x:(x[0][:2],1)).\ reduceByKey(lambda x,y:x+y).\ sortBy(lambda x:x[1],ascending=False,numPartitions=1).\ take(3) print(result3) #需求4 將數(shù)據(jù)轉(zhuǎn)換為JSON格式,寫(xiě)出為文件 rdd.map(lambda x:x.split("\t")).\ map(lambda x:{"time":x[0],"id":x[1],"key":x[2],"num1":x[3],"num2":x[4],"url":x[5]})\ .saveAsTextFile("D:/out_json")
結(jié)果是
到此這篇關(guān)于PySpark中RDD的數(shù)據(jù)輸出詳解的文章就介紹到這了,更多相關(guān)PySpark RDD數(shù)據(jù)輸出內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python實(shí)現(xiàn)簡(jiǎn)易信息分類存儲(chǔ)軟件
這篇文章主要介紹的是通過(guò)Python制作一個(gè)簡(jiǎn)易的文件分類存儲(chǔ)文件,可以實(shí)現(xiàn)信息的增刪改查以及內(nèi)容的導(dǎo)出和回復(fù),文中的示例代碼對(duì)我們的學(xué)習(xí)有一定的價(jià)值,感興趣的同學(xué)可以了解一下2021-12-12Python常用內(nèi)置函數(shù)和關(guān)鍵字使用詳解
在Python中有許許多多的內(nèi)置函數(shù)和關(guān)鍵字,它們是我們?nèi)粘V薪?jīng)??梢允褂玫牡降囊恍┗A(chǔ)的工具,可以方便我們的工作。本文將詳細(xì)講解他們的使用方法,需要的可以參考一下2022-05-05Python實(shí)現(xiàn)解析參數(shù)的三種方法詳解
這篇文章主要介紹了python解析參數(shù)的三種方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2022-07-07python將excel轉(zhuǎn)換為csv的代碼方法總結(jié)
在本篇文章里小編給大家分享了關(guān)于python如何將excel轉(zhuǎn)換為csv的實(shí)例方法和代碼內(nèi)容,需要的朋友們學(xué)習(xí)下。2019-07-07Python 將json序列化后的字符串轉(zhuǎn)換成字典(推薦)
這篇文章主要介紹了Python 將json序列化后的字符串轉(zhuǎn)換成字典,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-01-01Python可視化Tkinter進(jìn)階grid布局詳情
這篇文章主要介紹了Python可視化Tkinter進(jìn)階grid布局詳情,文章圍繞主題展開(kāi)詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-07-07