PySpark與GraphFrames的安裝與使用環(huán)境搭建過程
PySpark環(huán)境搭建
配置hadoop
spark訪問本地文件并執(zhí)行運算時,可能會遇到權(quán)限問題或是dll錯誤。這是因為spark需要使用到Hadoop的winutils和hadoop.dll,首先我們必須配置好Hadoop相關(guān)的環(huán)境。可以到github下載:https://github.com/4ttty/winutils
gitcode提供了鏡像加速:https://gitcode.net/mirrors/4ttty/winutils
我選擇了使用這個倉庫提供的最高的Hadoop版本3.0.0將其解壓到D:\deploy\hadoop-3.0.0目錄下,然后配置環(huán)境變量:
我們還需要將對應(yīng)的hadoop.dll復制到系統(tǒng)中,用命令表達就是:
copy D:\deploy\hadoop-3.0.0\bin\hadoop.dll C:\Windows\System32
不過這步需要擁有管理員權(quán)限才可以操作。
為了能夠在任何地方使用winutils命令工具,將%HADOOP_HOME%\bin
目錄加入環(huán)境變量中:
安裝pyspark與Java
首先,我們安裝spark當前(2022-2-17)的最新版本:
pip install pyspark==3.2.1
需要注意pyspark的版本決定了jdk的最高版本,例如假如安裝2.4.5版本的pyspark就只能安裝1.8版本的jdk,否則會報出java.lang.IllegalArgumentException: Unsupported class file major version 55
的錯誤。
這是因為pyspark內(nèi)置了Scala,而Scala是基于jvm的編程語言,Scala與jdk的版本存在兼容性問題,JDK與scala的版本兼容性表:
JDK version | Minimum Scala versions | Recommended Scala versions |
---|---|---|
17 | 2.13.6, 2.12.15 (forthcoming) | 2.13.6, 2.12.15 (forthcoming) |
16 | 2.13.5, 2.12.14 | 2.13.6, 2.12.14 |
13, 14, 15 | 2.13.2, 2.12.11 | 2.13.6, 2.12.14 |
12 | 2.13.1, 2.12.9 | 2.13.6, 2.12.14 |
11 | 2.13.0, 2.12.4, 2.11.12 | 2.13.6, 2.12.14, 2.11.12 |
8 | 2.13.0, 2.12.0, 2.11.0, 2.10.2 | 2.13.6, 2.12.14, 2.11.12, 2.10.7 |
6, 7 | 2.11.0, 2.10.0 | 2.11.12, 2.10.7 |
當前3.2.1版本的pyspark內(nèi)置的Scala版本為2.12.15,意味著jdk17與其以下的所有版本都支持。
這里我依然選擇安裝jdk8的版本:
測試一下:
>java -version java version "1.8.0_201" Java(TM) SE Runtime Environment (build 1.8.0_201-b09) Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode)
jdk11的詳細安裝教程(jdk1.8在官網(wǎng)只有安裝包,無zip綠化壓縮包):
綠化版Java11的環(huán)境配置與Python調(diào)用Java
https://xxmdmst.blog.csdn.net/article/details/118366166
graphframes安裝
pip安裝當前最新的graphframes:
pip install graphframes==0.6
然后在官網(wǎng)下載graphframes的jar包。
下載地址:https://spark-packages.org/package/graphframes/graphframes
由于安裝的pyspark版本是3.2,所以這里我選擇了這個jar包:
然后將該jar包放入pyspark安裝目錄的jars目錄下:
pyspark安裝位置可以通過pip查看:
C:\Users\ASUS>pip show pyspark Name: pyspark Version: 3.2.1 Summary: Apache Spark Python API Home-page: https://github.com/apache/spark/tree/master/python Author: Spark Developers Author-email: dev@spark.apache.org License: http://www.apache.org/licenses/LICENSE-2.0 Location: d:\miniconda3\lib\site-packages Requires: py4j Required-by:
使用方法
學習pyspark的最佳路徑是官網(wǎng):https://spark.apache.org/docs/latest/quick-start.html
在下面的網(wǎng)頁,官方提供了在線jupyter:
https://spark.apache.org/docs/latest/api/python/getting_started/index.html
啟動spark并讀取數(shù)據(jù)
本地模式啟動spark:
from pyspark.sql import SparkSession, Row spark = SparkSession \ .builder \ .appName("Python Spark") \ .master("local[*]") \ .getOrCreate() sc = spark.sparkContext spark
SparkSession輸出的內(nèi)容中包含了spark的web頁面,新標簽頁打開頁面后大致效果如上。
點擊Environment選項卡可以查看當前環(huán)境中的變量:
啟動hive支持
找到pyspark的安裝位置,例如我的電腦在D:\Miniconda3\Lib\site-packages\pyspark
手動創(chuàng)建conf目錄并將hive-site.xml配置文件復制到其中。如果hive使用了MySQL作為原數(shù)據(jù)庫,則還需要將MySQL對應(yīng)的驅(qū)動jar包放入spark的jars目錄下。
創(chuàng)建spark會話對象時可通過enableHiveSupport()
開啟hive支持:
from pyspark.sql import SparkSession from pyspark.sql import Row spark = SparkSession \ .builder \ .appName("Python Spark SQL Hive integration example") \ .enableHiveSupport() \ .getOrCreate() sc = spark.sparkContext spark
spark訪問hive自己創(chuàng)建的表有可能會出現(xiàn)如下的權(quán)限報錯:
Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS s
hould be writable. Current permissions are: rwx------
是因為當前用戶不具備對\tmp\hive的操作權(quán)限:
>winutils ls \tmp\hive drwx------ 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May 4 2020 \tmp\hive
把\tmp\hive目錄的權(quán)限改為777即可順利訪問:
>winutils chmod 777 \tmp\hive >winutils ls \tmp\hive drwxrwxrwx 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May 4 2020 \tmp\hive
Spark的DataFrame與RDD
從spark2.x開始將RDD和DataFrame的API統(tǒng)一抽象成dataset,DataFrame就是Dataset[Row],RDD則是Dataset.rdd。可以將DataFrame理解為包含結(jié)構(gòu)化信息的RDD。
將含row的RDD轉(zhuǎn)換為DataFrame只需要調(diào)用toDF方法或SparkSession的createDataFrame方法即可,也可以傳入schema覆蓋類型或名稱設(shè)置。
DataFrame的基礎(chǔ)api
DataFrame默認支持DSL風格語法,例如:
//查看DataFrame中的內(nèi)容 df.show() //查看DataFrame部分列中的內(nèi)容 df.select(df['name'], df['age'] + 1).show() df.select("name").show() //打印DataFrame的Schema信息 df.printSchema() //過濾age大于等于 21 的 df.filter(df['age'] > 21).show() //按年齡進行分組并統(tǒng)計相同年齡的人數(shù) personDF.groupBy("age").count().show()
將DataFrame注冊成表或視圖之后即可進行純SQL操作:
df.createOrReplaceTempView("people") //df.createTempView("t_person") //查詢年齡最大的前兩名 spark.sql("select * from t_person order by age desc limit 2").show() //顯示表的Schema信息 spark.sql("desc t_person").show()
Pyspark可以直接很方便的注冊udf并直接使用:
strlen = spark.udf.register("len", lambda x: len(x)) print(spark.sql("SELECT len('test') length").collect()) print(spark.sql("SELECT 'foo' AS text").select(strlen("text").alias('length')).collect())
執(zhí)行結(jié)果:
[Row(length='4')]
[Row(length='3')]
RDD的簡介
DataFrame的本質(zhì)是對RDD的包裝,可以理解為DataFrame=RDD[Row]+schema。
RDD(A Resilient Distributed Dataset)叫做彈性可伸縮分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象。它代表一個不可變、自動容錯、可伸縮性、可分區(qū)、里面的元素可并行計算的集合。
在每一個RDD內(nèi)部具有五大屬性:
- 具有一系列的分區(qū)
- 一個計算函數(shù)操作于每一個切片
- 具有一個對其他RDD的依賴列表
- 對于 key-value RDDs具有一個Partitioner分區(qū)器
- 存儲每一個切片最佳計算位置
一組分片(Partition),即數(shù)據(jù)集的基本組成單位。對于RDD來說,每個分片都會被一個計算任務(wù)處理,并決定并行計算的粒度。用戶可以在創(chuàng)建RDD時指定RDD的分片個數(shù),如果沒有指定,那么就會采用默認值。默認值就是程序所分配到的CPU Core的數(shù)目。
**一個計算每個分區(qū)的函數(shù)。**Spark中RDD的計算是以分片為單位的,每個RDD都會實現(xiàn)compute函數(shù)以達到這個目的。compute函數(shù)會對迭代器進行復合,不需要保存每次計算的結(jié)果。
**RDD之間的依賴關(guān)系。**RDD的每次轉(zhuǎn)換都會生成一個新的RDD,所以RDD之間就會形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時,Spark可以通過這個依賴關(guān)系重新計算丟失的分區(qū)數(shù)據(jù),而不是對RDD的所有分區(qū)進行重新計算。
**一個Partitioner,即RDD的分片函數(shù)。**當前Spark中實現(xiàn)了兩種類型的分片函數(shù),一個是基于哈希的HashPartitioner,另外一個是基于范圍的RangePartitioner。只有對于于key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量,也決定了parent RDD Shuffle輸出時的分片數(shù)量。
**一個列表,存儲存取每個Partition的優(yōu)先位置(preferred location)。**對于一個HDFS文件來說,這個列表保存的就是每個Partition所在的塊的位置。按照“移動數(shù)據(jù)不如移動計算”的理念,Spark在進行任務(wù)調(diào)度的時候,會盡可能地將計算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲位置。
RDD的API概覽
RDD包含Transformation API和 Action API,Transformation API都是延遲加載的只是記住這些應(yīng)用到基礎(chǔ)數(shù)據(jù)集上的轉(zhuǎn)換動作,只有當執(zhí)行Action API時這些轉(zhuǎn)換才會真正運行。
Transformation API產(chǎn)生的兩類RDD最重要,分別是MapPartitionsRDD和ShuffledRDD。
產(chǎn)生MapPartitionsRDD的算子有map、keyBy、keys、values、flatMap、mapValues 、flatMapValues、mapPartitions、mapPartitionsWithIndex、glom、filter和filterByRange 。其中用的最多的是map和flatMap,但任何產(chǎn)生MapPartitionsRDD的算子都可以直接使用mapPartitions或mapPartitionsWithIndex實現(xiàn)。
產(chǎn)生ShuffledRDD的算子有combineByKeyWithClassTag、combineByKey、aggregateByKey、foldByKey 、reduceByKey 、distinct、groupByKey、groupBy、partitionBy、sortByKey 和 repartitionAndSortWithinPartitions。
combineByKey到groupByKey 底層均是調(diào)用combineByKeyWithClassTag方法:
@Experimental def combineByKeyWithClassTag[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope { combineByKeyWithClassTag(createCombiner,mergeValue,mergeCombiners ,defaultPartitioner(self)) } def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope { combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null) } def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope { val createCombiner = (v: V) => CompactBuffer(v) val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2 val bufs = combineByKeyWithClassTag[CompactBuffer[V]]( createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false) bufs.asInstanceOf[RDD[(K, Iterable[V])]] }
三個重要參數(shù)的含義:
- createCombiner:根據(jù)每個分區(qū)的第一個元素操作產(chǎn)生一個初始值
- mergeValue:對每個分區(qū)內(nèi)部的元素進行迭代合并
- mergeCombiners:對所有分區(qū)的合并結(jié)果進行合并
groupByKey的partitioner未指定時會傳入默認的defaultPartitioner。例如:
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2).keyBy(_.length) a.groupByKey.collect res9: Array[(Int, Iterable[String])] = Array((4,CompactBuffer(lion)), (6,CompactBuffer(spider)), (3,CompactBuffer(dog, cat)), (5,CompactBuffer(tiger, eagle)))
aggregateByKey:每個分區(qū)使用zeroValue作為初始值,迭代每一個元素用seqOp進行合并,對所有分區(qū)的結(jié)果用combOp進行合并。例如:
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2) pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect res6: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6)) pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect res7: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
reduceByKey :每個分區(qū)迭代每一個元素用func進行合并,對所有分區(qū)的結(jié)果用func再進行合并,例如:
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.reduceByKey(_ + _).collect
Action API有:
動作 | 含義 |
---|---|
reduce(func) | 通過func函數(shù)聚集RDD中的所有元素,這個功能必須是課交換且可并聯(lián)的 |
collect() | 在驅(qū)動程序中,以數(shù)組的形式返回數(shù)據(jù)集的所有元素 |
count() | 返回RDD的元素個數(shù) |
first() | 返回RDD的第一個元素(類似于take(1)) |
take(n) | 返回一個由數(shù)據(jù)集的前n個元素組成的數(shù)組 |
takeSample(withReplacement*,*num, [seed]) | 返回一個數(shù)組,該數(shù)組由從數(shù)據(jù)集中隨機采樣的num個元素組成,可以選擇是否用隨機數(shù)替換不足的部分,seed用于指定隨機數(shù)生成器種子 |
takeOrdered(n, [ordering]) | 排序并取前N個元素 |
saveAsTextFile(path) | 將數(shù)據(jù)集的元素以textfile的形式保存到HDFS文件系統(tǒng)或者其他支持的文件系統(tǒng),對于每個元素,Spark將會調(diào)用toString方法,將它裝換為文件中的文本 |
saveAsSequenceFile(path) | 將數(shù)據(jù)集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統(tǒng)。 |
saveAsObjectFile(path) | 將RDD中的元素用NullWritable作為key,實際元素作為value保存為sequencefile格式 |
countByKey() | 針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應(yīng)的元素個數(shù)。 |
foreach(func) | 在數(shù)據(jù)集的每一個元素上,運行函數(shù)func進行更新。 |
spark模擬實現(xiàn)mapreduce版wordcount:
object MapreduceWordcount { def main(args: Array[String]): Unit = { import org.apache.spark._ val sc: SparkContext = new SparkContext(new SparkConf().setAppName("wordcount").setMaster("local[*]")) sc.setLogLevel("WARN") import org.apache.hadoop.io.{LongWritable, Text} import org.apache.hadoop.mapred.TextInputFormat import org.apache.spark.rdd.HadoopRDD import scala.collection.mutable.ArrayBuffer def map(k: LongWritable, v: Text, collect: ArrayBuffer[(String, Int)]) = { for (word <- v.toString.split("\\s+")) collect += ((word, 1)) } def reduce(key: String, value: Iterator[Int], collect: ArrayBuffer[(String, Int)]) = { collect += ((key, value.sum)) } val rdd = sc.hadoopFile("/hdfs/wordcount/in1/*", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2) .asInstanceOf[HadoopRDD[LongWritable, Text]] .mapPartitionsWithInputSplit((split, it) =>{ val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]() it.foreach(kv => map(kv._1, kv._2, collect)) collect.toIterator }) .repartitionAndSortWithinPartitions(new HashPartitioner(2)) .mapPartitions(it => { val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]() var lastKey: String = "" var values: ArrayBuffer[Int] = ArrayBuffer[Int]() for ((currKey, value) <- it) { if (!currKey.equals(lastKey)) { if (values.length != 0) reduce(lastKey, values.toIterator, collect) values.clear() } values += value lastKey = currKey } if (values.length != 0) reduce(lastKey, values.toIterator, collect) collect.toIterator }) rdd.foreach(println) } }
各類RDD
- ShuffledRDD :表示需要走Shuffle過程的網(wǎng)絡(luò)傳輸
- CoalescedRDD :用于將一臺機器的多個分區(qū)合并成一個分區(qū)
- CartesianRDD :對兩個RDD的所有元素產(chǎn)生笛卡爾積
- MapPartitionsRDD :用于對每個分區(qū)的數(shù)據(jù)進行特定的處理
- CoGroupedRDD :用于將2~4個rdd,按照key進行連接聚合
- SubtractedRDD :用于對2個RDD求差集
- UnionRDD和PartitionerAwareUnionRDD :用于對2個RDD求并集
- ZippedPartitionsRDD2:zip拉鏈操作產(chǎn)生的RDD
- ZippedWithIndexRDD:給每一個元素標記一個自增編號
- PartitionwiseSampledRDD:用于對rdd的元素按照指定的百分比進行隨機采樣
當我們需要給Datafream添加自增列時,可以使用zipWithUniqueId方法:
from pyspark.sql.types import StructType, LongType schema = data.schema.add(StructField("id", LongType())) rowRDD = data.rdd.zipWithUniqueId().map(lambda t: t[0]+Row(t[1])) data = rowRDD.toDF(schema) data.show()
API用法詳情可參考:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD
cache&checkpoint
RDD通過persist方法或cache方法可以將前面的計算結(jié)果緩存,但是并不是這兩個方法被調(diào)用時立即緩存,而是觸發(fā)后面的action時,該RDD將會被緩存在計算節(jié)點的內(nèi)存中,并供后面重用。
rdd.persist()
checkpoint的源碼注釋可以看到:
- 標記該RDD作為檢查點。
- 它將被保存在通過SparkContext#setCheckpointDir方法設(shè)置的檢查點目錄中
- 它所引用的所有父RDD引用將全部被移除
- 這個方法在這個RDD上必須在所有job執(zhí)行前運行。
- 強烈建議將這個RDD緩存在內(nèi)存中,否則這個保存文件的計算任務(wù)將重新計算。
從中我們得知,在執(zhí)行checkpoint方法時,最好同時,將該RDD緩存起來,否則,checkpoint也會產(chǎn)生一個計算任務(wù)。
sc.setCheckpointDir("checkpoint") rdd.cache() rdd.checkpoint()
graphframes 的用法
GraphFrame是將Spark中的Graph算法統(tǒng)一到DataFrame接口的Graph操作接口,為Scala、Java和Python提供了統(tǒng)一的圖處理API。
Graphframes是開源項目,源碼工程如下:https://github.com/graphframes/graphframes
可以參考:
- 官網(wǎng):https://graphframes.github.io/graphframes/docs/_site/index.html
- GraphFrames用戶指南-Python — Databricks文檔:https://docs.databricks.com/spark/latest/graph-analysis/graphframes/user-guide-python.html
在GraphFrames中圖的頂點(Vertex)和邊(edge)都是以DataFrame形式存儲的:
- 頂點DataFrame:必須包含列名為“id”的列,用于作為頂點的唯一標識
- 邊DataFrame:必須包含列名為“src”和“dst”的列,根據(jù)唯一標識id標識關(guān)系
創(chuàng)建圖的示例:
from graphframes import GraphFrame vertices = spark.createDataFrame([ ("a", "Alice", 34), ("b", "Bob", 36), ("c", "Charlie", 30), ("d", "David", 29), ("e", "Esther", 32), ("f", "Fanny", 36), ("g", "Gabby", 60)], ["id", "name", "age"]) edges = spark.createDataFrame([ ("a", "b", "friend"), ("b", "c", "follow"), ("c", "b", "follow"), ("f", "c", "follow"), ("e", "f", "follow"), ("e", "d", "friend"), ("d", "a", "friend"), ("a", "e", "friend") ], ["src", "dst", "relationship"]) # 生成圖 g = GraphFrame(vertices, edges)
GraphFrame提供三種視圖:
print("頂點表視圖:") graph.vertices.show() # graph.vertices 就是原始的vertices print("邊表視圖:") graph.edges.show() # graph.edges 就是原始的 edges print("三元組視圖:") graph.triplets.show()
獲取頂點的度、入度和出度:
# 頂點的度 graph.degrees.show() # 頂點的入度 graph.inDegrees.show() # 頂點的出度 graph.outDegrees.show()
Motif finding (模式發(fā)現(xiàn))
示例:
# 多個路徑條件 motif = graph.find("(a)-[e]->(b); (b)-[e2]->(a)") # 在搜索的結(jié)果上進行過濾 motif.filter("b.age > 30") # 不需要返回路徑中的元素時,可以使用匿名頂點和邊 motif = graph.find("(start)-[]->()") # 設(shè)置路徑不存在的條件 motif = graph.find("(a)-[]->(b); !(b)-[]->(a)")
假設(shè)我們要想給用戶推薦關(guān)注的人,可以找出這樣的關(guān)系:A關(guān)注B,B關(guān)注C,但是A未關(guān)注C。找出這樣的關(guān)系就可以把C推薦給A:
# Motif: A->B->C but not A->C results = graph.find("(A)-[]->(B); (B)-[]->(C); !(A)-[]->(C)") # 排除自己 results = results.filter("A.id != C.id") # 選擇需要的列 results = results.select(results.A.id.alias("A"), results.C.id.alias("C")) results.show()
結(jié)果:
+---+---+
| A| C|
+---+---+
| e| c|
| e| a|
| d| b|
| a| d|
| f| b|
| d| e|
| a| f|
| a| c|
+---+---+
Motif在查找路徑過程的過程中,還可以沿著路徑攜帶狀態(tài)。例如我們想要找出關(guān)系鏈有4個頂點,而且其中3條邊全部都是"friend"關(guān)系:
from pyspark.sql.functions import col, lit, when from functools import reduce chain4 = graph.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)") def sumFriends(cnt, relationship): "定義下一個頂點更新狀態(tài)的條件:如果關(guān)系為friend則cnt+1" return when(relationship == "friend", cnt+1).otherwise(cnt) # 將更新方法應(yīng)用到整個鏈的,鏈上每有一個關(guān)系是 friend 就加一,鏈上共三個關(guān)系。 condition = reduce(lambda cnt, e: sumFriends( cnt, col(e).relationship), ["ab", "bc", "cd"], lit(0)) chainWith2Friends2 = chain4.where(condition >= 3) chainWith2Friends2.show()
結(jié)果:
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
| a| ab| b| bc| c| cd| d|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, b, friend}| {b, Bob, 36}|
| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
Subgraphs 子圖
可以直接過濾其頂點或邊,dropIsolatedVertices()
方法用于刪除孤立沒有連接的點:
graph.filterVertices("age > 30").filterEdges("relationship = 'friend'").dropIsolatedVertices()
還可以基于模式發(fā)現(xiàn)獲取到的邊創(chuàng)建Subgraphs :
paths = graph.find("(a)-[e]->(b)")\ .filter("e.relationship = 'follow'")\ .filter("a.age < b.age") # 抽取邊信息e2 = paths.select("e.src", "e.dst", "e.relationship") e2 = paths.select("e.*") # 創(chuàng)建Subgraphs g2 = GraphFrame(graph.vertices, e2)
GraphFrames支持的GraphX算法
- PageRank:查找圖中的重要頂點。
- 廣度優(yōu)先搜索(BFS):查找從一組頂點到另一組頂點的最短路徑
- 連通組件(ConnectedComponents):為具備連接關(guān)系的頂點分配相同的組件ID
- 強連通組件(StronglyConnectedConponents):根據(jù)每個頂點的強連通分量分配SCC。
- 最短路徑(Shortest paths):查找從每個頂點到目標頂點集的最短路徑。
- 三角形計數(shù)(TriangleCount):計算每個頂點所屬的三角形的數(shù)量,經(jīng)常用于確定組的穩(wěn)定性(相互連接的數(shù)量代表了穩(wěn)定性)或作為其他網(wǎng)絡(luò)度量(如聚類系數(shù))的一部分,在社交網(wǎng)絡(luò)分析中用來檢測社區(qū)。
- 標簽傳播算法(LPA):檢測圖中的社區(qū)。
pageRank算法:
results = graph.pageRank(resetProbability=0.15, maxIter=10) results.vertices.sort("pagerank", ascending=False).show()
結(jié)果:
+---+-------+---+-------------------+
| id| name|age| pagerank|
+---+-------+---+-------------------+
| b| Bob| 36| 2.7025217677349773|
| c|Charlie| 30| 2.6667877057849627|
| a| Alice| 34| 0.4485115093698443|
| e| Esther| 32| 0.3613490987992571|
| f| Fanny| 36|0.32504910549694244|
| d| David| 29|0.32504910549694244|
| g| Gabby| 60|0.17073170731707318|
+---+-------+---+-------------------+
可以設(shè)置起始頂點:
graph.pageRank(resetProbability=0.15, maxIter=10, sourceId="a") graph.parallelPersonalizedPageRank(resetProbability=0.15, sourceIds=["a", "b", "c", "d"], maxIter=10)
廣度優(yōu)先搜索BFS:
搜索從姓名叫Esther到年齡小于32的最小路徑:
paths = graph.bfs("name = 'Esther'", "age < 32") paths.show()
+--------------+--------------+---------------+ | from| e0| to| +--------------+--------------+---------------+ |{a, Alice, 34}|{a, e, friend}|{e, Esther, 32}| +--------------+--------------+---------------+
可以指定只能在指定的邊搜索:
graph.bfs("name = 'Esther'", "age < 32", edgeFilter="relationship != 'friend'", maxPathLength=4 ).show()
+---------------+--------------+--------------+--------------+----------------+ | from| e0| v1| e1| to| +---------------+--------------+--------------+--------------+----------------+ |{e, Esther, 32}|{e, f, follow}|{f, Fanny, 36}|{f, c, follow}|{c, Charlie, 30}| +---------------+--------------+--------------+--------------+----------------+
Connected components 連通組件:
必須先設(shè)置檢查點:
sc.setCheckpointDir("checkpoint") graph.connectedComponents().show()
結(jié)果:
+---+-------+---+------------+
| id| name|age| component|
+---+-------+---+------------+
| a| Alice| 34|412316860416|
| b| Bob| 36|412316860416|
| c|Charlie| 30|412316860416|
| d| David| 29|412316860416|
| e| Esther| 32|412316860416|
| f| Fanny| 36|412316860416|
| g| Gabby| 60|146028888064|
+---+-------+---+------------+
可以看到僅g點在一個連通區(qū)域內(nèi),可以調(diào)用dropIsolatedVertices()
方法,刪除這種孤立的沒有連接的點:
graph.dropIsolatedVertices().connectedComponents().show()
結(jié)果:
+---+-------+---+------------+
| id| name|age| component|
+---+-------+---+------------+
| a| Alice| 34|412316860416|
| b| Bob| 36|412316860416|
| c|Charlie| 30|412316860416|
| d| David| 29|412316860416|
| e| Esther| 32|412316860416|
| f| Fanny| 36|412316860416|
+---+-------+---+------------+
Strongly connected components 強連通組件:
graph.stronglyConnectedComponents(maxIter=10).show()
Shortest paths 最短路徑:
每個頂點到a或d的最短路徑:
graph.shortestPaths(landmarks=["a", "d"]).show()
+---+-------+---+----------------+ | id| name|age| distances| +---+-------+---+----------------+ | g| Gabby| 60| {}| | f| Fanny| 36| {}| | e| Esther| 32|{a -> 2, d -> 1}| | d| David| 29|{a -> 1, d -> 0}| | c|Charlie| 30| {}| | b| Bob| 36| {}| | a| Alice| 34|{a -> 0, d -> 2}| +---+-------+---+----------------+
Triangle count 三角形計數(shù):
graph.triangleCount().show()
+-----+---+-------+---+ |count| id| name|age| +-----+---+-------+---+ | 1| a| Alice| 34| | 0| b| Bob| 36| | 0| c|Charlie| 30| | 1| d| David| 29| | 1| e| Esther| 32| | 0| g| Gabby| 60| | 0| f| Fanny| 36| +-----+---+-------+---+
說明頂點a/e/d構(gòu)成三角形。
標簽傳播算法(LPA):
graph.labelPropagation(maxIter=5).orderBy("label").show()
+---+-------+---+-------------+ | id| name|age| label| +---+-------+---+-------------+ | g| Gabby| 60| 146028888064| | f| Fanny| 36|1047972020224| | b| Bob| 36|1047972020224| | a| Alice| 34|1382979469312| | c|Charlie| 30|1382979469312| | e| Esther| 32|1460288880640| | d| David| 29|1460288880640| +---+-------+---+-------------+
PySpark3.X與pandas融合
Pyspark從3.0版本開始出現(xiàn)了pandas_udf裝飾器、applyInPandas和mapInPandas,基于這些方法,我們就可以使用熟悉的pandas的語法處理spark對象的數(shù)據(jù)。
首先創(chuàng)建幾條測試數(shù)據(jù),并啟動 Apache Arrow:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") df = spark.createDataFrame( [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")) df.show()
自定義UDF和UDAF
pyspark暫不支持自定義UDTF。
使用pandas_udf裝飾器我們可以創(chuàng)建出基于pandas的udf自定義函數(shù),在DSL的語法中可以被直接使用:
from pyspark.sql.functions import pandas_udf import pandas as pd @pandas_udf("double") def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series: return a * b df.select(multiply_func("id", "v").alias("product")).show()
注冊函數(shù)和視圖后,可以直接在SQL中使用:
df.createOrReplaceTempView("t") spark.udf.register("multiply", multiply_func) spark.sql('select multiply(id, v) product from t').show()
結(jié)果均為:
+-------+
|product|
+-------+
| 1.0|
| 2.0|
| 6.0|
| 10.0|
| 20.0|
+-------+
還支持聚合函數(shù)和窗口函數(shù):
from pyspark.sql import Window @pandas_udf("double") def mean_udf(v: pd.Series) -> float: return v.mean() # 對字段'v'進行求均值 df.select(mean_udf('v').alias("mean_v")).show() # 按照‘id'分組,求'v'的均值 df.groupby("id").agg(mean_udf('v').alias("mean_v")).show() # 按照‘id'分組,求'v'的均值,并賦值給新的一列 df.withColumn('mean_v', mean_udf("v").over(Window.partitionBy('id'))).show()
注冊到udf之后同樣可以直接使用SQL實現(xiàn):
spark.udf.register("mean2", mean_udf) spark.sql('select mean2(v) mean_v from t').show() spark.sql('select id,mean2(v) mean_v from t group by id').show() spark.sql('select id,v,mean2(v) over(partition by id) mean_v from t').show()
結(jié)果均為:
+--------+
| mean_v |
+--------+
| 4.2|
+--------++---+--------+
| id| mean_v |
+---+--------+
| 1| 1.5|
| 2| 6.0|
+---+--------++---+----+------+
| id| v|mean_v|
+---+----+------+
| 1| 1.0| 1.5|
| 1| 2.0| 1.5|
| 2| 3.0| 6.0|
| 2| 5.0| 6.0|
| 2|10.0| 6.0|
+---+----+------+
分組聚合與JOIN
applyInPandas需要在datafream調(diào)用groupby之后才能使用:
def subtract_mean(pdf): v = pdf.v pdf['v1'] = v - v.mean() pdf['v2'] = v + v.mean() return pdf t = df.groupby("id") t.applyInPandas( subtract_mean, schema="id long, v double, v1 double, v2 double").show()
結(jié)果:
+---+----+----+----+
| id| v| v1| v2|
+---+----+----+----+
| 1| 1.0|-0.5| 2.5|
| 1| 2.0| 0.5| 3.5|
| 2| 3.0|-3.0| 9.0|
| 2| 5.0|-1.0|11.0|
| 2|10.0| 4.0|16.0|
+---+----+----+----+
subtract_mean函數(shù)接收的是對應(yīng)id的dataframe數(shù)據(jù),schema指定了返回值的名稱和類型列表。
通過以下代碼我們可以知道,applyInPandas可以借助cogroup進行表連接:
val a = sc.parallelize(List(1, 2, 1, 3)) val b = a.map((_, "b")) val c = a.map((_, "c")) val d = a.map((_, "d")) val e = a.map((_, "e")) scala> b.cogroup(c).foreach(println) (3,(CompactBuffer(b),CompactBuffer(c))) (1,(CompactBuffer(b, b),CompactBuffer(c, c))) (2,(CompactBuffer(b),CompactBuffer(c)))
示例:
df1 = spark.createDataFrame( [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)], ("time", "id", "v1")) df2 = spark.createDataFrame( [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")) def asof_join(l, r): # l、r is a pandas.DataFrame # 這里是按照id分組 # 那么,l和r分別是對應(yīng)id的df1和df2數(shù)據(jù) return pd.merge_asof(l, r, on="time", by="id") df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas( asof_join, schema="time int, id int, v1 double, v2 string").show() # +--------+---+---+---+ # | time| id| v1| v2| # +--------+---+---+---+ # |20000101| 1|1.0| x| # |20000102| 1|3.0| x| # |20000101| 2|2.0| y| # |20000102| 2|4.0| y| # +--------+---+---+---+
map迭代
執(zhí)行以下代碼:
def filter_func(iterator): for i, pdf in enumerate(iterator): print(i, pdf.values.tolist()) yield pdf df.mapInPandas(filter_func, schema=df.schema).show()
后臺看到執(zhí)行結(jié)果為:
0 [[2.0, 5.0]]
0 [[2.0, 3.0]]
0 [[1.0, 1.0]]
0 [[1.0, 2.0]]
0 [[2.0, 10.0]]
前臺結(jié)果幾乎保持原樣。可以知道iterator是一個分區(qū)迭代器,迭代出當前分區(qū)的每一行數(shù)據(jù)都被封裝成一個pandas對象。
Pyspark與Pandas的交互
將spark的Datafream對象轉(zhuǎn)換為原生的pandas對象只需調(diào)用toPandas()方法即可:
sdf.toPandas()
將原生的pandas對象轉(zhuǎn)換為spark對象可以使用spark的頂級方法:
spark.createDataFrame(pdf)
習慣使用pandas的童鞋,還可以直接使用pandas-on-Spark,在spark3.2.0版本時已經(jīng)匹配到pandas 1.3版本的API。通過pandas-on-Spark,我們可以完全用pandas的api操作數(shù)據(jù),而底層執(zhí)行卻是spark的并行化。
使用pandas-on-Spark最好設(shè)置一下環(huán)境變量:
import os os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
將spark對象轉(zhuǎn)換為pandas-on-Spark對象:
df = spark.createDataFrame( [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")) pdf = df.to_pandas_on_spark() print(type(pdf)) pdf
pandas-on-Spark對象也可以還原成spark對象:
pdf.to_spark()
另外spark提供直接將文件讀取成pandas-on-Spark對象的api,例如:
import pyspark.pandas as ps pdf = ps.read_csv("example_csv.csv")
ps對象與原生pandas對象的API幾乎完全一致。
ps對象相對于原生pandas對象的API幾乎一致,同時還支持一些強悍的功能,例如直接以SQL形式訪問:
ps.sql("SELECT count(*) as num FROM {pdf}")
{pdf}訪問了變量名為pdf的pandas-on-Spark對象。
到此這篇關(guān)于PySpark與GraphFrames的安裝與使用的文章就介紹到這了,更多相關(guān)PySpark與GraphFrames使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python讀取mnist數(shù)據(jù)集方法案例詳解
這篇文章主要介紹了python讀取mnist數(shù)據(jù)集方法案例詳解,本篇文章通過簡要的案例,講解了該項技術(shù)的了解與使用,以下就是詳細內(nèi)容,需要的朋友可以參考下2021-09-09python使用PIL把透明背景圖片轉(zhuǎn)成白色背景的示例代碼
當我們在采集一些圖片的時候,這些圖片的背景經(jīng)常是透明的,但是如何把透明背景轉(zhuǎn)成白色背景呢,接下來就給大家解決這個問題,本文主要介紹了python使用PIL把透明背景圖片轉(zhuǎn)成白色背景,需要的朋友可以參考下2023-08-08