欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

PySpark與GraphFrames的安裝與使用環(huán)境搭建過程

 更新時(shí)間:2022年02月21日 09:56:17   作者:小小明-代碼實(shí)體  
這篇文章主要介紹了PySpark與GraphFrames的安裝與使用教程,本文通過圖文并茂實(shí)例代碼相結(jié)合給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

PySpark環(huán)境搭建

配置hadoop

spark訪問本地文件并執(zhí)行運(yùn)算時(shí),可能會(huì)遇到權(quán)限問題或是dll錯(cuò)誤。這是因?yàn)閟park需要使用到Hadoop的winutils和hadoop.dll,首先我們必須配置好Hadoop相關(guān)的環(huán)境??梢缘絞ithub下載:https://github.com/4ttty/winutils

gitcode提供了鏡像加速:https://gitcode.net/mirrors/4ttty/winutils

我選擇了使用這個(gè)倉庫提供的最高的Hadoop版本3.0.0將其解壓到D:\deploy\hadoop-3.0.0目錄下,然后配置環(huán)境變量:

image-20220217132732434

我們還需要將對應(yīng)的hadoop.dll復(fù)制到系統(tǒng)中,用命令表達(dá)就是:

copy D:\deploy\hadoop-3.0.0\bin\hadoop.dll C:\Windows\System32

不過這步需要擁有管理員權(quán)限才可以操作。

為了能夠在任何地方使用winutils命令工具,將%HADOOP_HOME%\bin目錄加入環(huán)境變量中:

image-20220217133520475

安裝pyspark與Java

首先,我們安裝spark當(dāng)前(2022-2-17)的最新版本:

pip install pyspark==3.2.1

需要注意pyspark的版本決定了jdk的最高版本,例如假如安裝2.4.5版本的pyspark就只能安裝1.8版本的jdk,否則會(huì)報(bào)出java.lang.IllegalArgumentException: Unsupported class file major version 55的錯(cuò)誤。

這是因?yàn)閜yspark內(nèi)置了Scala,而Scala是基于jvm的編程語言,Scala與jdk的版本存在兼容性問題,JDK與scala的版本兼容性表:

JDK versionMinimum Scala versionsRecommended Scala versions
172.13.6, 2.12.15 (forthcoming)2.13.6, 2.12.15 (forthcoming)
162.13.5, 2.12.142.13.6, 2.12.14
13, 14, 152.13.2, 2.12.112.13.6, 2.12.14
122.13.1, 2.12.92.13.6, 2.12.14
112.13.0, 2.12.4, 2.11.122.13.6, 2.12.14, 2.11.12
82.13.0, 2.12.0, 2.11.0, 2.10.22.13.6, 2.12.14, 2.11.12, 2.10.7
6, 72.11.0, 2.10.02.11.12, 2.10.7

當(dāng)前3.2.1版本的pyspark內(nèi)置的Scala版本為2.12.15,意味著jdk17與其以下的所有版本都支持。

這里我依然選擇安裝jdk8的版本:

image-20220217143447453

測試一下:

>java -version
java version "1.8.0_201"
Java(TM) SE Runtime Environment (build 1.8.0_201-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode)

jdk11的詳細(xì)安裝教程(jdk1.8在官網(wǎng)只有安裝包,無zip綠化壓縮包):

綠化版Java11的環(huán)境配置與Python調(diào)用Java
https://xxmdmst.blog.csdn.net/article/details/118366166

graphframes安裝

pip安裝當(dāng)前最新的graphframes:

pip install graphframes==0.6

然后在官網(wǎng)下載graphframes的jar包。

下載地址:https://spark-packages.org/package/graphframes/graphframes

由于安裝的pyspark版本是3.2,所以這里我選擇了這個(gè)jar包:

image-20220217144829403

然后將該jar包放入pyspark安裝目錄的jars目錄下:

image-20220217145105414

pyspark安裝位置可以通過pip查看:

C:\Users\ASUS>pip show pyspark
Name: pyspark
Version: 3.2.1
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: d:\miniconda3\lib\site-packages
Requires: py4j
Required-by:

使用方法

學(xué)習(xí)pyspark的最佳路徑是官網(wǎng):https://spark.apache.org/docs/latest/quick-start.html

在下面的網(wǎng)頁,官方提供了在線jupyter:

https://spark.apache.org/docs/latest/api/python/getting_started/index.html

image-20220218174237283

啟動(dòng)spark并讀取數(shù)據(jù)

本地模式啟動(dòng)spark:

from pyspark.sql import SparkSession, Row

spark = SparkSession \
    .builder \
    .appName("Python Spark") \
    .master("local[*]") \
    .getOrCreate()
sc = spark.sparkContext
spark

image-20220217153008268

SparkSession輸出的內(nèi)容中包含了spark的web頁面,新標(biāo)簽頁打開頁面后大致效果如上。

點(diǎn)擊Environment選項(xiàng)卡可以查看當(dāng)前環(huán)境中的變量:

image-20220217153531616

啟動(dòng)hive支持

找到pyspark的安裝位置,例如我的電腦在D:\Miniconda3\Lib\site-packages\pyspark

手動(dòng)創(chuàng)建conf目錄并將hive-site.xml配置文件復(fù)制到其中。如果hive使用了MySQL作為原數(shù)據(jù)庫,則還需要將MySQL對應(yīng)的驅(qū)動(dòng)jar包放入spark的jars目錄下。

創(chuàng)建spark會(huì)話對象時(shí)可通過enableHiveSupport()開啟hive支持:

from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext
spark

spark訪問hive自己創(chuàng)建的表有可能會(huì)出現(xiàn)如下的權(quán)限報(bào)錯(cuò):

Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS s
hould be writable. Current permissions are: rwx------

是因?yàn)楫?dāng)前用戶不具備對\tmp\hive的操作權(quán)限:

>winutils ls \tmp\hive
drwx------ 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May  4 2020 \tmp\hive

把\tmp\hive目錄的權(quán)限改為777即可順利訪問:

>winutils chmod 777 \tmp\hive

>winutils ls \tmp\hive
drwxrwxrwx 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May  4 2020 \tmp\hive

Spark的DataFrame與RDD

從spark2.x開始將RDD和DataFrame的API統(tǒng)一抽象成dataset,DataFrame就是Dataset[Row],RDD則是Dataset.rdd。可以將DataFrame理解為包含結(jié)構(gòu)化信息的RDD。

將含row的RDD轉(zhuǎn)換為DataFrame只需要調(diào)用toDF方法或SparkSession的createDataFrame方法即可,也可以傳入schema覆蓋類型或名稱設(shè)置。

DataFrame的基礎(chǔ)api

DataFrame默認(rèn)支持DSL風(fēng)格語法,例如:

//查看DataFrame中的內(nèi)容
df.show()

//查看DataFrame部分列中的內(nèi)容
df.select(df['name'], df['age'] + 1).show()
df.select("name").show()
//打印DataFrame的Schema信息
df.printSchema()
//過濾age大于等于 21 的
df.filter(df['age'] > 21).show()
//按年齡進(jìn)行分組并統(tǒng)計(jì)相同年齡的人數(shù)
personDF.groupBy("age").count().show()

將DataFrame注冊成表或視圖之后即可進(jìn)行純SQL操作:

df.createOrReplaceTempView("people")
//df.createTempView("t_person")

//查詢年齡最大的前兩名
spark.sql("select * from t_person order by age desc limit 2").show()
//顯示表的Schema信息
spark.sql("desc t_person").show()

Pyspark可以直接很方便的注冊u(píng)df并直接使用:

strlen = spark.udf.register("len", lambda x: len(x))
print(spark.sql("SELECT len('test') length").collect())
print(spark.sql("SELECT 'foo' AS text").select(strlen("text").alias('length')).collect())

執(zhí)行結(jié)果:

[Row(length='4')]
[Row(length='3')]

RDD的簡介

DataFrame的本質(zhì)是對RDD的包裝,可以理解為DataFrame=RDD[Row]+schema。

RDD(A Resilient Distributed Dataset)叫做彈性可伸縮分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象。它代表一個(gè)不可變、自動(dòng)容錯(cuò)、可伸縮性、可分區(qū)、里面的元素可并行計(jì)算的集合。

在每一個(gè)RDD內(nèi)部具有五大屬性:

  • 具有一系列的分區(qū)
  • 一個(gè)計(jì)算函數(shù)操作于每一個(gè)切片
  • 具有一個(gè)對其他RDD的依賴列表
  • 對于 key-value RDDs具有一個(gè)Partitioner分區(qū)器
  • 存儲(chǔ)每一個(gè)切片最佳計(jì)算位置

一組分片(Partition),即數(shù)據(jù)集的基本組成單位。對于RDD來說,每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理,并決定并行計(jì)算的粒度。用戶可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù),如果沒有指定,那么就會(huì)采用默認(rèn)值。默認(rèn)值就是程序所分配到的CPU Core的數(shù)目。

**一個(gè)計(jì)算每個(gè)分區(qū)的函數(shù)。**Spark中RDD的計(jì)算是以分片為單位的,每個(gè)RDD都會(huì)實(shí)現(xiàn)compute函數(shù)以達(dá)到這個(gè)目的。compute函數(shù)會(huì)對迭代器進(jìn)行復(fù)合,不需要保存每次計(jì)算的結(jié)果。

**RDD之間的依賴關(guān)系。**RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD,所以RDD之間就會(huì)形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù),而不是對RDD的所有分區(qū)進(jìn)行重新計(jì)算。

**一個(gè)Partitioner,即RDD的分片函數(shù)。**當(dāng)前Spark中實(shí)現(xiàn)了兩種類型的分片函數(shù),一個(gè)是基于哈希的HashPartitioner,另外一個(gè)是基于范圍的RangePartitioner。只有對于于key-value的RDD,才會(huì)有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量,也決定了parent RDD Shuffle輸出時(shí)的分片數(shù)量。

**一個(gè)列表,存儲(chǔ)存取每個(gè)Partition的優(yōu)先位置(preferred location)。**對于一個(gè)HDFS文件來說,這個(gè)列表保存的就是每個(gè)Partition所在的塊的位置。按照“移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算”的理念,Spark在進(jìn)行任務(wù)調(diào)度的時(shí)候,會(huì)盡可能地將計(jì)算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲(chǔ)位置。

RDD的API概覽

RDD包含Transformation APIAction API,Transformation API都是延遲加載的只是記住這些應(yīng)用到基礎(chǔ)數(shù)據(jù)集上的轉(zhuǎn)換動(dòng)作,只有當(dāng)執(zhí)行Action API時(shí)這些轉(zhuǎn)換才會(huì)真正運(yùn)行。

Transformation API產(chǎn)生的兩類RDD最重要,分別是MapPartitionsRDDShuffledRDD

產(chǎn)生MapPartitionsRDD的算子有map、keyBy、keys、values、flatMap、mapValues 、flatMapValues、mapPartitions、mapPartitionsWithIndex、glom、filter和filterByRange 。其中用的最多的是mapflatMap,但任何產(chǎn)生MapPartitionsRDD的算子都可以直接使用mapPartitionsmapPartitionsWithIndex實(shí)現(xiàn)。

產(chǎn)生ShuffledRDD的算子有combineByKeyWithClassTag、combineByKey、aggregateByKey、foldByKey 、reduceByKey 、distinct、groupByKey、groupBy、partitionBy、sortByKey 和 repartitionAndSortWithinPartitions。

combineByKey到groupByKey 底層均是調(diào)用combineByKeyWithClassTag方法:

@Experimental
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner,mergeValue,mergeCombiners
       ,defaultPartitioner(self))
}
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}

三個(gè)重要參數(shù)的含義:

  • createCombiner:根據(jù)每個(gè)分區(qū)的第一個(gè)元素操作產(chǎn)生一個(gè)初始值
  • mergeValue:對每個(gè)分區(qū)內(nèi)部的元素進(jìn)行迭代合并
  • mergeCombiners:對所有分區(qū)的合并結(jié)果進(jìn)行合并

groupByKey的partitioner未指定時(shí)會(huì)傳入默認(rèn)的defaultPartitioner。例如:

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2).keyBy(_.length)
a.groupByKey.collect

res9: Array[(Int, Iterable[String])] = Array((4,CompactBuffer(lion)), (6,CompactBuffer(spider)), (3,CompactBuffer(dog, cat)), (5,CompactBuffer(tiger, eagle)))

aggregateByKey:每個(gè)分區(qū)使用zeroValue作為初始值,迭代每一個(gè)元素用seqOp進(jìn)行合并,對所有分區(qū)的結(jié)果用combOp進(jìn)行合并。例如:

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res6: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
res7: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))

reduceByKey :每個(gè)分區(qū)迭代每一個(gè)元素用func進(jìn)行合并,對所有分區(qū)的結(jié)果用func再進(jìn)行合并,例如:

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect

Action API有:

動(dòng)作含義
reduce(func)通過func函數(shù)聚集RDD中的所有元素,這個(gè)功能必須是課交換且可并聯(lián)的
collect()在驅(qū)動(dòng)程序中,以數(shù)組的形式返回?cái)?shù)據(jù)集的所有元素
count()返回RDD的元素個(gè)數(shù)
first()返回RDD的第一個(gè)元素(類似于take(1))
take(n)返回一個(gè)由數(shù)據(jù)集的前n個(gè)元素組成的數(shù)組
takeSample(withReplacement*,*num, [seed])返回一個(gè)數(shù)組,該數(shù)組由從數(shù)據(jù)集中隨機(jī)采樣的num個(gè)元素組成,可以選擇是否用隨機(jī)數(shù)替換不足的部分,seed用于指定隨機(jī)數(shù)生成器種子
takeOrdered(n, [ordering])排序并取前N個(gè)元素
saveAsTextFile(path)將數(shù)據(jù)集的元素以textfile的形式保存到HDFS文件系統(tǒng)或者其他支持的文件系統(tǒng),對于每個(gè)元素,Spark將會(huì)調(diào)用toString方法,將它裝換為文件中的文本
saveAsSequenceFile(path)將數(shù)據(jù)集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統(tǒng)。
saveAsObjectFile(path)將RDD中的元素用NullWritable作為key,實(shí)際元素作為value保存為sequencefile格式
countByKey()針對(K,V)類型的RDD,返回一個(gè)(K,Int)的map,表示每一個(gè)key對應(yīng)的元素個(gè)數(shù)。
foreach(func)在數(shù)據(jù)集的每一個(gè)元素上,運(yùn)行函數(shù)func進(jìn)行更新。

spark模擬實(shí)現(xiàn)mapreduce版wordcount:

object MapreduceWordcount {
  def main(args: Array[String]): Unit = {
    import org.apache.spark._
    val sc: SparkContext = new SparkContext(new SparkConf().setAppName("wordcount").setMaster("local[*]"))
    sc.setLogLevel("WARN")
    
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat
    import org.apache.spark.rdd.HadoopRDD

    import scala.collection.mutable.ArrayBuffer
    
    def map(k: LongWritable, v: Text, collect: ArrayBuffer[(String, Int)]) = {
      for (word <- v.toString.split("\\s+"))
        collect += ((word, 1))
    }
    def reduce(key: String, value: Iterator[Int], collect: ArrayBuffer[(String, Int)]) = {
      collect += ((key, value.sum))
    }
    val rdd = sc.hadoopFile("/hdfs/wordcount/in1/*", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)
      .asInstanceOf[HadoopRDD[LongWritable, Text]]
      .mapPartitionsWithInputSplit((split, it) =>{
        val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
        it.foreach(kv => map(kv._1, kv._2, collect))
        collect.toIterator
      })
      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
      .mapPartitions(it => {
        val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
        var lastKey: String = ""
        var values: ArrayBuffer[Int] = ArrayBuffer[Int]()
        for ((currKey, value) <- it) {
          if (!currKey.equals(lastKey)) {
            if (values.length != 0)
              reduce(lastKey, values.toIterator, collect)
            values.clear()
          }
          values += value
          lastKey = currKey
        }
        if (values.length != 0) reduce(lastKey, values.toIterator, collect)
        collect.toIterator
      })
    rdd.foreach(println)
  }
}

各類RDD

image-20220218181635289

  • ShuffledRDD :表示需要走Shuffle過程的網(wǎng)絡(luò)傳輸
  • CoalescedRDD :用于將一臺(tái)機(jī)器的多個(gè)分區(qū)合并成一個(gè)分區(qū)
  • CartesianRDD :對兩個(gè)RDD的所有元素產(chǎn)生笛卡爾積
  • MapPartitionsRDD :用于對每個(gè)分區(qū)的數(shù)據(jù)進(jìn)行特定的處理
  • CoGroupedRDD :用于將2~4個(gè)rdd,按照key進(jìn)行連接聚合
  • SubtractedRDD :用于對2個(gè)RDD求差集
  • UnionRDDPartitionerAwareUnionRDD :用于對2個(gè)RDD求并集
  • ZippedPartitionsRDD2:zip拉鏈操作產(chǎn)生的RDD
  • ZippedWithIndexRDD:給每一個(gè)元素標(biāo)記一個(gè)自增編號(hào)
  • PartitionwiseSampledRDD:用于對rdd的元素按照指定的百分比進(jìn)行隨機(jī)采樣

當(dāng)我們需要給Datafream添加自增列時(shí),可以使用zipWithUniqueId方法:

from pyspark.sql.types import StructType, LongType

schema = data.schema.add(StructField("id", LongType()))
rowRDD = data.rdd.zipWithUniqueId().map(lambda t: t[0]+Row(t[1]))
data = rowRDD.toDF(schema)
data.show()

API用法詳情可參考:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD

cache&checkpoint

RDD通過persist方法或cache方法可以將前面的計(jì)算結(jié)果緩存,但是并不是這兩個(gè)方法被調(diào)用時(shí)立即緩存,而是觸發(fā)后面的action時(shí),該RDD將會(huì)被緩存在計(jì)算節(jié)點(diǎn)的內(nèi)存中,并供后面重用。

rdd.persist()

checkpoint的源碼注釋可以看到:

  • 標(biāo)記該RDD作為檢查點(diǎn)。
  • 它將被保存在通過SparkContext#setCheckpointDir方法設(shè)置的檢查點(diǎn)目錄中
  • 它所引用的所有父RDD引用將全部被移除
  • 這個(gè)方法在這個(gè)RDD上必須在所有job執(zhí)行前運(yùn)行。
  • 強(qiáng)烈建議將這個(gè)RDD緩存在內(nèi)存中,否則這個(gè)保存文件的計(jì)算任務(wù)將重新計(jì)算。

從中我們得知,在執(zhí)行checkpoint方法時(shí),最好同時(shí),將該RDD緩存起來,否則,checkpoint也會(huì)產(chǎn)生一個(gè)計(jì)算任務(wù)。

sc.setCheckpointDir("checkpoint")
rdd.cache()
rdd.checkpoint()

graphframes 的用法

GraphFrame是將Spark中的Graph算法統(tǒng)一到DataFrame接口的Graph操作接口,為Scala、Java和Python提供了統(tǒng)一的圖處理API。

Graphframes是開源項(xiàng)目,源碼工程如下:https://github.com/graphframes/graphframes

可以參考:

  • 官網(wǎng):https://graphframes.github.io/graphframes/docs/_site/index.html
  • GraphFrames用戶指南-Python — Databricks文檔:https://docs.databricks.com/spark/latest/graph-analysis/graphframes/user-guide-python.html

在GraphFrames中圖的頂點(diǎn)(Vertex)和邊(edge)都是以DataFrame形式存儲(chǔ)的:

  • 頂點(diǎn)DataFrame:必須包含列名為“id”的列,用于作為頂點(diǎn)的唯一標(biāo)識(shí)
  • 邊DataFrame:必須包含列名為“src”和“dst”的列,根據(jù)唯一標(biāo)識(shí)id標(biāo)識(shí)關(guān)系

創(chuàng)建圖的示例:

from graphframes import GraphFrame
vertices = spark.createDataFrame([
    ("a", "Alice", 34),
    ("b", "Bob", 36),
    ("c", "Charlie", 30),
    ("d", "David", 29),
    ("e", "Esther", 32),
    ("f", "Fanny", 36),
    ("g", "Gabby", 60)], ["id", "name", "age"])
edges = spark.createDataFrame([
    ("a", "b", "friend"),
    ("b", "c", "follow"),
    ("c", "b", "follow"),
    ("f", "c", "follow"),
    ("e", "f", "follow"),
    ("e", "d", "friend"),
    ("d", "a", "friend"),
    ("a", "e", "friend")
], ["src", "dst", "relationship"])
# 生成圖
g = GraphFrame(vertices, edges)

GraphFrame提供三種視圖:

print("頂點(diǎn)表視圖:")
graph.vertices.show() # graph.vertices 就是原始的vertices
print("邊表視圖:")
graph.edges.show() # graph.edges 就是原始的 edges
print("三元組視圖:")
graph.triplets.show()

獲取頂點(diǎn)的度、入度和出度:

# 頂點(diǎn)的度
graph.degrees.show()
# 頂點(diǎn)的入度
graph.inDegrees.show()
# 頂點(diǎn)的出度
graph.outDegrees.show()

Motif finding (模式發(fā)現(xiàn))

示例:

# 多個(gè)路徑條件
motif = graph.find("(a)-[e]->(b); (b)-[e2]->(a)")
# 在搜索的結(jié)果上進(jìn)行過濾
motif.filter("b.age > 30")
# 不需要返回路徑中的元素時(shí),可以使用匿名頂點(diǎn)和邊
motif = graph.find("(start)-[]->()")
# 設(shè)置路徑不存在的條件
motif = graph.find("(a)-[]->(b); !(b)-[]->(a)")

假設(shè)我們要想給用戶推薦關(guān)注的人,可以找出這樣的關(guān)系:A關(guān)注B,B關(guān)注C,但是A未關(guān)注C。找出這樣的關(guān)系就可以把C推薦給A:

# Motif: A->B->C but not A->C
results = graph.find("(A)-[]->(B); (B)-[]->(C); !(A)-[]->(C)")
# 排除自己
results = results.filter("A.id != C.id")
# 選擇需要的列
results = results.select(results.A.id.alias("A"), results.C.id.alias("C"))
results.show()

結(jié)果:

+---+---+
|  A|  C|
+---+---+
|  e|  c|
|  e|  a|
|  d|  b|
|  a|  d|
|  f|  b|
|  d|  e|
|  a|  f|
|  a|  c|
+---+---+

Motif在查找路徑過程的過程中,還可以沿著路徑攜帶狀態(tài)。例如我們想要找出關(guān)系鏈有4個(gè)頂點(diǎn),而且其中3條邊全部都是"friend"關(guān)系:

from pyspark.sql.functions import col, lit, when
from functools import reduce
chain4 = graph.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")
def sumFriends(cnt, relationship):
    "定義下一個(gè)頂點(diǎn)更新狀態(tài)的條件:如果關(guān)系為friend則cnt+1"
    return when(relationship == "friend", cnt+1).otherwise(cnt)
# 將更新方法應(yīng)用到整個(gè)鏈的,鏈上每有一個(gè)關(guān)系是 friend 就加一,鏈上共三個(gè)關(guān)系。
condition = reduce(lambda cnt, e: sumFriends(
    cnt, col(e).relationship), ["ab", "bc", "cd"], lit(0))
chainWith2Friends2 = chain4.where(condition >= 3)
chainWith2Friends2.show()

結(jié)果:

+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
|              a|            ab|              b|            bc|              c|            cd|              d|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, b, friend}|   {b, Bob, 36}|
| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+

Subgraphs 子圖

可以直接過濾其頂點(diǎn)或邊,dropIsolatedVertices()方法用于刪除孤立沒有連接的點(diǎn):

graph.filterVertices("age > 30").filterEdges("relationship = 'friend'").dropIsolatedVertices()

還可以基于模式發(fā)現(xiàn)獲取到的邊創(chuàng)建Subgraphs :

paths = graph.find("(a)-[e]->(b)")\
    .filter("e.relationship = 'follow'")\
    .filter("a.age < b.age")
# 抽取邊信息e2 = paths.select("e.src", "e.dst", "e.relationship")
e2 = paths.select("e.*")
# 創(chuàng)建Subgraphs
g2 = GraphFrame(graph.vertices, e2)

GraphFrames支持的GraphX算法

  • PageRank:查找圖中的重要頂點(diǎn)。
  • 廣度優(yōu)先搜索(BFS):查找從一組頂點(diǎn)到另一組頂點(diǎn)的最短路徑
  • 連通組件(ConnectedComponents):為具備連接關(guān)系的頂點(diǎn)分配相同的組件ID
  • 強(qiáng)連通組件(StronglyConnectedConponents):根據(jù)每個(gè)頂點(diǎn)的強(qiáng)連通分量分配SCC。
  • 最短路徑(Shortest paths):查找從每個(gè)頂點(diǎn)到目標(biāo)頂點(diǎn)集的最短路徑。
  • 三角形計(jì)數(shù)(TriangleCount):計(jì)算每個(gè)頂點(diǎn)所屬的三角形的數(shù)量,經(jīng)常用于確定組的穩(wěn)定性(相互連接的數(shù)量代表了穩(wěn)定性)或作為其他網(wǎng)絡(luò)度量(如聚類系數(shù))的一部分,在社交網(wǎng)絡(luò)分析中用來檢測社區(qū)。
  • 標(biāo)簽傳播算法(LPA):檢測圖中的社區(qū)。

pageRank算法:

results = graph.pageRank(resetProbability=0.15, maxIter=10)
results.vertices.sort("pagerank", ascending=False).show()

結(jié)果:

+---+-------+---+-------------------+
| id|   name|age|           pagerank|
+---+-------+---+-------------------+
|  b|    Bob| 36| 2.7025217677349773|
|  c|Charlie| 30| 2.6667877057849627|
|  a|  Alice| 34| 0.4485115093698443|
|  e| Esther| 32| 0.3613490987992571|
|  f|  Fanny| 36|0.32504910549694244|
|  d|  David| 29|0.32504910549694244|
|  g|  Gabby| 60|0.17073170731707318|
+---+-------+---+-------------------+

可以設(shè)置起始頂點(diǎn):

graph.pageRank(resetProbability=0.15, maxIter=10, sourceId="a")
graph.parallelPersonalizedPageRank(resetProbability=0.15, sourceIds=["a", "b", "c", "d"], maxIter=10)

廣度優(yōu)先搜索BFS:

搜索從姓名叫Esther到年齡小于32的最小路徑:

paths = graph.bfs("name = 'Esther'", "age < 32")
paths.show()
+--------------+--------------+---------------+
|          from|            e0|             to|
+--------------+--------------+---------------+
|{a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|
+--------------+--------------+---------------+

可以指定只能在指定的邊搜索:

graph.bfs("name = 'Esther'",
          "age < 32",
          edgeFilter="relationship != 'friend'",
          maxPathLength=4
          ).show()
+---------------+--------------+--------------+--------------+----------------+
|           from|            e0|            v1|            e1|              to|
+---------------+--------------+--------------+--------------+----------------+
|{e, Esther, 32}|{e, f, follow}|{f, Fanny, 36}|{f, c, follow}|{c, Charlie, 30}|
+---------------+--------------+--------------+--------------+----------------+

Connected components 連通組件:

必須先設(shè)置檢查點(diǎn):

sc.setCheckpointDir("checkpoint")

graph.connectedComponents().show()

結(jié)果:

+---+-------+---+------------+
| id|   name|age|   component|
+---+-------+---+------------+
|  a|  Alice| 34|412316860416|
|  b|    Bob| 36|412316860416|
|  c|Charlie| 30|412316860416|
|  d|  David| 29|412316860416|
|  e| Esther| 32|412316860416|
|  f|  Fanny| 36|412316860416|
|  g|  Gabby| 60|146028888064|
+---+-------+---+------------+

可以看到僅g點(diǎn)在一個(gè)連通區(qū)域內(nèi),可以調(diào)用dropIsolatedVertices()方法,刪除這種孤立的沒有連接的點(diǎn):

graph.dropIsolatedVertices().connectedComponents().show()

結(jié)果:

+---+-------+---+------------+
| id|   name|age|   component|
+---+-------+---+------------+
|  a|  Alice| 34|412316860416|
|  b|    Bob| 36|412316860416|
|  c|Charlie| 30|412316860416|
|  d|  David| 29|412316860416|
|  e| Esther| 32|412316860416|
|  f|  Fanny| 36|412316860416|
+---+-------+---+------------+

Strongly connected components 強(qiáng)連通組件:

graph.stronglyConnectedComponents(maxIter=10).show()

Shortest paths 最短路徑:

每個(gè)頂點(diǎn)到a或d的最短路徑:

graph.shortestPaths(landmarks=["a", "d"]).show()
+---+-------+---+----------------+
| id|   name|age|       distances|
+---+-------+---+----------------+
|  g|  Gabby| 60|              {}|
|  f|  Fanny| 36|              {}|
|  e| Esther| 32|{a -> 2, d -> 1}|
|  d|  David| 29|{a -> 1, d -> 0}|
|  c|Charlie| 30|              {}|
|  b|    Bob| 36|              {}|
|  a|  Alice| 34|{a -> 0, d -> 2}|
+---+-------+---+----------------+

Triangle count 三角形計(jì)數(shù):

graph.triangleCount().show()
+-----+---+-------+---+
|count| id|   name|age|
+-----+---+-------+---+
|    1|  a|  Alice| 34|
|    0|  b|    Bob| 36|
|    0|  c|Charlie| 30|
|    1|  d|  David| 29|
|    1|  e| Esther| 32|
|    0|  g|  Gabby| 60|
|    0|  f|  Fanny| 36|
+-----+---+-------+---+

說明頂點(diǎn)a/e/d構(gòu)成三角形。

標(biāo)簽傳播算法(LPA):

graph.labelPropagation(maxIter=5).orderBy("label").show()
+---+-------+---+-------------+
| id|   name|age|        label|
+---+-------+---+-------------+
|  g|  Gabby| 60| 146028888064|
|  f|  Fanny| 36|1047972020224|
|  b|    Bob| 36|1047972020224|
|  a|  Alice| 34|1382979469312|
|  c|Charlie| 30|1382979469312|
|  e| Esther| 32|1460288880640|
|  d|  David| 29|1460288880640|
+---+-------+---+-------------+

PySpark3.X與pandas融合

Pyspark從3.0版本開始出現(xiàn)了pandas_udf裝飾器、applyInPandas和mapInPandas,基于這些方法,我們就可以使用熟悉的pandas的語法處理spark對象的數(shù)據(jù)。

首先創(chuàng)建幾條測試數(shù)據(jù),并啟動(dòng) Apache Arrow

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))
df.show()

自定義UDF和UDAF

pyspark暫不支持自定義UDTF。

使用pandas_udf裝飾器我們可以創(chuàng)建出基于pandas的udf自定義函數(shù),在DSL的語法中可以被直接使用:

from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf("double")
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b
df.select(multiply_func("id", "v").alias("product")).show()

注冊函數(shù)和視圖后,可以直接在SQL中使用:

df.createOrReplaceTempView("t")
spark.udf.register("multiply", multiply_func)
spark.sql('select multiply(id, v) product from t').show()

結(jié)果均為:

+-------+
|product|
+-------+
|    1.0|
|    2.0|
|    6.0|
|   10.0|
|   20.0|
+-------+

還支持聚合函數(shù)和窗口函數(shù):

from pyspark.sql import Window

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()
# 對字段'v'進(jìn)行求均值
df.select(mean_udf('v').alias("mean_v")).show()
# 按照‘id'分組,求'v'的均值
df.groupby("id").agg(mean_udf('v').alias("mean_v")).show()
# 按照‘id'分組,求'v'的均值,并賦值給新的一列
df.withColumn('mean_v', mean_udf("v").over(Window.partitionBy('id'))).show()

注冊到udf之后同樣可以直接使用SQL實(shí)現(xiàn):

spark.udf.register("mean2", mean_udf)
spark.sql('select mean2(v) mean_v from t').show()
spark.sql('select id,mean2(v) mean_v from t group by id').show()
spark.sql('select id,v,mean2(v) over(partition by id) mean_v from t').show()

結(jié)果均為:

+--------+
| mean_v |
+--------+
|     4.2|
+--------+

+---+--------+
| id| mean_v |
+---+--------+
|  1|     1.5|
|  2|     6.0|
+---+--------+

+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.5|
|  1| 2.0|   1.5|
|  2| 3.0|   6.0|
|  2| 5.0|   6.0|
|  2|10.0|   6.0|
+---+----+------+

分組聚合與JOIN

applyInPandas需要在datafream調(diào)用groupby之后才能使用:

def subtract_mean(pdf):
    v = pdf.v
    pdf['v1'] = v - v.mean()
    pdf['v2'] = v + v.mean()
    return pdf
t = df.groupby("id")
t.applyInPandas(
    subtract_mean, schema="id long, v double, v1 double, v2 double").show()

結(jié)果:

+---+----+----+----+
| id|   v|  v1|  v2|
+---+----+----+----+
|  1| 1.0|-0.5| 2.5|
|  1| 2.0| 0.5| 3.5|
|  2| 3.0|-3.0| 9.0|
|  2| 5.0|-1.0|11.0|
|  2|10.0| 4.0|16.0|
+---+----+----+----+

subtract_mean函數(shù)接收的是對應(yīng)id的dataframe數(shù)據(jù),schema指定了返回值的名稱和類型列表。

通過以下代碼我們可以知道,applyInPandas可以借助cogroup進(jìn)行表連接:

val a = sc.parallelize(List(1, 2, 1, 3))
val b = a.map((_, "b"))
val c = a.map((_, "c"))
val d = a.map((_, "d"))
val e = a.map((_, "e"))
scala> b.cogroup(c).foreach(println)
(3,(CompactBuffer(b),CompactBuffer(c)))
(1,(CompactBuffer(b, b),CompactBuffer(c, c)))
(2,(CompactBuffer(b),CompactBuffer(c)))

示例:

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
  	# l、r is a pandas.DataFrame
  	# 這里是按照id分組
  	# 那么,l和r分別是對應(yīng)id的df1和df2數(shù)據(jù)
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

map迭代

執(zhí)行以下代碼:

def filter_func(iterator):
    for i, pdf in enumerate(iterator):
        print(i, pdf.values.tolist())
        yield pdf


df.mapInPandas(filter_func, schema=df.schema).show()

后臺(tái)看到執(zhí)行結(jié)果為:

0 [[2.0, 5.0]]                                                     
0 [[2.0, 3.0]]
0 [[1.0, 1.0]]
0 [[1.0, 2.0]]
0 [[2.0, 10.0]]

前臺(tái)結(jié)果幾乎保持原樣??梢灾纈terator是一個(gè)分區(qū)迭代器,迭代出當(dāng)前分區(qū)的每一行數(shù)據(jù)都被封裝成一個(gè)pandas對象。

Pyspark與Pandas的交互

將spark的Datafream對象轉(zhuǎn)換為原生的pandas對象只需調(diào)用toPandas()方法即可:

sdf.toPandas()

將原生的pandas對象轉(zhuǎn)換為spark對象可以使用spark的頂級(jí)方法:

spark.createDataFrame(pdf)

習(xí)慣使用pandas的童鞋,還可以直接使用pandas-on-Spark,在spark3.2.0版本時(shí)已經(jīng)匹配到pandas 1.3版本的API。通過pandas-on-Spark,我們可以完全用pandas的api操作數(shù)據(jù),而底層執(zhí)行卻是spark的并行化。

使用pandas-on-Spark最好設(shè)置一下環(huán)境變量:

import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

將spark對象轉(zhuǎn)換為pandas-on-Spark對象:

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

pdf = df.to_pandas_on_spark()
print(type(pdf))
pdf

image-20220218090829123

pandas-on-Spark對象也可以還原成spark對象:

pdf.to_spark()

另外spark提供直接將文件讀取成pandas-on-Spark對象的api,例如:

import pyspark.pandas as ps

pdf = ps.read_csv("example_csv.csv")

ps對象與原生pandas對象的API幾乎完全一致。

ps對象相對于原生pandas對象的API幾乎一致,同時(shí)還支持一些強(qiáng)悍的功能,例如直接以SQL形式訪問:

ps.sql("SELECT count(*) as num FROM {pdf}")

{pdf}訪問了變量名為pdf的pandas-on-Spark對象。

到此這篇關(guān)于PySpark與GraphFrames的安裝與使用的文章就介紹到這了,更多相關(guān)PySpark與GraphFrames使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • python?turtle繪制多邊形和跳躍和改變速度特效

    python?turtle繪制多邊形和跳躍和改變速度特效

    這篇文章主要介紹了python?turtle繪制多邊形和跳躍和改變速度特效,文章實(shí)現(xiàn)過程詳細(xì),需要的小伙伴可以參考一下,希望對你的學(xué)習(xí)有所幫助
    2022-03-03
  • django中path函數(shù)使用詳解

    django中path函數(shù)使用詳解

    django.urls.path是Django中用于定義URL映射規(guī)則的函數(shù)之一,本文主要介紹了django中path函數(shù)使用,具有一定的參考價(jià)值,感興趣的可以了解一下
    2023-12-12
  • python numpy數(shù)組復(fù)制使用實(shí)例解析

    python numpy數(shù)組復(fù)制使用實(shí)例解析

    這篇文章主要介紹了python numpy數(shù)組復(fù)制使用實(shí)例解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-01-01
  • Anaconda環(huán)境克隆、遷移的詳細(xì)步驟

    Anaconda環(huán)境克隆、遷移的詳細(xì)步驟

    最近需要在多臺(tái)計(jì)算機(jī)上工作,每次重新部署環(huán)境比較麻煩,所以學(xué)習(xí)一下anaconda環(huán)境遷移的方法,下面這篇文章主要給大家介紹了關(guān)于Anaconda環(huán)境克隆、遷移的詳細(xì)步驟,需要的朋友可以參考下
    2022-08-08
  • Python 虛擬空間的使用代碼詳解

    Python 虛擬空間的使用代碼詳解

    這篇文章主要介紹了Python 虛擬空間的使用,本文通過示例代碼給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2019-06-06
  • python之matplotlib矢量圖svg?emf

    python之matplotlib矢量圖svg?emf

    這篇文章主要介紹了python之matplotlib矢量圖svg?emf,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-09-09
  • 解析Python的縮進(jìn)規(guī)則的使用

    解析Python的縮進(jìn)規(guī)則的使用

    這篇文章主要介紹了解析Python的縮進(jìn)規(guī)則的使用,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2019-01-01
  • python讀取mnist數(shù)據(jù)集方法案例詳解

    python讀取mnist數(shù)據(jù)集方法案例詳解

    這篇文章主要介紹了python讀取mnist數(shù)據(jù)集方法案例詳解,本篇文章通過簡要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下
    2021-09-09
  • 詳解PyCharm配置Anaconda的艱難心路歷程

    詳解PyCharm配置Anaconda的艱難心路歷程

    這篇文章主要介紹了詳解PyCharm配置Anaconda的艱難心路歷程,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2018-08-08
  • python使用PIL把透明背景圖片轉(zhuǎn)成白色背景的示例代碼

    python使用PIL把透明背景圖片轉(zhuǎn)成白色背景的示例代碼

    當(dāng)我們在采集一些圖片的時(shí)候,這些圖片的背景經(jīng)常是透明的,但是如何把透明背景轉(zhuǎn)成白色背景呢,接下來就給大家解決這個(gè)問題,本文主要介紹了python使用PIL把透明背景圖片轉(zhuǎn)成白色背景,需要的朋友可以參考下
    2023-08-08

最新評(píng)論