PySpark與GraphFrames的安裝與使用環(huán)境搭建過程
PySpark環(huán)境搭建
配置hadoop
spark訪問本地文件并執(zhí)行運(yùn)算時(shí),可能會(huì)遇到權(quán)限問題或是dll錯(cuò)誤。這是因?yàn)閟park需要使用到Hadoop的winutils和hadoop.dll,首先我們必須配置好Hadoop相關(guān)的環(huán)境??梢缘絞ithub下載:https://github.com/4ttty/winutils
gitcode提供了鏡像加速:https://gitcode.net/mirrors/4ttty/winutils
我選擇了使用這個(gè)倉庫提供的最高的Hadoop版本3.0.0將其解壓到D:\deploy\hadoop-3.0.0目錄下,然后配置環(huán)境變量:

我們還需要將對應(yīng)的hadoop.dll復(fù)制到系統(tǒng)中,用命令表達(dá)就是:
copy D:\deploy\hadoop-3.0.0\bin\hadoop.dll C:\Windows\System32
不過這步需要擁有管理員權(quán)限才可以操作。
為了能夠在任何地方使用winutils命令工具,將%HADOOP_HOME%\bin目錄加入環(huán)境變量中:

安裝pyspark與Java
首先,我們安裝spark當(dāng)前(2022-2-17)的最新版本:
pip install pyspark==3.2.1
需要注意pyspark的版本決定了jdk的最高版本,例如假如安裝2.4.5版本的pyspark就只能安裝1.8版本的jdk,否則會(huì)報(bào)出java.lang.IllegalArgumentException: Unsupported class file major version 55的錯(cuò)誤。
這是因?yàn)閜yspark內(nèi)置了Scala,而Scala是基于jvm的編程語言,Scala與jdk的版本存在兼容性問題,JDK與scala的版本兼容性表:
| JDK version | Minimum Scala versions | Recommended Scala versions |
|---|---|---|
| 17 | 2.13.6, 2.12.15 (forthcoming) | 2.13.6, 2.12.15 (forthcoming) |
| 16 | 2.13.5, 2.12.14 | 2.13.6, 2.12.14 |
| 13, 14, 15 | 2.13.2, 2.12.11 | 2.13.6, 2.12.14 |
| 12 | 2.13.1, 2.12.9 | 2.13.6, 2.12.14 |
| 11 | 2.13.0, 2.12.4, 2.11.12 | 2.13.6, 2.12.14, 2.11.12 |
| 8 | 2.13.0, 2.12.0, 2.11.0, 2.10.2 | 2.13.6, 2.12.14, 2.11.12, 2.10.7 |
| 6, 7 | 2.11.0, 2.10.0 | 2.11.12, 2.10.7 |
當(dāng)前3.2.1版本的pyspark內(nèi)置的Scala版本為2.12.15,意味著jdk17與其以下的所有版本都支持。
這里我依然選擇安裝jdk8的版本:

測試一下:
>java -version java version "1.8.0_201" Java(TM) SE Runtime Environment (build 1.8.0_201-b09) Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode)
jdk11的詳細(xì)安裝教程(jdk1.8在官網(wǎng)只有安裝包,無zip綠化壓縮包):
綠化版Java11的環(huán)境配置與Python調(diào)用Java
https://xxmdmst.blog.csdn.net/article/details/118366166
graphframes安裝
pip安裝當(dāng)前最新的graphframes:
pip install graphframes==0.6
然后在官網(wǎng)下載graphframes的jar包。
下載地址:https://spark-packages.org/package/graphframes/graphframes
由于安裝的pyspark版本是3.2,所以這里我選擇了這個(gè)jar包:

然后將該jar包放入pyspark安裝目錄的jars目錄下:

pyspark安裝位置可以通過pip查看:
C:\Users\ASUS>pip show pyspark Name: pyspark Version: 3.2.1 Summary: Apache Spark Python API Home-page: https://github.com/apache/spark/tree/master/python Author: Spark Developers Author-email: dev@spark.apache.org License: http://www.apache.org/licenses/LICENSE-2.0 Location: d:\miniconda3\lib\site-packages Requires: py4j Required-by:
使用方法
學(xué)習(xí)pyspark的最佳路徑是官網(wǎng):https://spark.apache.org/docs/latest/quick-start.html
在下面的網(wǎng)頁,官方提供了在線jupyter:
https://spark.apache.org/docs/latest/api/python/getting_started/index.html

啟動(dòng)spark并讀取數(shù)據(jù)
本地模式啟動(dòng)spark:
from pyspark.sql import SparkSession, Row
spark = SparkSession \
.builder \
.appName("Python Spark") \
.master("local[*]") \
.getOrCreate()
sc = spark.sparkContext
spark
SparkSession輸出的內(nèi)容中包含了spark的web頁面,新標(biāo)簽頁打開頁面后大致效果如上。
點(diǎn)擊Environment選項(xiàng)卡可以查看當(dāng)前環(huán)境中的變量:

啟動(dòng)hive支持
找到pyspark的安裝位置,例如我的電腦在D:\Miniconda3\Lib\site-packages\pyspark
手動(dòng)創(chuàng)建conf目錄并將hive-site.xml配置文件復(fù)制到其中。如果hive使用了MySQL作為原數(shù)據(jù)庫,則還需要將MySQL對應(yīng)的驅(qū)動(dòng)jar包放入spark的jars目錄下。
創(chuàng)建spark會(huì)話對象時(shí)可通過enableHiveSupport()開啟hive支持:
from pyspark.sql import SparkSession
from pyspark.sql import Row
spark = SparkSession \
.builder \
.appName("Python Spark SQL Hive integration example") \
.enableHiveSupport() \
.getOrCreate()
sc = spark.sparkContext
sparkspark訪問hive自己創(chuàng)建的表有可能會(huì)出現(xiàn)如下的權(quán)限報(bào)錯(cuò):
Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS s
hould be writable. Current permissions are: rwx------
是因?yàn)楫?dāng)前用戶不具備對\tmp\hive的操作權(quán)限:
>winutils ls \tmp\hive drwx------ 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May 4 2020 \tmp\hive
把\tmp\hive目錄的權(quán)限改為777即可順利訪問:
>winutils chmod 777 \tmp\hive >winutils ls \tmp\hive drwxrwxrwx 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May 4 2020 \tmp\hive
Spark的DataFrame與RDD
從spark2.x開始將RDD和DataFrame的API統(tǒng)一抽象成dataset,DataFrame就是Dataset[Row],RDD則是Dataset.rdd。可以將DataFrame理解為包含結(jié)構(gòu)化信息的RDD。
將含row的RDD轉(zhuǎn)換為DataFrame只需要調(diào)用toDF方法或SparkSession的createDataFrame方法即可,也可以傳入schema覆蓋類型或名稱設(shè)置。
DataFrame的基礎(chǔ)api
DataFrame默認(rèn)支持DSL風(fēng)格語法,例如:
//查看DataFrame中的內(nèi)容
df.show()
//查看DataFrame部分列中的內(nèi)容
df.select(df['name'], df['age'] + 1).show()
df.select("name").show()
//打印DataFrame的Schema信息
df.printSchema()
//過濾age大于等于 21 的
df.filter(df['age'] > 21).show()
//按年齡進(jìn)行分組并統(tǒng)計(jì)相同年齡的人數(shù)
personDF.groupBy("age").count().show()將DataFrame注冊成表或視圖之后即可進(jìn)行純SQL操作:
df.createOrReplaceTempView("people")
//df.createTempView("t_person")
//查詢年齡最大的前兩名
spark.sql("select * from t_person order by age desc limit 2").show()
//顯示表的Schema信息
spark.sql("desc t_person").show()Pyspark可以直接很方便的注冊u(píng)df并直接使用:
strlen = spark.udf.register("len", lambda x: len(x))
print(spark.sql("SELECT len('test') length").collect())
print(spark.sql("SELECT 'foo' AS text").select(strlen("text").alias('length')).collect())執(zhí)行結(jié)果:
[Row(length='4')]
[Row(length='3')]
RDD的簡介
DataFrame的本質(zhì)是對RDD的包裝,可以理解為DataFrame=RDD[Row]+schema。
RDD(A Resilient Distributed Dataset)叫做彈性可伸縮分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象。它代表一個(gè)不可變、自動(dòng)容錯(cuò)、可伸縮性、可分區(qū)、里面的元素可并行計(jì)算的集合。
在每一個(gè)RDD內(nèi)部具有五大屬性:
- 具有一系列的分區(qū)
- 一個(gè)計(jì)算函數(shù)操作于每一個(gè)切片
- 具有一個(gè)對其他RDD的依賴列表
- 對于 key-value RDDs具有一個(gè)Partitioner分區(qū)器
- 存儲(chǔ)每一個(gè)切片最佳計(jì)算位置
一組分片(Partition),即數(shù)據(jù)集的基本組成單位。對于RDD來說,每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理,并決定并行計(jì)算的粒度。用戶可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù),如果沒有指定,那么就會(huì)采用默認(rèn)值。默認(rèn)值就是程序所分配到的CPU Core的數(shù)目。
**一個(gè)計(jì)算每個(gè)分區(qū)的函數(shù)。**Spark中RDD的計(jì)算是以分片為單位的,每個(gè)RDD都會(huì)實(shí)現(xiàn)compute函數(shù)以達(dá)到這個(gè)目的。compute函數(shù)會(huì)對迭代器進(jìn)行復(fù)合,不需要保存每次計(jì)算的結(jié)果。
**RDD之間的依賴關(guān)系。**RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD,所以RDD之間就會(huì)形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù),而不是對RDD的所有分區(qū)進(jìn)行重新計(jì)算。
**一個(gè)Partitioner,即RDD的分片函數(shù)。**當(dāng)前Spark中實(shí)現(xiàn)了兩種類型的分片函數(shù),一個(gè)是基于哈希的HashPartitioner,另外一個(gè)是基于范圍的RangePartitioner。只有對于于key-value的RDD,才會(huì)有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量,也決定了parent RDD Shuffle輸出時(shí)的分片數(shù)量。
**一個(gè)列表,存儲(chǔ)存取每個(gè)Partition的優(yōu)先位置(preferred location)。**對于一個(gè)HDFS文件來說,這個(gè)列表保存的就是每個(gè)Partition所在的塊的位置。按照“移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算”的理念,Spark在進(jìn)行任務(wù)調(diào)度的時(shí)候,會(huì)盡可能地將計(jì)算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲(chǔ)位置。
RDD的API概覽
RDD包含Transformation API和 Action API,Transformation API都是延遲加載的只是記住這些應(yīng)用到基礎(chǔ)數(shù)據(jù)集上的轉(zhuǎn)換動(dòng)作,只有當(dāng)執(zhí)行Action API時(shí)這些轉(zhuǎn)換才會(huì)真正運(yùn)行。
Transformation API產(chǎn)生的兩類RDD最重要,分別是MapPartitionsRDD和ShuffledRDD。
產(chǎn)生MapPartitionsRDD的算子有map、keyBy、keys、values、flatMap、mapValues 、flatMapValues、mapPartitions、mapPartitionsWithIndex、glom、filter和filterByRange 。其中用的最多的是map和flatMap,但任何產(chǎn)生MapPartitionsRDD的算子都可以直接使用mapPartitions或mapPartitionsWithIndex實(shí)現(xiàn)。
產(chǎn)生ShuffledRDD的算子有combineByKeyWithClassTag、combineByKey、aggregateByKey、foldByKey 、reduceByKey 、distinct、groupByKey、groupBy、partitionBy、sortByKey 和 repartitionAndSortWithinPartitions。
combineByKey到groupByKey 底層均是調(diào)用combineByKeyWithClassTag方法:
@Experimental
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner,mergeValue,mergeCombiners
,defaultPartitioner(self))
}
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}三個(gè)重要參數(shù)的含義:
- createCombiner:根據(jù)每個(gè)分區(qū)的第一個(gè)元素操作產(chǎn)生一個(gè)初始值
- mergeValue:對每個(gè)分區(qū)內(nèi)部的元素進(jìn)行迭代合并
- mergeCombiners:對所有分區(qū)的合并結(jié)果進(jìn)行合并
groupByKey的partitioner未指定時(shí)會(huì)傳入默認(rèn)的defaultPartitioner。例如:
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2).keyBy(_.length)
a.groupByKey.collect
res9: Array[(Int, Iterable[String])] = Array((4,CompactBuffer(lion)), (6,CompactBuffer(spider)), (3,CompactBuffer(dog, cat)), (5,CompactBuffer(tiger, eagle)))aggregateByKey:每個(gè)分區(qū)使用zeroValue作為初始值,迭代每一個(gè)元素用seqOp進(jìn)行合并,對所有分區(qū)的結(jié)果用combOp進(jìn)行合并。例如:
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res6: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
res7: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))reduceByKey :每個(gè)分區(qū)迭代每一個(gè)元素用func進(jìn)行合并,對所有分區(qū)的結(jié)果用func再進(jìn)行合并,例如:
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collectAction API有:
| 動(dòng)作 | 含義 |
|---|---|
| reduce(func) | 通過func函數(shù)聚集RDD中的所有元素,這個(gè)功能必須是課交換且可并聯(lián)的 |
| collect() | 在驅(qū)動(dòng)程序中,以數(shù)組的形式返回?cái)?shù)據(jù)集的所有元素 |
| count() | 返回RDD的元素個(gè)數(shù) |
| first() | 返回RDD的第一個(gè)元素(類似于take(1)) |
| take(n) | 返回一個(gè)由數(shù)據(jù)集的前n個(gè)元素組成的數(shù)組 |
| takeSample(withReplacement*,*num, [seed]) | 返回一個(gè)數(shù)組,該數(shù)組由從數(shù)據(jù)集中隨機(jī)采樣的num個(gè)元素組成,可以選擇是否用隨機(jī)數(shù)替換不足的部分,seed用于指定隨機(jī)數(shù)生成器種子 |
| takeOrdered(n, [ordering]) | 排序并取前N個(gè)元素 |
| saveAsTextFile(path) | 將數(shù)據(jù)集的元素以textfile的形式保存到HDFS文件系統(tǒng)或者其他支持的文件系統(tǒng),對于每個(gè)元素,Spark將會(huì)調(diào)用toString方法,將它裝換為文件中的文本 |
| saveAsSequenceFile(path) | 將數(shù)據(jù)集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統(tǒng)。 |
| saveAsObjectFile(path) | 將RDD中的元素用NullWritable作為key,實(shí)際元素作為value保存為sequencefile格式 |
| countByKey() | 針對(K,V)類型的RDD,返回一個(gè)(K,Int)的map,表示每一個(gè)key對應(yīng)的元素個(gè)數(shù)。 |
| foreach(func) | 在數(shù)據(jù)集的每一個(gè)元素上,運(yùn)行函數(shù)func進(jìn)行更新。 |
spark模擬實(shí)現(xiàn)mapreduce版wordcount:
object MapreduceWordcount {
def main(args: Array[String]): Unit = {
import org.apache.spark._
val sc: SparkContext = new SparkContext(new SparkConf().setAppName("wordcount").setMaster("local[*]"))
sc.setLogLevel("WARN")
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.rdd.HadoopRDD
import scala.collection.mutable.ArrayBuffer
def map(k: LongWritable, v: Text, collect: ArrayBuffer[(String, Int)]) = {
for (word <- v.toString.split("\\s+"))
collect += ((word, 1))
}
def reduce(key: String, value: Iterator[Int], collect: ArrayBuffer[(String, Int)]) = {
collect += ((key, value.sum))
}
val rdd = sc.hadoopFile("/hdfs/wordcount/in1/*", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)
.asInstanceOf[HadoopRDD[LongWritable, Text]]
.mapPartitionsWithInputSplit((split, it) =>{
val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
it.foreach(kv => map(kv._1, kv._2, collect))
collect.toIterator
})
.repartitionAndSortWithinPartitions(new HashPartitioner(2))
.mapPartitions(it => {
val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
var lastKey: String = ""
var values: ArrayBuffer[Int] = ArrayBuffer[Int]()
for ((currKey, value) <- it) {
if (!currKey.equals(lastKey)) {
if (values.length != 0)
reduce(lastKey, values.toIterator, collect)
values.clear()
}
values += value
lastKey = currKey
}
if (values.length != 0) reduce(lastKey, values.toIterator, collect)
collect.toIterator
})
rdd.foreach(println)
}
}各類RDD

- ShuffledRDD :表示需要走Shuffle過程的網(wǎng)絡(luò)傳輸
- CoalescedRDD :用于將一臺(tái)機(jī)器的多個(gè)分區(qū)合并成一個(gè)分區(qū)
- CartesianRDD :對兩個(gè)RDD的所有元素產(chǎn)生笛卡爾積
- MapPartitionsRDD :用于對每個(gè)分區(qū)的數(shù)據(jù)進(jìn)行特定的處理
- CoGroupedRDD :用于將2~4個(gè)rdd,按照key進(jìn)行連接聚合
- SubtractedRDD :用于對2個(gè)RDD求差集
- UnionRDD和PartitionerAwareUnionRDD :用于對2個(gè)RDD求并集
- ZippedPartitionsRDD2:zip拉鏈操作產(chǎn)生的RDD
- ZippedWithIndexRDD:給每一個(gè)元素標(biāo)記一個(gè)自增編號(hào)
- PartitionwiseSampledRDD:用于對rdd的元素按照指定的百分比進(jìn)行隨機(jī)采樣
當(dāng)我們需要給Datafream添加自增列時(shí),可以使用zipWithUniqueId方法:
from pyspark.sql.types import StructType, LongType
schema = data.schema.add(StructField("id", LongType()))
rowRDD = data.rdd.zipWithUniqueId().map(lambda t: t[0]+Row(t[1]))
data = rowRDD.toDF(schema)
data.show()API用法詳情可參考:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD
cache&checkpoint
RDD通過persist方法或cache方法可以將前面的計(jì)算結(jié)果緩存,但是并不是這兩個(gè)方法被調(diào)用時(shí)立即緩存,而是觸發(fā)后面的action時(shí),該RDD將會(huì)被緩存在計(jì)算節(jié)點(diǎn)的內(nèi)存中,并供后面重用。
rdd.persist()
checkpoint的源碼注釋可以看到:
- 標(biāo)記該RDD作為檢查點(diǎn)。
- 它將被保存在通過SparkContext#setCheckpointDir方法設(shè)置的檢查點(diǎn)目錄中
- 它所引用的所有父RDD引用將全部被移除
- 這個(gè)方法在這個(gè)RDD上必須在所有job執(zhí)行前運(yùn)行。
- 強(qiáng)烈建議將這個(gè)RDD緩存在內(nèi)存中,否則這個(gè)保存文件的計(jì)算任務(wù)將重新計(jì)算。
從中我們得知,在執(zhí)行checkpoint方法時(shí),最好同時(shí),將該RDD緩存起來,否則,checkpoint也會(huì)產(chǎn)生一個(gè)計(jì)算任務(wù)。
sc.setCheckpointDir("checkpoint")
rdd.cache()
rdd.checkpoint()graphframes 的用法
GraphFrame是將Spark中的Graph算法統(tǒng)一到DataFrame接口的Graph操作接口,為Scala、Java和Python提供了統(tǒng)一的圖處理API。
Graphframes是開源項(xiàng)目,源碼工程如下:https://github.com/graphframes/graphframes
可以參考:
- 官網(wǎng):https://graphframes.github.io/graphframes/docs/_site/index.html
- GraphFrames用戶指南-Python — Databricks文檔:https://docs.databricks.com/spark/latest/graph-analysis/graphframes/user-guide-python.html
在GraphFrames中圖的頂點(diǎn)(Vertex)和邊(edge)都是以DataFrame形式存儲(chǔ)的:
- 頂點(diǎn)DataFrame:必須包含列名為“id”的列,用于作為頂點(diǎn)的唯一標(biāo)識(shí)
- 邊DataFrame:必須包含列名為“src”和“dst”的列,根據(jù)唯一標(biāo)識(shí)id標(biāo)識(shí)關(guān)系
創(chuàng)建圖的示例:
from graphframes import GraphFrame
vertices = spark.createDataFrame([
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 30),
("d", "David", 29),
("e", "Esther", 32),
("f", "Fanny", 36),
("g", "Gabby", 60)], ["id", "name", "age"])
edges = spark.createDataFrame([
("a", "b", "friend"),
("b", "c", "follow"),
("c", "b", "follow"),
("f", "c", "follow"),
("e", "f", "follow"),
("e", "d", "friend"),
("d", "a", "friend"),
("a", "e", "friend")
], ["src", "dst", "relationship"])
# 生成圖
g = GraphFrame(vertices, edges)GraphFrame提供三種視圖:
print("頂點(diǎn)表視圖:")
graph.vertices.show() # graph.vertices 就是原始的vertices
print("邊表視圖:")
graph.edges.show() # graph.edges 就是原始的 edges
print("三元組視圖:")
graph.triplets.show()獲取頂點(diǎn)的度、入度和出度:
# 頂點(diǎn)的度 graph.degrees.show() # 頂點(diǎn)的入度 graph.inDegrees.show() # 頂點(diǎn)的出度 graph.outDegrees.show()
Motif finding (模式發(fā)現(xiàn))
示例:
# 多個(gè)路徑條件
motif = graph.find("(a)-[e]->(b); (b)-[e2]->(a)")
# 在搜索的結(jié)果上進(jìn)行過濾
motif.filter("b.age > 30")
# 不需要返回路徑中的元素時(shí),可以使用匿名頂點(diǎn)和邊
motif = graph.find("(start)-[]->()")
# 設(shè)置路徑不存在的條件
motif = graph.find("(a)-[]->(b); !(b)-[]->(a)")假設(shè)我們要想給用戶推薦關(guān)注的人,可以找出這樣的關(guān)系:A關(guān)注B,B關(guān)注C,但是A未關(guān)注C。找出這樣的關(guān)系就可以把C推薦給A:
# Motif: A->B->C but not A->C
results = graph.find("(A)-[]->(B); (B)-[]->(C); !(A)-[]->(C)")
# 排除自己
results = results.filter("A.id != C.id")
# 選擇需要的列
results = results.select(results.A.id.alias("A"), results.C.id.alias("C"))
results.show()結(jié)果:
+---+---+
| A| C|
+---+---+
| e| c|
| e| a|
| d| b|
| a| d|
| f| b|
| d| e|
| a| f|
| a| c|
+---+---+
Motif在查找路徑過程的過程中,還可以沿著路徑攜帶狀態(tài)。例如我們想要找出關(guān)系鏈有4個(gè)頂點(diǎn),而且其中3條邊全部都是"friend"關(guān)系:
from pyspark.sql.functions import col, lit, when
from functools import reduce
chain4 = graph.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")
def sumFriends(cnt, relationship):
"定義下一個(gè)頂點(diǎn)更新狀態(tài)的條件:如果關(guān)系為friend則cnt+1"
return when(relationship == "friend", cnt+1).otherwise(cnt)
# 將更新方法應(yīng)用到整個(gè)鏈的,鏈上每有一個(gè)關(guān)系是 friend 就加一,鏈上共三個(gè)關(guān)系。
condition = reduce(lambda cnt, e: sumFriends(
cnt, col(e).relationship), ["ab", "bc", "cd"], lit(0))
chainWith2Friends2 = chain4.where(condition >= 3)
chainWith2Friends2.show()結(jié)果:
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
| a| ab| b| bc| c| cd| d|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, b, friend}| {b, Bob, 36}|
| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
Subgraphs 子圖
可以直接過濾其頂點(diǎn)或邊,dropIsolatedVertices()方法用于刪除孤立沒有連接的點(diǎn):
graph.filterVertices("age > 30").filterEdges("relationship = 'friend'").dropIsolatedVertices()還可以基于模式發(fā)現(xiàn)獲取到的邊創(chuàng)建Subgraphs :
paths = graph.find("(a)-[e]->(b)")\
.filter("e.relationship = 'follow'")\
.filter("a.age < b.age")
# 抽取邊信息e2 = paths.select("e.src", "e.dst", "e.relationship")
e2 = paths.select("e.*")
# 創(chuàng)建Subgraphs
g2 = GraphFrame(graph.vertices, e2)GraphFrames支持的GraphX算法
- PageRank:查找圖中的重要頂點(diǎn)。
- 廣度優(yōu)先搜索(BFS):查找從一組頂點(diǎn)到另一組頂點(diǎn)的最短路徑
- 連通組件(ConnectedComponents):為具備連接關(guān)系的頂點(diǎn)分配相同的組件ID
- 強(qiáng)連通組件(StronglyConnectedConponents):根據(jù)每個(gè)頂點(diǎn)的強(qiáng)連通分量分配SCC。
- 最短路徑(Shortest paths):查找從每個(gè)頂點(diǎn)到目標(biāo)頂點(diǎn)集的最短路徑。
- 三角形計(jì)數(shù)(TriangleCount):計(jì)算每個(gè)頂點(diǎn)所屬的三角形的數(shù)量,經(jīng)常用于確定組的穩(wěn)定性(相互連接的數(shù)量代表了穩(wěn)定性)或作為其他網(wǎng)絡(luò)度量(如聚類系數(shù))的一部分,在社交網(wǎng)絡(luò)分析中用來檢測社區(qū)。
- 標(biāo)簽傳播算法(LPA):檢測圖中的社區(qū)。
pageRank算法:
results = graph.pageRank(resetProbability=0.15, maxIter=10)
results.vertices.sort("pagerank", ascending=False).show()結(jié)果:
+---+-------+---+-------------------+
| id| name|age| pagerank|
+---+-------+---+-------------------+
| b| Bob| 36| 2.7025217677349773|
| c|Charlie| 30| 2.6667877057849627|
| a| Alice| 34| 0.4485115093698443|
| e| Esther| 32| 0.3613490987992571|
| f| Fanny| 36|0.32504910549694244|
| d| David| 29|0.32504910549694244|
| g| Gabby| 60|0.17073170731707318|
+---+-------+---+-------------------+
可以設(shè)置起始頂點(diǎn):
graph.pageRank(resetProbability=0.15, maxIter=10, sourceId="a") graph.parallelPersonalizedPageRank(resetProbability=0.15, sourceIds=["a", "b", "c", "d"], maxIter=10)
廣度優(yōu)先搜索BFS:
搜索從姓名叫Esther到年齡小于32的最小路徑:
paths = graph.bfs("name = 'Esther'", "age < 32")
paths.show()+--------------+--------------+---------------+
| from| e0| to|
+--------------+--------------+---------------+
|{a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|
+--------------+--------------+---------------+可以指定只能在指定的邊搜索:
graph.bfs("name = 'Esther'",
"age < 32",
edgeFilter="relationship != 'friend'",
maxPathLength=4
).show()+---------------+--------------+--------------+--------------+----------------+
| from| e0| v1| e1| to|
+---------------+--------------+--------------+--------------+----------------+
|{e, Esther, 32}|{e, f, follow}|{f, Fanny, 36}|{f, c, follow}|{c, Charlie, 30}|
+---------------+--------------+--------------+--------------+----------------+Connected components 連通組件:
必須先設(shè)置檢查點(diǎn):
sc.setCheckpointDir("checkpoint")
graph.connectedComponents().show()結(jié)果:
+---+-------+---+------------+
| id| name|age| component|
+---+-------+---+------------+
| a| Alice| 34|412316860416|
| b| Bob| 36|412316860416|
| c|Charlie| 30|412316860416|
| d| David| 29|412316860416|
| e| Esther| 32|412316860416|
| f| Fanny| 36|412316860416|
| g| Gabby| 60|146028888064|
+---+-------+---+------------+
可以看到僅g點(diǎn)在一個(gè)連通區(qū)域內(nèi),可以調(diào)用dropIsolatedVertices()方法,刪除這種孤立的沒有連接的點(diǎn):
graph.dropIsolatedVertices().connectedComponents().show()
結(jié)果:
+---+-------+---+------------+
| id| name|age| component|
+---+-------+---+------------+
| a| Alice| 34|412316860416|
| b| Bob| 36|412316860416|
| c|Charlie| 30|412316860416|
| d| David| 29|412316860416|
| e| Esther| 32|412316860416|
| f| Fanny| 36|412316860416|
+---+-------+---+------------+
Strongly connected components 強(qiáng)連通組件:
graph.stronglyConnectedComponents(maxIter=10).show()
Shortest paths 最短路徑:
每個(gè)頂點(diǎn)到a或d的最短路徑:
graph.shortestPaths(landmarks=["a", "d"]).show()
+---+-------+---+----------------+
| id| name|age| distances|
+---+-------+---+----------------+
| g| Gabby| 60| {}|
| f| Fanny| 36| {}|
| e| Esther| 32|{a -> 2, d -> 1}|
| d| David| 29|{a -> 1, d -> 0}|
| c|Charlie| 30| {}|
| b| Bob| 36| {}|
| a| Alice| 34|{a -> 0, d -> 2}|
+---+-------+---+----------------+Triangle count 三角形計(jì)數(shù):
graph.triangleCount().show()
+-----+---+-------+---+ |count| id| name|age| +-----+---+-------+---+ | 1| a| Alice| 34| | 0| b| Bob| 36| | 0| c|Charlie| 30| | 1| d| David| 29| | 1| e| Esther| 32| | 0| g| Gabby| 60| | 0| f| Fanny| 36| +-----+---+-------+---+
說明頂點(diǎn)a/e/d構(gòu)成三角形。
標(biāo)簽傳播算法(LPA):
graph.labelPropagation(maxIter=5).orderBy("label").show()+---+-------+---+-------------+ | id| name|age| label| +---+-------+---+-------------+ | g| Gabby| 60| 146028888064| | f| Fanny| 36|1047972020224| | b| Bob| 36|1047972020224| | a| Alice| 34|1382979469312| | c|Charlie| 30|1382979469312| | e| Esther| 32|1460288880640| | d| David| 29|1460288880640| +---+-------+---+-------------+
PySpark3.X與pandas融合
Pyspark從3.0版本開始出現(xiàn)了pandas_udf裝飾器、applyInPandas和mapInPandas,基于這些方法,我們就可以使用熟悉的pandas的語法處理spark對象的數(shù)據(jù)。
首先創(chuàng)建幾條測試數(shù)據(jù),并啟動(dòng) Apache Arrow:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
df.show()自定義UDF和UDAF
pyspark暫不支持自定義UDTF。
使用pandas_udf裝飾器我們可以創(chuàng)建出基于pandas的udf自定義函數(shù),在DSL的語法中可以被直接使用:
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf("double")
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b
df.select(multiply_func("id", "v").alias("product")).show()注冊函數(shù)和視圖后,可以直接在SQL中使用:
df.createOrReplaceTempView("t")
spark.udf.register("multiply", multiply_func)
spark.sql('select multiply(id, v) product from t').show()結(jié)果均為:
+-------+
|product|
+-------+
| 1.0|
| 2.0|
| 6.0|
| 10.0|
| 20.0|
+-------+
還支持聚合函數(shù)和窗口函數(shù):
from pyspark.sql import Window
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
# 對字段'v'進(jìn)行求均值
df.select(mean_udf('v').alias("mean_v")).show()
# 按照‘id'分組,求'v'的均值
df.groupby("id").agg(mean_udf('v').alias("mean_v")).show()
# 按照‘id'分組,求'v'的均值,并賦值給新的一列
df.withColumn('mean_v', mean_udf("v").over(Window.partitionBy('id'))).show()注冊到udf之后同樣可以直接使用SQL實(shí)現(xiàn):
spark.udf.register("mean2", mean_udf)
spark.sql('select mean2(v) mean_v from t').show()
spark.sql('select id,mean2(v) mean_v from t group by id').show()
spark.sql('select id,v,mean2(v) over(partition by id) mean_v from t').show()結(jié)果均為:
+--------+
| mean_v |
+--------+
| 4.2|
+--------++---+--------+
| id| mean_v |
+---+--------+
| 1| 1.5|
| 2| 6.0|
+---+--------++---+----+------+
| id| v|mean_v|
+---+----+------+
| 1| 1.0| 1.5|
| 1| 2.0| 1.5|
| 2| 3.0| 6.0|
| 2| 5.0| 6.0|
| 2|10.0| 6.0|
+---+----+------+
分組聚合與JOIN
applyInPandas需要在datafream調(diào)用groupby之后才能使用:
def subtract_mean(pdf):
v = pdf.v
pdf['v1'] = v - v.mean()
pdf['v2'] = v + v.mean()
return pdf
t = df.groupby("id")
t.applyInPandas(
subtract_mean, schema="id long, v double, v1 double, v2 double").show()結(jié)果:
+---+----+----+----+
| id| v| v1| v2|
+---+----+----+----+
| 1| 1.0|-0.5| 2.5|
| 1| 2.0| 0.5| 3.5|
| 2| 3.0|-3.0| 9.0|
| 2| 5.0|-1.0|11.0|
| 2|10.0| 4.0|16.0|
+---+----+----+----+
subtract_mean函數(shù)接收的是對應(yīng)id的dataframe數(shù)據(jù),schema指定了返回值的名稱和類型列表。
通過以下代碼我們可以知道,applyInPandas可以借助cogroup進(jìn)行表連接:
val a = sc.parallelize(List(1, 2, 1, 3)) val b = a.map((_, "b")) val c = a.map((_, "c")) val d = a.map((_, "d")) val e = a.map((_, "e")) scala> b.cogroup(c).foreach(println) (3,(CompactBuffer(b),CompactBuffer(c))) (1,(CompactBuffer(b, b),CompactBuffer(c, c))) (2,(CompactBuffer(b),CompactBuffer(c)))
示例:
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
# l、r is a pandas.DataFrame
# 這里是按照id分組
# 那么,l和r分別是對應(yīng)id的df1和df2數(shù)據(jù)
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+map迭代
執(zhí)行以下代碼:
def filter_func(iterator):
for i, pdf in enumerate(iterator):
print(i, pdf.values.tolist())
yield pdf
df.mapInPandas(filter_func, schema=df.schema).show()后臺(tái)看到執(zhí)行結(jié)果為:
0 [[2.0, 5.0]]
0 [[2.0, 3.0]]
0 [[1.0, 1.0]]
0 [[1.0, 2.0]]
0 [[2.0, 10.0]]
前臺(tái)結(jié)果幾乎保持原樣??梢灾纈terator是一個(gè)分區(qū)迭代器,迭代出當(dāng)前分區(qū)的每一行數(shù)據(jù)都被封裝成一個(gè)pandas對象。
Pyspark與Pandas的交互
將spark的Datafream對象轉(zhuǎn)換為原生的pandas對象只需調(diào)用toPandas()方法即可:
sdf.toPandas()
將原生的pandas對象轉(zhuǎn)換為spark對象可以使用spark的頂級(jí)方法:
spark.createDataFrame(pdf)
習(xí)慣使用pandas的童鞋,還可以直接使用pandas-on-Spark,在spark3.2.0版本時(shí)已經(jīng)匹配到pandas 1.3版本的API。通過pandas-on-Spark,我們可以完全用pandas的api操作數(shù)據(jù),而底層執(zhí)行卻是spark的并行化。
使用pandas-on-Spark最好設(shè)置一下環(huán)境變量:
import os os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
將spark對象轉(zhuǎn)換為pandas-on-Spark對象:
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
pdf = df.to_pandas_on_spark()
print(type(pdf))
pdf
pandas-on-Spark對象也可以還原成spark對象:
pdf.to_spark()
另外spark提供直接將文件讀取成pandas-on-Spark對象的api,例如:
import pyspark.pandas as ps
pdf = ps.read_csv("example_csv.csv")ps對象與原生pandas對象的API幾乎完全一致。
ps對象相對于原生pandas對象的API幾乎一致,同時(shí)還支持一些強(qiáng)悍的功能,例如直接以SQL形式訪問:
ps.sql("SELECT count(*) as num FROM {pdf}"){pdf}訪問了變量名為pdf的pandas-on-Spark對象。
到此這篇關(guān)于PySpark與GraphFrames的安裝與使用的文章就介紹到這了,更多相關(guān)PySpark與GraphFrames使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python numpy數(shù)組復(fù)制使用實(shí)例解析
這篇文章主要介紹了python numpy數(shù)組復(fù)制使用實(shí)例解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-01-01
Anaconda環(huán)境克隆、遷移的詳細(xì)步驟
最近需要在多臺(tái)計(jì)算機(jī)上工作,每次重新部署環(huán)境比較麻煩,所以學(xué)習(xí)一下anaconda環(huán)境遷移的方法,下面這篇文章主要給大家介紹了關(guān)于Anaconda環(huán)境克隆、遷移的詳細(xì)步驟,需要的朋友可以參考下2022-08-08
python讀取mnist數(shù)據(jù)集方法案例詳解
這篇文章主要介紹了python讀取mnist數(shù)據(jù)集方法案例詳解,本篇文章通過簡要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-09-09
python使用PIL把透明背景圖片轉(zhuǎn)成白色背景的示例代碼
當(dāng)我們在采集一些圖片的時(shí)候,這些圖片的背景經(jīng)常是透明的,但是如何把透明背景轉(zhuǎn)成白色背景呢,接下來就給大家解決這個(gè)問題,本文主要介紹了python使用PIL把透明背景圖片轉(zhuǎn)成白色背景,需要的朋友可以參考下2023-08-08

