Spark?GraphX?分布式圖處理框架圖算法詳解
正文
Spark GraphX是一個分布式圖處理框架,基于 Pregel 接口實現(xiàn)了常用的圖算法。
包括 PageRank、SVDPlusPlus、TriangleCount、 ConnectedComponents、LPA 等算法,以下通過具象化的圖實例理解相應(yīng)的算法用途。
Graphx圖結(jié)構(gòu)

Graphx中的Graph有兩個RDD,一個是邊RDD,一個是點RDD。
此外,三元組其實就是(點、邊,點)一個有效組合,由triplets()接口獲取,triplets()返回的結(jié)果是EdgeTriplet[VD,ED]。
1. 最短路徑
最常見的路徑搜索算法(例如DFS & BFS、最短路徑、 最小生成樹、隨機游走等),最短路徑是最容易理解的圖算法,因為大家在生活中能夠廣泛接觸到,如駕駛導(dǎo)航,外賣送餐路線等等。
路徑搜索算法建立在圖搜索算法的基礎(chǔ)上,用來探索節(jié)點之間的路徑。這些路徑從一個節(jié)點開始,遍歷關(guān)系,直到到達目的地,Graphx采用了最短路徑算法Dijkstra的原理。
示例數(shù)據(jù)
// 輸入一些邊數(shù)據(jù)
val edgeSeq: Seq[(Int, Int)] = Seq((1, 2), (1, 5), (2, 3), (2, 5), (3, 4), (4, 5), (4, 6),(6, 9),(9, 11)).flatMap(e => Seq(e, e.swap))
val edges = op.sc.parallelize(edgeSeq).map { case (v1, v2) => (v1.toLong, v2.toLong) }
可視化數(shù)據(jù)
這是上述數(shù)據(jù)的圖形表示(雙向邊,無權(quán))

計算最短路徑
val graph_sp = Graph.fromEdgeTuples(edges, 1)
val landmarks = Seq(1, 11).map(_.toLong)
val results = ShortestPaths.run(graph_sp, landmarks).vertices.collect.map {
case (v, spMap) => (v, spMap.mapValues(i => i))
}
全部結(jié)果打?。?/p>
println(results.mkString)
(1,Map(1 -> 0, 11 -> 5))
(2,Map(1 -> 1, 11 -> 5))
(3,Map(1 -> 2, 11 -> 4))
(4,Map(1 -> 2, 11 -> 3))
(5,Map(1 -> 1, 11 -> 4))
(6,Map(11 -> 2, 1 -> 3))
(9,Map(11 -> 1, 1 -> 4))
(11,Map(11 -> 0, 1 -> 5))
上述計算了圖中所有點到點1和點11的最短距離,(起點id,Map(目標(biāo)1 -> 最短路徑長度,目標(biāo)2 -> 最短路徑長度))。例如5,Map(1 -> 1, 11 -> 4)說明從5到1最短距離是1,5到11的最短距離是4。
2. 網(wǎng)頁排名
PageRank度量一個圖中每個頂點的重要程度,假定從u到v的一條邊代表v的重要性標(biāo)簽。例如,一個微博用戶被許多其它人粉,該用戶排名很高。GraphX帶有靜態(tài)和動態(tài)PageRank的實現(xiàn)方法,這些方法在PageRank object中。靜態(tài)的PageRank運行固定次數(shù)的迭代,而動態(tài)的PageRank一直運行,直到收斂。
GraphX有一個我們可以運行PageRank的社交網(wǎng)絡(luò)數(shù)據(jù)集的簡單數(shù)據(jù)。用戶集在graphx/data/users.txt中,用戶之間的關(guān)系在graphx/data/followers.txt中(Spark的源碼或編譯后文件里都包含)。
數(shù)據(jù)可視化

pagerank算法測試
先說PageRank動態(tài)實現(xiàn),以下調(diào)用就是動態(tài)的,實際是調(diào)用runUntilConvergence()不能指定迭代次數(shù)。參數(shù)0.0001是個容忍度,是在對圖進行迭代過程中退出迭代的條件,而靜態(tài)的PageRank不可傳遞該參數(shù),但可以指定迭代次數(shù)【固定次數(shù),所以靜態(tài)】。
val graph: Graph[Int, Int] = GraphLoader
.edgeListFile(op.sc, "followers.txt", canonicalOrientation = true, numEdgePartitions = 1)
val ranks = graph.pageRank(0.0001).vertices.sortBy(_._2, ascending = false)
ranks.take(5).foreach(println(_))
算法結(jié)果
(7,1.8138212152810693)
(2,1.0692956678358136)
(4,0.8759124087591241)
(6,0.8759124087591241)
(1,0.6825291496824343)
# join name
(odersky,1.8138212152810693)
(ladygaga,1.0692956678358136)
(justinbieber,0.8759124087591241)
(matei_zaharia,0.8759124087591241)
(BarackObama,0.6825291496824343)
二元組左側(cè)是頂點信息,右側(cè)是重要程度,也就是分?jǐn)?shù)越高排名越靠前。
這個結(jié)果有一些順序跟直觀感受不符,點7最重要毋庸置疑,點1的重要性應(yīng)該是大于點4的,但是結(jié)果不是這樣,那么數(shù)據(jù)集大一些會更好嗎??
personalizedPageRank()方法還可以進行個性化推薦,比如社交網(wǎng)絡(luò)中,給某用戶再推薦一個人,或者對于用戶商品的推薦中,用戶商品兩個實體可以形成一個圖,我們就可以根據(jù)具體的某個用戶來給他推薦一些商品。
3. 連通域(連通組件)
連通分量算法用其編號最小的頂點的 ID 標(biāo)記圖中的每個連通分量。例如,在社交網(wǎng)絡(luò)中,連接的組件可以近似集群。
加載圖測試連通域
這里的graph仍然是加載followers.txt數(shù)據(jù),spark自帶的有。
val cc: Graph[VertexId, Int] = graph.connectedComponents()
println("連通結(jié)果展示++++++++:")
cc.vertices.map(_.swap)
.groupByKey()
.map(_._2)
.foreach(println)
結(jié)果展示(跟圖形觀察的結(jié)果是一致):
連通結(jié)果展示++++++++: CompactBuffer(4, 1, 2) CompactBuffer(6, 3, 7)
可以看到是2個域。結(jié)果圖數(shù)據(jù)本身不是這樣組織的,為了便于理解進行了聚合。原始數(shù)據(jù)collect回來是這樣:
Array((4,1), (1,1), (6,3), (3,3), (7,3), (2,1))
元組左側(cè)是頂點,右側(cè)表示歸屬,這個結(jié)果符合預(yù)期。
生成圖測試
val g = Graph(sc.makeRDD((1L to 7L).map((_,""))),
sc.makeRDD(Array(Edge(2L,5L,""), Edge(5L,3L,""), Edge(3L,2L,""),
Edge(4L,5L,""), Edge(6L,7L,""))))
g.connectedComponents
.vertices
.map(_.swap)
.groupByKey()
.map(_._2)
.foreach(println)
圖實例的形態(tài)展示

這樣的代碼便于自行組織一套圖數(shù)據(jù),按自己意思進行修改,運行上述代碼得到結(jié)果是:
CompactBuffer(1) CompactBuffer(2, 3, 4, 5) CompactBuffer(6, 7)
強連接網(wǎng)絡(luò)就是:在這個網(wǎng)絡(luò)中無論你從哪個頂點開始,其他所有頂點都是可達的。
強連接域的計算
g.stronglyConnectedComponents(3)
.vertices.map(_.swap)
.groupByKey()
.map(_._2)
.filter(_.size > 1)
.foreach(println)
過濾掉那些單點的域,那么強連接的計算結(jié)果是CompactBuffer(2, 3, 5)。
4. 三角計數(shù)
當(dāng)一個頂點有兩個相鄰的頂點并且它們之間有一條邊時,它就是三角形的一部分。需要注意的是,在計算社交網(wǎng)絡(luò)數(shù)據(jù)集的三角形計數(shù)時,TriangleCount需要邊的方向是規(guī)范的方向(srcId < dstId),并且圖通過Graph.partitionBy分片過。
三角計數(shù)統(tǒng)計應(yīng)用場景:大規(guī)模的社區(qū)發(fā)現(xiàn),通過該算法可以做群體檢測。只要是跟大規(guī)模小團體檢測方面該算法都可以很好的支持,算法是找出擁有三角形環(huán)關(guān)系的最多的頂點。
Triangle Count的算法思想如下:
- 計算每個結(jié)點的鄰結(jié)點;
- 對通過每條邊的兩個頂點相聯(lián)的頂點的相鄰點集合計算交集,并找出交集中id大于前兩個結(jié)點id的結(jié)點;
- 對每個結(jié)點統(tǒng)計Triangle總數(shù),注意只統(tǒng)計符合計算方向的Triangle Count。
代碼測試
val graph2 = graph.partitionBy(PartitionStrategy.RandomVertexCut)
// Find the triangle count for each vertex
val triCounts = graph2.triangleCount().vertices
println(triCounts.collect().mkString("\n"))
開頭先對圖graph進行了分片得到graph2。
測試結(jié)果
這個意思是6,3,7頂點分別擁有1個三角環(huán),而其他頂點沒有,實際上正是6,3,7組成了三角。
(4,0)
(1,0)
(6,1)
(3,1)
(7,1)
(2,0)
5. 標(biāo)簽傳播算法(LPA)
Label Propagation,是一種基于圖的半監(jiān)督學(xué)習(xí)算法(Semi-supervised learning),應(yīng)用場景為:社區(qū)發(fā)現(xiàn)(Community detection)。社區(qū)發(fā)現(xiàn)的過程就是一種聚類的過程。主要是用于團體檢測,LPA能夠以接近線性復(fù)雜度去檢測一個大規(guī)模圖中的團體結(jié)構(gòu),主要思想是給所有頂點中的密集連接組打上一個唯一標(biāo)簽,這些擁有相同標(biāo)簽的組就是所謂的團體。
它不保證收斂,且迭代次數(shù)足夠多之后,所有聯(lián)通節(jié)點最終收斂為一個社區(qū)。
該算法也可以用于半監(jiān)督學(xué)習(xí)(大部分沒有標(biāo)簽,小部分有標(biāo)簽),給那些沒有標(biāo)簽的通過標(biāo)簽傳播算法進行打標(biāo)簽。也可以應(yīng)用于風(fēng)控,對于通過已有風(fēng)險評估的人,通過社交網(wǎng)絡(luò)去評估跟其有關(guān)系的人的風(fēng)險。
基本思想
標(biāo)簽傳播算法的應(yīng)用場景是不重疊社區(qū)發(fā)現(xiàn),其基本思想是:將一個節(jié)點的鄰居節(jié)點的標(biāo)簽中數(shù)量最多的標(biāo)簽作為該節(jié)點自身的標(biāo)簽。給每個節(jié)點添加標(biāo)簽(label)以代表它所屬的社區(qū),并通過標(biāo)簽的“傳播”形成同一標(biāo)簽的“社區(qū)”結(jié)構(gòu)。簡而言之,你的鄰居屬于哪個label最多,你就屬于哪個label。該算法的有點是收斂周期短,除了迭代次數(shù)無需任何先驗參數(shù)(不需事先指定社區(qū)個數(shù)和大小),算法執(zhí)行過程中不需要計算任何社區(qū)指標(biāo)。
以上就是Spark GraphX 分布式圖處理框架圖算法詳解的詳細內(nèi)容,更多關(guān)于Spark GraphX 圖算法的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
如何在vscode中正確使用正則表達式進行文檔內(nèi)容的替換編輯
正則表達式是一種強大的模式匹配工具,它具有廣泛的應(yīng)用,包括數(shù)據(jù)清洗、文本處理、文件搜索等方面,這篇文章主要給大家介紹了關(guān)于如何在vscode中正確使用正則表達式進行文檔內(nèi)容的替換編輯,需要的朋友可以參考下2023-12-12

