欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

PySpark與GraphFrames的安裝與使用環(huán)境搭建過程

 更新時間:2022年02月21日 09:56:17   作者:小小明-代碼實體  
這篇文章主要介紹了PySpark與GraphFrames的安裝與使用教程,本文通過圖文并茂實例代碼相結(jié)合給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下

PySpark環(huán)境搭建

配置hadoop

spark訪問本地文件并執(zhí)行運算時,可能會遇到權(quán)限問題或是dll錯誤。這是因為spark需要使用到Hadoop的winutils和hadoop.dll,首先我們必須配置好Hadoop相關(guān)的環(huán)境。可以到github下載:https://github.com/4ttty/winutils

gitcode提供了鏡像加速:https://gitcode.net/mirrors/4ttty/winutils

我選擇了使用這個倉庫提供的最高的Hadoop版本3.0.0將其解壓到D:\deploy\hadoop-3.0.0目錄下,然后配置環(huán)境變量:

image-20220217132732434

我們還需要將對應(yīng)的hadoop.dll復制到系統(tǒng)中,用命令表達就是:

copy D:\deploy\hadoop-3.0.0\bin\hadoop.dll C:\Windows\System32

不過這步需要擁有管理員權(quán)限才可以操作。

為了能夠在任何地方使用winutils命令工具,將%HADOOP_HOME%\bin目錄加入環(huán)境變量中:

image-20220217133520475

安裝pyspark與Java

首先,我們安裝spark當前(2022-2-17)的最新版本:

pip install pyspark==3.2.1

需要注意pyspark的版本決定了jdk的最高版本,例如假如安裝2.4.5版本的pyspark就只能安裝1.8版本的jdk,否則會報出java.lang.IllegalArgumentException: Unsupported class file major version 55的錯誤。

這是因為pyspark內(nèi)置了Scala,而Scala是基于jvm的編程語言,Scala與jdk的版本存在兼容性問題,JDK與scala的版本兼容性表:

JDK versionMinimum Scala versionsRecommended Scala versions
172.13.6, 2.12.15 (forthcoming)2.13.6, 2.12.15 (forthcoming)
162.13.5, 2.12.142.13.6, 2.12.14
13, 14, 152.13.2, 2.12.112.13.6, 2.12.14
122.13.1, 2.12.92.13.6, 2.12.14
112.13.0, 2.12.4, 2.11.122.13.6, 2.12.14, 2.11.12
82.13.0, 2.12.0, 2.11.0, 2.10.22.13.6, 2.12.14, 2.11.12, 2.10.7
6, 72.11.0, 2.10.02.11.12, 2.10.7

當前3.2.1版本的pyspark內(nèi)置的Scala版本為2.12.15,意味著jdk17與其以下的所有版本都支持。

這里我依然選擇安裝jdk8的版本:

image-20220217143447453

測試一下:

>java -version
java version "1.8.0_201"
Java(TM) SE Runtime Environment (build 1.8.0_201-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode)

jdk11的詳細安裝教程(jdk1.8在官網(wǎng)只有安裝包,無zip綠化壓縮包):

綠化版Java11的環(huán)境配置與Python調(diào)用Java
https://xxmdmst.blog.csdn.net/article/details/118366166

graphframes安裝

pip安裝當前最新的graphframes:

pip install graphframes==0.6

然后在官網(wǎng)下載graphframes的jar包。

下載地址:https://spark-packages.org/package/graphframes/graphframes

由于安裝的pyspark版本是3.2,所以這里我選擇了這個jar包:

image-20220217144829403

然后將該jar包放入pyspark安裝目錄的jars目錄下:

image-20220217145105414

pyspark安裝位置可以通過pip查看:

C:\Users\ASUS>pip show pyspark
Name: pyspark
Version: 3.2.1
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: d:\miniconda3\lib\site-packages
Requires: py4j
Required-by:

使用方法

學習pyspark的最佳路徑是官網(wǎng):https://spark.apache.org/docs/latest/quick-start.html

在下面的網(wǎng)頁,官方提供了在線jupyter:

https://spark.apache.org/docs/latest/api/python/getting_started/index.html

image-20220218174237283

啟動spark并讀取數(shù)據(jù)

本地模式啟動spark:

from pyspark.sql import SparkSession, Row

spark = SparkSession \
    .builder \
    .appName("Python Spark") \
    .master("local[*]") \
    .getOrCreate()
sc = spark.sparkContext
spark

image-20220217153008268

SparkSession輸出的內(nèi)容中包含了spark的web頁面,新標簽頁打開頁面后大致效果如上。

點擊Environment選項卡可以查看當前環(huán)境中的變量:

image-20220217153531616

啟動hive支持

找到pyspark的安裝位置,例如我的電腦在D:\Miniconda3\Lib\site-packages\pyspark

手動創(chuàng)建conf目錄并將hive-site.xml配置文件復制到其中。如果hive使用了MySQL作為原數(shù)據(jù)庫,則還需要將MySQL對應(yīng)的驅(qū)動jar包放入spark的jars目錄下。

創(chuàng)建spark會話對象時可通過enableHiveSupport()開啟hive支持:

from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext
spark

spark訪問hive自己創(chuàng)建的表有可能會出現(xiàn)如下的權(quán)限報錯:

Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS s
hould be writable. Current permissions are: rwx------

是因為當前用戶不具備對\tmp\hive的操作權(quán)限:

>winutils ls \tmp\hive
drwx------ 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May  4 2020 \tmp\hive

把\tmp\hive目錄的權(quán)限改為777即可順利訪問:

>winutils chmod 777 \tmp\hive

>winutils ls \tmp\hive
drwxrwxrwx 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May  4 2020 \tmp\hive

Spark的DataFrame與RDD

從spark2.x開始將RDD和DataFrame的API統(tǒng)一抽象成dataset,DataFrame就是Dataset[Row],RDD則是Dataset.rdd。可以將DataFrame理解為包含結(jié)構(gòu)化信息的RDD。

將含row的RDD轉(zhuǎn)換為DataFrame只需要調(diào)用toDF方法或SparkSession的createDataFrame方法即可,也可以傳入schema覆蓋類型或名稱設(shè)置。

DataFrame的基礎(chǔ)api

DataFrame默認支持DSL風格語法,例如:

//查看DataFrame中的內(nèi)容
df.show()

//查看DataFrame部分列中的內(nèi)容
df.select(df['name'], df['age'] + 1).show()
df.select("name").show()
//打印DataFrame的Schema信息
df.printSchema()
//過濾age大于等于 21 的
df.filter(df['age'] > 21).show()
//按年齡進行分組并統(tǒng)計相同年齡的人數(shù)
personDF.groupBy("age").count().show()

將DataFrame注冊成表或視圖之后即可進行純SQL操作:

df.createOrReplaceTempView("people")
//df.createTempView("t_person")

//查詢年齡最大的前兩名
spark.sql("select * from t_person order by age desc limit 2").show()
//顯示表的Schema信息
spark.sql("desc t_person").show()

Pyspark可以直接很方便的注冊udf并直接使用:

strlen = spark.udf.register("len", lambda x: len(x))
print(spark.sql("SELECT len('test') length").collect())
print(spark.sql("SELECT 'foo' AS text").select(strlen("text").alias('length')).collect())

執(zhí)行結(jié)果:

[Row(length='4')]
[Row(length='3')]

RDD的簡介

DataFrame的本質(zhì)是對RDD的包裝,可以理解為DataFrame=RDD[Row]+schema。

RDD(A Resilient Distributed Dataset)叫做彈性可伸縮分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象。它代表一個不可變、自動容錯、可伸縮性、可分區(qū)、里面的元素可并行計算的集合。

在每一個RDD內(nèi)部具有五大屬性:

  • 具有一系列的分區(qū)
  • 一個計算函數(shù)操作于每一個切片
  • 具有一個對其他RDD的依賴列表
  • 對于 key-value RDDs具有一個Partitioner分區(qū)器
  • 存儲每一個切片最佳計算位置

一組分片(Partition),即數(shù)據(jù)集的基本組成單位。對于RDD來說,每個分片都會被一個計算任務(wù)處理,并決定并行計算的粒度。用戶可以在創(chuàng)建RDD時指定RDD的分片個數(shù),如果沒有指定,那么就會采用默認值。默認值就是程序所分配到的CPU Core的數(shù)目。

**一個計算每個分區(qū)的函數(shù)。**Spark中RDD的計算是以分片為單位的,每個RDD都會實現(xiàn)compute函數(shù)以達到這個目的。compute函數(shù)會對迭代器進行復合,不需要保存每次計算的結(jié)果。

**RDD之間的依賴關(guān)系。**RDD的每次轉(zhuǎn)換都會生成一個新的RDD,所以RDD之間就會形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時,Spark可以通過這個依賴關(guān)系重新計算丟失的分區(qū)數(shù)據(jù),而不是對RDD的所有分區(qū)進行重新計算。

**一個Partitioner,即RDD的分片函數(shù)。**當前Spark中實現(xiàn)了兩種類型的分片函數(shù),一個是基于哈希的HashPartitioner,另外一個是基于范圍的RangePartitioner。只有對于于key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量,也決定了parent RDD Shuffle輸出時的分片數(shù)量。

**一個列表,存儲存取每個Partition的優(yōu)先位置(preferred location)。**對于一個HDFS文件來說,這個列表保存的就是每個Partition所在的塊的位置。按照“移動數(shù)據(jù)不如移動計算”的理念,Spark在進行任務(wù)調(diào)度的時候,會盡可能地將計算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲位置。

RDD的API概覽

RDD包含Transformation APIAction API,Transformation API都是延遲加載的只是記住這些應(yīng)用到基礎(chǔ)數(shù)據(jù)集上的轉(zhuǎn)換動作,只有當執(zhí)行Action API時這些轉(zhuǎn)換才會真正運行。

Transformation API產(chǎn)生的兩類RDD最重要,分別是MapPartitionsRDDShuffledRDD

產(chǎn)生MapPartitionsRDD的算子有map、keyBy、keys、values、flatMap、mapValues 、flatMapValues、mapPartitions、mapPartitionsWithIndex、glom、filter和filterByRange 。其中用的最多的是mapflatMap,但任何產(chǎn)生MapPartitionsRDD的算子都可以直接使用mapPartitionsmapPartitionsWithIndex實現(xiàn)。

產(chǎn)生ShuffledRDD的算子有combineByKeyWithClassTag、combineByKey、aggregateByKey、foldByKey 、reduceByKey 、distinct、groupByKey、groupBy、partitionBy、sortByKey 和 repartitionAndSortWithinPartitions。

combineByKey到groupByKey 底層均是調(diào)用combineByKeyWithClassTag方法:

@Experimental
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner,mergeValue,mergeCombiners
       ,defaultPartitioner(self))
}
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}

三個重要參數(shù)的含義:

  • createCombiner:根據(jù)每個分區(qū)的第一個元素操作產(chǎn)生一個初始值
  • mergeValue:對每個分區(qū)內(nèi)部的元素進行迭代合并
  • mergeCombiners:對所有分區(qū)的合并結(jié)果進行合并

groupByKey的partitioner未指定時會傳入默認的defaultPartitioner。例如:

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2).keyBy(_.length)
a.groupByKey.collect

res9: Array[(Int, Iterable[String])] = Array((4,CompactBuffer(lion)), (6,CompactBuffer(spider)), (3,CompactBuffer(dog, cat)), (5,CompactBuffer(tiger, eagle)))

aggregateByKey:每個分區(qū)使用zeroValue作為初始值,迭代每一個元素用seqOp進行合并,對所有分區(qū)的結(jié)果用combOp進行合并。例如:

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res6: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
res7: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))

reduceByKey :每個分區(qū)迭代每一個元素用func進行合并,對所有分區(qū)的結(jié)果用func再進行合并,例如:

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect

Action API有:

動作含義
reduce(func)通過func函數(shù)聚集RDD中的所有元素,這個功能必須是課交換且可并聯(lián)的
collect()在驅(qū)動程序中,以數(shù)組的形式返回數(shù)據(jù)集的所有元素
count()返回RDD的元素個數(shù)
first()返回RDD的第一個元素(類似于take(1))
take(n)返回一個由數(shù)據(jù)集的前n個元素組成的數(shù)組
takeSample(withReplacement*,*num, [seed])返回一個數(shù)組,該數(shù)組由從數(shù)據(jù)集中隨機采樣的num個元素組成,可以選擇是否用隨機數(shù)替換不足的部分,seed用于指定隨機數(shù)生成器種子
takeOrdered(n, [ordering])排序并取前N個元素
saveAsTextFile(path)將數(shù)據(jù)集的元素以textfile的形式保存到HDFS文件系統(tǒng)或者其他支持的文件系統(tǒng),對于每個元素,Spark將會調(diào)用toString方法,將它裝換為文件中的文本
saveAsSequenceFile(path)將數(shù)據(jù)集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統(tǒng)。
saveAsObjectFile(path)將RDD中的元素用NullWritable作為key,實際元素作為value保存為sequencefile格式
countByKey()針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應(yīng)的元素個數(shù)。
foreach(func)在數(shù)據(jù)集的每一個元素上,運行函數(shù)func進行更新。

spark模擬實現(xiàn)mapreduce版wordcount:

object MapreduceWordcount {
  def main(args: Array[String]): Unit = {
    import org.apache.spark._
    val sc: SparkContext = new SparkContext(new SparkConf().setAppName("wordcount").setMaster("local[*]"))
    sc.setLogLevel("WARN")
    
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat
    import org.apache.spark.rdd.HadoopRDD

    import scala.collection.mutable.ArrayBuffer
    
    def map(k: LongWritable, v: Text, collect: ArrayBuffer[(String, Int)]) = {
      for (word <- v.toString.split("\\s+"))
        collect += ((word, 1))
    }
    def reduce(key: String, value: Iterator[Int], collect: ArrayBuffer[(String, Int)]) = {
      collect += ((key, value.sum))
    }
    val rdd = sc.hadoopFile("/hdfs/wordcount/in1/*", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)
      .asInstanceOf[HadoopRDD[LongWritable, Text]]
      .mapPartitionsWithInputSplit((split, it) =>{
        val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
        it.foreach(kv => map(kv._1, kv._2, collect))
        collect.toIterator
      })
      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
      .mapPartitions(it => {
        val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
        var lastKey: String = ""
        var values: ArrayBuffer[Int] = ArrayBuffer[Int]()
        for ((currKey, value) <- it) {
          if (!currKey.equals(lastKey)) {
            if (values.length != 0)
              reduce(lastKey, values.toIterator, collect)
            values.clear()
          }
          values += value
          lastKey = currKey
        }
        if (values.length != 0) reduce(lastKey, values.toIterator, collect)
        collect.toIterator
      })
    rdd.foreach(println)
  }
}

各類RDD

image-20220218181635289

  • ShuffledRDD :表示需要走Shuffle過程的網(wǎng)絡(luò)傳輸
  • CoalescedRDD :用于將一臺機器的多個分區(qū)合并成一個分區(qū)
  • CartesianRDD :對兩個RDD的所有元素產(chǎn)生笛卡爾積
  • MapPartitionsRDD :用于對每個分區(qū)的數(shù)據(jù)進行特定的處理
  • CoGroupedRDD :用于將2~4個rdd,按照key進行連接聚合
  • SubtractedRDD :用于對2個RDD求差集
  • UnionRDDPartitionerAwareUnionRDD :用于對2個RDD求并集
  • ZippedPartitionsRDD2:zip拉鏈操作產(chǎn)生的RDD
  • ZippedWithIndexRDD:給每一個元素標記一個自增編號
  • PartitionwiseSampledRDD:用于對rdd的元素按照指定的百分比進行隨機采樣

當我們需要給Datafream添加自增列時,可以使用zipWithUniqueId方法:

from pyspark.sql.types import StructType, LongType

schema = data.schema.add(StructField("id", LongType()))
rowRDD = data.rdd.zipWithUniqueId().map(lambda t: t[0]+Row(t[1]))
data = rowRDD.toDF(schema)
data.show()

API用法詳情可參考:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD

cache&checkpoint

RDD通過persist方法或cache方法可以將前面的計算結(jié)果緩存,但是并不是這兩個方法被調(diào)用時立即緩存,而是觸發(fā)后面的action時,該RDD將會被緩存在計算節(jié)點的內(nèi)存中,并供后面重用。

rdd.persist()

checkpoint的源碼注釋可以看到:

  • 標記該RDD作為檢查點。
  • 它將被保存在通過SparkContext#setCheckpointDir方法設(shè)置的檢查點目錄中
  • 它所引用的所有父RDD引用將全部被移除
  • 這個方法在這個RDD上必須在所有job執(zhí)行前運行。
  • 強烈建議將這個RDD緩存在內(nèi)存中,否則這個保存文件的計算任務(wù)將重新計算。

從中我們得知,在執(zhí)行checkpoint方法時,最好同時,將該RDD緩存起來,否則,checkpoint也會產(chǎn)生一個計算任務(wù)。

sc.setCheckpointDir("checkpoint")
rdd.cache()
rdd.checkpoint()

graphframes 的用法

GraphFrame是將Spark中的Graph算法統(tǒng)一到DataFrame接口的Graph操作接口,為Scala、Java和Python提供了統(tǒng)一的圖處理API。

Graphframes是開源項目,源碼工程如下:https://github.com/graphframes/graphframes

可以參考:

  • 官網(wǎng):https://graphframes.github.io/graphframes/docs/_site/index.html
  • GraphFrames用戶指南-Python — Databricks文檔:https://docs.databricks.com/spark/latest/graph-analysis/graphframes/user-guide-python.html

在GraphFrames中圖的頂點(Vertex)和邊(edge)都是以DataFrame形式存儲的:

  • 頂點DataFrame:必須包含列名為“id”的列,用于作為頂點的唯一標識
  • 邊DataFrame:必須包含列名為“src”和“dst”的列,根據(jù)唯一標識id標識關(guān)系

創(chuàng)建圖的示例:

from graphframes import GraphFrame
vertices = spark.createDataFrame([
    ("a", "Alice", 34),
    ("b", "Bob", 36),
    ("c", "Charlie", 30),
    ("d", "David", 29),
    ("e", "Esther", 32),
    ("f", "Fanny", 36),
    ("g", "Gabby", 60)], ["id", "name", "age"])
edges = spark.createDataFrame([
    ("a", "b", "friend"),
    ("b", "c", "follow"),
    ("c", "b", "follow"),
    ("f", "c", "follow"),
    ("e", "f", "follow"),
    ("e", "d", "friend"),
    ("d", "a", "friend"),
    ("a", "e", "friend")
], ["src", "dst", "relationship"])
# 生成圖
g = GraphFrame(vertices, edges)

GraphFrame提供三種視圖:

print("頂點表視圖:")
graph.vertices.show() # graph.vertices 就是原始的vertices
print("邊表視圖:")
graph.edges.show() # graph.edges 就是原始的 edges
print("三元組視圖:")
graph.triplets.show()

獲取頂點的度、入度和出度:

# 頂點的度
graph.degrees.show()
# 頂點的入度
graph.inDegrees.show()
# 頂點的出度
graph.outDegrees.show()

Motif finding (模式發(fā)現(xiàn))

示例:

# 多個路徑條件
motif = graph.find("(a)-[e]->(b); (b)-[e2]->(a)")
# 在搜索的結(jié)果上進行過濾
motif.filter("b.age > 30")
# 不需要返回路徑中的元素時,可以使用匿名頂點和邊
motif = graph.find("(start)-[]->()")
# 設(shè)置路徑不存在的條件
motif = graph.find("(a)-[]->(b); !(b)-[]->(a)")

假設(shè)我們要想給用戶推薦關(guān)注的人,可以找出這樣的關(guān)系:A關(guān)注B,B關(guān)注C,但是A未關(guān)注C。找出這樣的關(guān)系就可以把C推薦給A:

# Motif: A->B->C but not A->C
results = graph.find("(A)-[]->(B); (B)-[]->(C); !(A)-[]->(C)")
# 排除自己
results = results.filter("A.id != C.id")
# 選擇需要的列
results = results.select(results.A.id.alias("A"), results.C.id.alias("C"))
results.show()

結(jié)果:

+---+---+
|  A|  C|
+---+---+
|  e|  c|
|  e|  a|
|  d|  b|
|  a|  d|
|  f|  b|
|  d|  e|
|  a|  f|
|  a|  c|
+---+---+

Motif在查找路徑過程的過程中,還可以沿著路徑攜帶狀態(tài)。例如我們想要找出關(guān)系鏈有4個頂點,而且其中3條邊全部都是"friend"關(guān)系:

from pyspark.sql.functions import col, lit, when
from functools import reduce
chain4 = graph.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")
def sumFriends(cnt, relationship):
    "定義下一個頂點更新狀態(tài)的條件:如果關(guān)系為friend則cnt+1"
    return when(relationship == "friend", cnt+1).otherwise(cnt)
# 將更新方法應(yīng)用到整個鏈的,鏈上每有一個關(guān)系是 friend 就加一,鏈上共三個關(guān)系。
condition = reduce(lambda cnt, e: sumFriends(
    cnt, col(e).relationship), ["ab", "bc", "cd"], lit(0))
chainWith2Friends2 = chain4.where(condition >= 3)
chainWith2Friends2.show()

結(jié)果:

+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
|              a|            ab|              b|            bc|              c|            cd|              d|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, b, friend}|   {b, Bob, 36}|
| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+

Subgraphs 子圖

可以直接過濾其頂點或邊,dropIsolatedVertices()方法用于刪除孤立沒有連接的點:

graph.filterVertices("age > 30").filterEdges("relationship = 'friend'").dropIsolatedVertices()

還可以基于模式發(fā)現(xiàn)獲取到的邊創(chuàng)建Subgraphs :

paths = graph.find("(a)-[e]->(b)")\
    .filter("e.relationship = 'follow'")\
    .filter("a.age < b.age")
# 抽取邊信息e2 = paths.select("e.src", "e.dst", "e.relationship")
e2 = paths.select("e.*")
# 創(chuàng)建Subgraphs
g2 = GraphFrame(graph.vertices, e2)

GraphFrames支持的GraphX算法

  • PageRank:查找圖中的重要頂點。
  • 廣度優(yōu)先搜索(BFS):查找從一組頂點到另一組頂點的最短路徑
  • 連通組件(ConnectedComponents):為具備連接關(guān)系的頂點分配相同的組件ID
  • 強連通組件(StronglyConnectedConponents):根據(jù)每個頂點的強連通分量分配SCC。
  • 最短路徑(Shortest paths):查找從每個頂點到目標頂點集的最短路徑。
  • 三角形計數(shù)(TriangleCount):計算每個頂點所屬的三角形的數(shù)量,經(jīng)常用于確定組的穩(wěn)定性(相互連接的數(shù)量代表了穩(wěn)定性)或作為其他網(wǎng)絡(luò)度量(如聚類系數(shù))的一部分,在社交網(wǎng)絡(luò)分析中用來檢測社區(qū)。
  • 標簽傳播算法(LPA):檢測圖中的社區(qū)。

pageRank算法:

results = graph.pageRank(resetProbability=0.15, maxIter=10)
results.vertices.sort("pagerank", ascending=False).show()

結(jié)果:

+---+-------+---+-------------------+
| id|   name|age|           pagerank|
+---+-------+---+-------------------+
|  b|    Bob| 36| 2.7025217677349773|
|  c|Charlie| 30| 2.6667877057849627|
|  a|  Alice| 34| 0.4485115093698443|
|  e| Esther| 32| 0.3613490987992571|
|  f|  Fanny| 36|0.32504910549694244|
|  d|  David| 29|0.32504910549694244|
|  g|  Gabby| 60|0.17073170731707318|
+---+-------+---+-------------------+

可以設(shè)置起始頂點:

graph.pageRank(resetProbability=0.15, maxIter=10, sourceId="a")
graph.parallelPersonalizedPageRank(resetProbability=0.15, sourceIds=["a", "b", "c", "d"], maxIter=10)

廣度優(yōu)先搜索BFS:

搜索從姓名叫Esther到年齡小于32的最小路徑:

paths = graph.bfs("name = 'Esther'", "age < 32")
paths.show()
+--------------+--------------+---------------+
|          from|            e0|             to|
+--------------+--------------+---------------+
|{a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|
+--------------+--------------+---------------+

可以指定只能在指定的邊搜索:

graph.bfs("name = 'Esther'",
          "age < 32",
          edgeFilter="relationship != 'friend'",
          maxPathLength=4
          ).show()
+---------------+--------------+--------------+--------------+----------------+
|           from|            e0|            v1|            e1|              to|
+---------------+--------------+--------------+--------------+----------------+
|{e, Esther, 32}|{e, f, follow}|{f, Fanny, 36}|{f, c, follow}|{c, Charlie, 30}|
+---------------+--------------+--------------+--------------+----------------+

Connected components 連通組件:

必須先設(shè)置檢查點:

sc.setCheckpointDir("checkpoint")

graph.connectedComponents().show()

結(jié)果:

+---+-------+---+------------+
| id|   name|age|   component|
+---+-------+---+------------+
|  a|  Alice| 34|412316860416|
|  b|    Bob| 36|412316860416|
|  c|Charlie| 30|412316860416|
|  d|  David| 29|412316860416|
|  e| Esther| 32|412316860416|
|  f|  Fanny| 36|412316860416|
|  g|  Gabby| 60|146028888064|
+---+-------+---+------------+

可以看到僅g點在一個連通區(qū)域內(nèi),可以調(diào)用dropIsolatedVertices()方法,刪除這種孤立的沒有連接的點:

graph.dropIsolatedVertices().connectedComponents().show()

結(jié)果:

+---+-------+---+------------+
| id|   name|age|   component|
+---+-------+---+------------+
|  a|  Alice| 34|412316860416|
|  b|    Bob| 36|412316860416|
|  c|Charlie| 30|412316860416|
|  d|  David| 29|412316860416|
|  e| Esther| 32|412316860416|
|  f|  Fanny| 36|412316860416|
+---+-------+---+------------+

Strongly connected components 強連通組件:

graph.stronglyConnectedComponents(maxIter=10).show()

Shortest paths 最短路徑:

每個頂點到a或d的最短路徑:

graph.shortestPaths(landmarks=["a", "d"]).show()
+---+-------+---+----------------+
| id|   name|age|       distances|
+---+-------+---+----------------+
|  g|  Gabby| 60|              {}|
|  f|  Fanny| 36|              {}|
|  e| Esther| 32|{a -> 2, d -> 1}|
|  d|  David| 29|{a -> 1, d -> 0}|
|  c|Charlie| 30|              {}|
|  b|    Bob| 36|              {}|
|  a|  Alice| 34|{a -> 0, d -> 2}|
+---+-------+---+----------------+

Triangle count 三角形計數(shù):

graph.triangleCount().show()
+-----+---+-------+---+
|count| id|   name|age|
+-----+---+-------+---+
|    1|  a|  Alice| 34|
|    0|  b|    Bob| 36|
|    0|  c|Charlie| 30|
|    1|  d|  David| 29|
|    1|  e| Esther| 32|
|    0|  g|  Gabby| 60|
|    0|  f|  Fanny| 36|
+-----+---+-------+---+

說明頂點a/e/d構(gòu)成三角形。

標簽傳播算法(LPA):

graph.labelPropagation(maxIter=5).orderBy("label").show()
+---+-------+---+-------------+
| id|   name|age|        label|
+---+-------+---+-------------+
|  g|  Gabby| 60| 146028888064|
|  f|  Fanny| 36|1047972020224|
|  b|    Bob| 36|1047972020224|
|  a|  Alice| 34|1382979469312|
|  c|Charlie| 30|1382979469312|
|  e| Esther| 32|1460288880640|
|  d|  David| 29|1460288880640|
+---+-------+---+-------------+

PySpark3.X與pandas融合

Pyspark從3.0版本開始出現(xiàn)了pandas_udf裝飾器、applyInPandas和mapInPandas,基于這些方法,我們就可以使用熟悉的pandas的語法處理spark對象的數(shù)據(jù)。

首先創(chuàng)建幾條測試數(shù)據(jù),并啟動 Apache Arrow

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))
df.show()

自定義UDF和UDAF

pyspark暫不支持自定義UDTF。

使用pandas_udf裝飾器我們可以創(chuàng)建出基于pandas的udf自定義函數(shù),在DSL的語法中可以被直接使用:

from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf("double")
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b
df.select(multiply_func("id", "v").alias("product")).show()

注冊函數(shù)和視圖后,可以直接在SQL中使用:

df.createOrReplaceTempView("t")
spark.udf.register("multiply", multiply_func)
spark.sql('select multiply(id, v) product from t').show()

結(jié)果均為:

+-------+
|product|
+-------+
|    1.0|
|    2.0|
|    6.0|
|   10.0|
|   20.0|
+-------+

還支持聚合函數(shù)和窗口函數(shù):

from pyspark.sql import Window

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()
# 對字段'v'進行求均值
df.select(mean_udf('v').alias("mean_v")).show()
# 按照‘id'分組,求'v'的均值
df.groupby("id").agg(mean_udf('v').alias("mean_v")).show()
# 按照‘id'分組,求'v'的均值,并賦值給新的一列
df.withColumn('mean_v', mean_udf("v").over(Window.partitionBy('id'))).show()

注冊到udf之后同樣可以直接使用SQL實現(xiàn):

spark.udf.register("mean2", mean_udf)
spark.sql('select mean2(v) mean_v from t').show()
spark.sql('select id,mean2(v) mean_v from t group by id').show()
spark.sql('select id,v,mean2(v) over(partition by id) mean_v from t').show()

結(jié)果均為:

+--------+
| mean_v |
+--------+
|     4.2|
+--------+

+---+--------+
| id| mean_v |
+---+--------+
|  1|     1.5|
|  2|     6.0|
+---+--------+

+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.5|
|  1| 2.0|   1.5|
|  2| 3.0|   6.0|
|  2| 5.0|   6.0|
|  2|10.0|   6.0|
+---+----+------+

分組聚合與JOIN

applyInPandas需要在datafream調(diào)用groupby之后才能使用:

def subtract_mean(pdf):
    v = pdf.v
    pdf['v1'] = v - v.mean()
    pdf['v2'] = v + v.mean()
    return pdf
t = df.groupby("id")
t.applyInPandas(
    subtract_mean, schema="id long, v double, v1 double, v2 double").show()

結(jié)果:

+---+----+----+----+
| id|   v|  v1|  v2|
+---+----+----+----+
|  1| 1.0|-0.5| 2.5|
|  1| 2.0| 0.5| 3.5|
|  2| 3.0|-3.0| 9.0|
|  2| 5.0|-1.0|11.0|
|  2|10.0| 4.0|16.0|
+---+----+----+----+

subtract_mean函數(shù)接收的是對應(yīng)id的dataframe數(shù)據(jù),schema指定了返回值的名稱和類型列表。

通過以下代碼我們可以知道,applyInPandas可以借助cogroup進行表連接:

val a = sc.parallelize(List(1, 2, 1, 3))
val b = a.map((_, "b"))
val c = a.map((_, "c"))
val d = a.map((_, "d"))
val e = a.map((_, "e"))
scala> b.cogroup(c).foreach(println)
(3,(CompactBuffer(b),CompactBuffer(c)))
(1,(CompactBuffer(b, b),CompactBuffer(c, c)))
(2,(CompactBuffer(b),CompactBuffer(c)))

示例:

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
  	# l、r is a pandas.DataFrame
  	# 這里是按照id分組
  	# 那么,l和r分別是對應(yīng)id的df1和df2數(shù)據(jù)
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

map迭代

執(zhí)行以下代碼:

def filter_func(iterator):
    for i, pdf in enumerate(iterator):
        print(i, pdf.values.tolist())
        yield pdf


df.mapInPandas(filter_func, schema=df.schema).show()

后臺看到執(zhí)行結(jié)果為:

0 [[2.0, 5.0]]                                                     
0 [[2.0, 3.0]]
0 [[1.0, 1.0]]
0 [[1.0, 2.0]]
0 [[2.0, 10.0]]

前臺結(jié)果幾乎保持原樣。可以知道iterator是一個分區(qū)迭代器,迭代出當前分區(qū)的每一行數(shù)據(jù)都被封裝成一個pandas對象。

Pyspark與Pandas的交互

將spark的Datafream對象轉(zhuǎn)換為原生的pandas對象只需調(diào)用toPandas()方法即可:

sdf.toPandas()

將原生的pandas對象轉(zhuǎn)換為spark對象可以使用spark的頂級方法:

spark.createDataFrame(pdf)

習慣使用pandas的童鞋,還可以直接使用pandas-on-Spark,在spark3.2.0版本時已經(jīng)匹配到pandas 1.3版本的API。通過pandas-on-Spark,我們可以完全用pandas的api操作數(shù)據(jù),而底層執(zhí)行卻是spark的并行化。

使用pandas-on-Spark最好設(shè)置一下環(huán)境變量:

import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

將spark對象轉(zhuǎn)換為pandas-on-Spark對象:

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

pdf = df.to_pandas_on_spark()
print(type(pdf))
pdf

image-20220218090829123

pandas-on-Spark對象也可以還原成spark對象:

pdf.to_spark()

另外spark提供直接將文件讀取成pandas-on-Spark對象的api,例如:

import pyspark.pandas as ps

pdf = ps.read_csv("example_csv.csv")

ps對象與原生pandas對象的API幾乎完全一致。

ps對象相對于原生pandas對象的API幾乎一致,同時還支持一些強悍的功能,例如直接以SQL形式訪問:

ps.sql("SELECT count(*) as num FROM {pdf}")

{pdf}訪問了變量名為pdf的pandas-on-Spark對象。

到此這篇關(guān)于PySpark與GraphFrames的安裝與使用的文章就介紹到這了,更多相關(guān)PySpark與GraphFrames使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • python?turtle繪制多邊形和跳躍和改變速度特效

    python?turtle繪制多邊形和跳躍和改變速度特效

    這篇文章主要介紹了python?turtle繪制多邊形和跳躍和改變速度特效,文章實現(xiàn)過程詳細,需要的小伙伴可以參考一下,希望對你的學習有所幫助
    2022-03-03
  • django中path函數(shù)使用詳解

    django中path函數(shù)使用詳解

    django.urls.path是Django中用于定義URL映射規(guī)則的函數(shù)之一,本文主要介紹了django中path函數(shù)使用,具有一定的參考價值,感興趣的可以了解一下
    2023-12-12
  • python numpy數(shù)組復制使用實例解析

    python numpy數(shù)組復制使用實例解析

    這篇文章主要介紹了python numpy數(shù)組復制使用實例解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-01-01
  • Anaconda環(huán)境克隆、遷移的詳細步驟

    Anaconda環(huán)境克隆、遷移的詳細步驟

    最近需要在多臺計算機上工作,每次重新部署環(huán)境比較麻煩,所以學習一下anaconda環(huán)境遷移的方法,下面這篇文章主要給大家介紹了關(guān)于Anaconda環(huán)境克隆、遷移的詳細步驟,需要的朋友可以參考下
    2022-08-08
  • Python 虛擬空間的使用代碼詳解

    Python 虛擬空間的使用代碼詳解

    這篇文章主要介紹了Python 虛擬空間的使用,本文通過示例代碼給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下
    2019-06-06
  • python之matplotlib矢量圖svg?emf

    python之matplotlib矢量圖svg?emf

    這篇文章主要介紹了python之matplotlib矢量圖svg?emf,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-09-09
  • 解析Python的縮進規(guī)則的使用

    解析Python的縮進規(guī)則的使用

    這篇文章主要介紹了解析Python的縮進規(guī)則的使用,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-01-01
  • python讀取mnist數(shù)據(jù)集方法案例詳解

    python讀取mnist數(shù)據(jù)集方法案例詳解

    這篇文章主要介紹了python讀取mnist數(shù)據(jù)集方法案例詳解,本篇文章通過簡要的案例,講解了該項技術(shù)的了解與使用,以下就是詳細內(nèi)容,需要的朋友可以參考下
    2021-09-09
  • 詳解PyCharm配置Anaconda的艱難心路歷程

    詳解PyCharm配置Anaconda的艱難心路歷程

    這篇文章主要介紹了詳解PyCharm配置Anaconda的艱難心路歷程,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-08-08
  • python使用PIL把透明背景圖片轉(zhuǎn)成白色背景的示例代碼

    python使用PIL把透明背景圖片轉(zhuǎn)成白色背景的示例代碼

    當我們在采集一些圖片的時候,這些圖片的背景經(jīng)常是透明的,但是如何把透明背景轉(zhuǎn)成白色背景呢,接下來就給大家解決這個問題,本文主要介紹了python使用PIL把透明背景圖片轉(zhuǎn)成白色背景,需要的朋友可以參考下
    2023-08-08

最新評論