SparkGraphx計(jì)算指定節(jié)點(diǎn)的N度關(guān)系節(jié)點(diǎn)源碼
直接上代碼:
package horizon.graphx.util
import java.security.InvalidParameterException
import horizon.graphx.util.CollectionUtil.CollectionHelper
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
/**
* Created by yepei.ye on 2017/1/19.
* Description:用于在圖中為指定的節(jié)點(diǎn)計(jì)算這些節(jié)點(diǎn)的N度關(guān)系節(jié)點(diǎn),輸出這些節(jié)點(diǎn)與源節(jié)點(diǎn)的路徑長(zhǎng)度和節(jié)點(diǎn)id
*/
object GraphNdegUtil {
val maxNDegVerticesCount = 10000
val maxDegree = 1000
/**
* 計(jì)算節(jié)點(diǎn)的N度關(guān)系
*
* @param edges
* @param choosedVertex
* @param degree
* @tparam ED
* @return
*/
def aggNdegreedVertices[ED: ClassTag](edges: RDD[(VertexId, VertexId)], choosedVertex: RDD[VertexId], degree: Int): VertexRDD[Map[Int, Set[VertexId]]] = {
val simpleGraph = Graph.fromEdgeTuples(edges, 0, Option(PartitionStrategy.EdgePartition2D), StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)
aggNdegreedVertices(simpleGraph, choosedVertex, degree)
}
def aggNdegreedVerticesWithAttr[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], choosedVertex: RDD[VertexId], degree: Int, sendFilter: (VD, VD) => Boolean = (_: VD, _: VD) => true): VertexRDD[Map[Int, Set[VD]]] = {
val ndegs: VertexRDD[Map[Int, Set[VertexId]]] = aggNdegreedVertices(graph, choosedVertex, degree, sendFilter)
val flated: RDD[Ver[VD]] = ndegs.flatMap(e => e._2.flatMap(t => t._2.map(s => Ver(e._1, s, t._1, null.asInstanceOf[VD])))).persist(StorageLevel.MEMORY_AND_DISK_SER)
val matched: RDD[Ver[VD]] = flated.map(e => (e.id, e)).join(graph.vertices).map(e => e._2._1.copy(attr = e._2._2)).persist(StorageLevel.MEMORY_AND_DISK_SER)
flated.unpersist(blocking = false)
ndegs.unpersist(blocking = false)
val grouped: RDD[(VertexId, Map[Int, Set[VD]])] = matched.map(e => (e.source, ArrayBuffer(e))).reduceByKey(_ ++= _).map(e => (e._1, e._2.map(t => (t.degree, Set(t.attr))).reduceByKey(_ ++ _).toMap))
matched.unpersist(blocking = false)
VertexRDD(grouped)
}
def aggNdegreedVertices[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
choosedVertex: RDD[VertexId],
degree: Int,
sendFilter: (VD, VD) => Boolean = (_: VD, _: VD) => true
): VertexRDD[Map[Int, Set[VertexId]]] = {
if (degree < 1) {
throw new InvalidParameterException("度參數(shù)錯(cuò)誤:" + degree)
}
val initVertex = choosedVertex.map(e => (e, true)).persist(StorageLevel.MEMORY_AND_DISK_SER)
var g: Graph[DegVertex[VD], Int] = graph.outerJoinVertices(graph.degrees)((_, old, deg) => (deg.getOrElse(0), old))
.subgraph(vpred = (_, a) => a._1 <= maxDegree)
//去掉大節(jié)點(diǎn)
.outerJoinVertices(initVertex)((id, old, hasReceivedMsg) => {
DegVertex(old._2, hasReceivedMsg.getOrElse(false), ArrayBuffer((id, 0))) //初始化要發(fā)消息的節(jié)點(diǎn)
}).mapEdges(_ => 0).cache() //簡(jiǎn)化邊屬性
choosedVertex.unpersist(blocking = false)
var i = 0
var prevG: Graph[DegVertex[VD], Int] = null
var newVertexRdd: VertexRDD[ArrayBuffer[(VertexId, Int)]] = null
while (i < degree + 1) {
prevG = g
//發(fā)第i+1輪消息
newVertexRdd = prevG.aggregateMessages[ArrayBuffer[(VertexId, Int)]](sendMsg(_, sendFilter), (a, b) => reduceVertexIds(a ++ b)).persist(StorageLevel.MEMORY_AND_DISK_SER)
g = g.outerJoinVertices(newVertexRdd)((vid, old, msg) => if (msg.isDefined) updateVertexByMsg(vid, old, msg.get) else old.copy(init = false)).cache()
prevG.unpersistVertices(blocking = false)
prevG.edges.unpersist(blocking = false)
newVertexRdd.unpersist(blocking = false)
i += 1
}
newVertexRdd.unpersist(blocking = false)
val maped = g.vertices.join(initVertex).mapValues(e => sortResult(e._1)).persist(StorageLevel.MEMORY_AND_DISK_SER)
initVertex.unpersist()
g.unpersist(blocking = false)
VertexRDD(maped)
}
private case class Ver[VD: ClassTag](source: VertexId, id: VertexId, degree: Int, attr: VD = null.asInstanceOf[VD])
private def updateVertexByMsg[VD: ClassTag](vertexId: VertexId, oldAttr: DegVertex[VD], msg: ArrayBuffer[(VertexId, Int)]): DegVertex[VD] = {
val addOne = msg.map(e => (e._1, e._2 + 1))
val newMsg = reduceVertexIds(oldAttr.degVertices ++ addOne)
oldAttr.copy(init = msg.nonEmpty, degVertices = newMsg)
}
private def sortResult[VD: ClassTag](degs: DegVertex[VD]): Map[Int, Set[VertexId]] = degs.degVertices.map(e => (e._2, Set(e._1))).reduceByKey(_ ++ _).toMap
case class DegVertex[VD: ClassTag](var attr: VD, init: Boolean = false, degVertices: ArrayBuffer[(VertexId, Int)])
case class VertexDegInfo[VD: ClassTag](var attr: VD, init: Boolean = false, degVertices: ArrayBuffer[(VertexId, Int)])
private def sendMsg[VD: ClassTag](e: EdgeContext[DegVertex[VD], Int, ArrayBuffer[(VertexId, Int)]], sendFilter: (VD, VD) => Boolean): Unit = {
try {
val src = e.srcAttr
val dst = e.dstAttr
//只有dst是ready狀態(tài)才接收消息
if (src.degVertices.size < maxNDegVerticesCount && (src.init || dst.init) && dst.degVertices.size < maxNDegVerticesCount && !isAttrSame(src, dst)) {
if (sendFilter(src.attr, dst.attr)) {
e.sendToDst(reduceVertexIds(src.degVertices))
}
if (sendFilter(dst.attr, dst.attr)) {
e.sendToSrc(reduceVertexIds(dst.degVertices))
}
}
} catch {
case ex: Exception =>
println(s"==========error found: exception:${ex.getMessage}," +
s"edgeTriplet:(srcId:${e.srcId},srcAttr:(${e.srcAttr.attr},${e.srcAttr.init},${e.srcAttr.degVertices.size}))," +
s"dstId:${e.dstId},dstAttr:(${e.dstAttr.attr},${e.dstAttr.init},${e.dstAttr.degVertices.size}),attr:${e.attr}")
ex.printStackTrace()
throw ex
}
}
private def reduceVertexIds(ids: ArrayBuffer[(VertexId, Int)]): ArrayBuffer[(VertexId, Int)] = ArrayBuffer() ++= ids.reduceByKey(Math.min)
private def isAttrSame[VD: ClassTag](a: DegVertex[VD], b: DegVertex[VD]): Boolean = a.init == b.init && allKeysAreSame(a.degVertices, b.degVertices)
private def allKeysAreSame(a: ArrayBuffer[(VertexId, Int)], b: ArrayBuffer[(VertexId, Int)]): Boolean = {
val aKeys = a.map(e => e._1).toSet
val bKeys = b.map(e => e._1).toSet
if (aKeys.size != bKeys.size || aKeys.isEmpty) return false
aKeys.diff(bKeys).isEmpty && bKeys.diff(aKeys).isEmpty
}
}
其中sortResult方法里對(duì)Traversable[(K,V)]類型的集合使用了reduceByKey方法,這個(gè)方法是自行封裝的,使用時(shí)需要導(dǎo)入,代碼如下:
/**
* Created by yepei.ye on 2016/12/21.
* Description:
*/
object CollectionUtil {
/**
* 對(duì)具有Traversable[(K, V)]類型的集合添加reduceByKey相關(guān)方法
*
* @param collection
* @param kt
* @param vt
* @tparam K
* @tparam V
*/
implicit class CollectionHelper[K, V](collection: Traversable[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V]) {
def reduceByKey(f: (V, V) => V): Traversable[(K, V)] = collection.groupBy(_._1).map { case (_: K, values: Traversable[(K, V)]) => values.reduce((a, b) => (a._1, f(a._2, b._2))) }
/**
* reduceByKey的同時(shí),返回被reduce掉的元素的集合
*
* @param f
* @return
*/
def reduceByKeyWithReduced(f: (V, V) => V)(implicit kt: ClassTag[K], vt: ClassTag[V]): (Traversable[(K, V)], Traversable[(K, V)]) = {
val reduced: ArrayBuffer[(K, V)] = ArrayBuffer()
val newSeq = collection.groupBy(_._1).map {
case (_: K, values: Traversable[(K, V)]) => values.reduce((a, b) => {
val newValue: V = f(a._2, b._2)
val reducedValue: V = if (newValue == a._2) b._2 else a._2
val reducedPair: (K, V) = (a._1, reducedValue)
reduced += reducedPair
(a._1, newValue)
})
}
(newSeq, reduced.toTraversable)
}
}
}
總結(jié)
以上就是本文關(guān)于SparkGraphx計(jì)算指定節(jié)點(diǎn)的N度關(guān)系節(jié)點(diǎn)源碼的全部?jī)?nèi)容了,希望對(duì)大家有所幫助。感興趣的朋友可以參閱:淺談七種常見的Hadoop和Spark項(xiàng)目案例 Spark的廣播變量和累加器使用方法代碼示例 Spark入門簡(jiǎn)介等,有什么問題請(qǐng)留言,小編會(huì)及時(shí)回復(fù)大家的。
相關(guān)文章
VSCODE使用ssh遠(yuǎn)程連接時(shí)啟動(dòng)服務(wù)器失敗問題及解決方法
ping服務(wù)器的ip可通并且使用terminal可以ssh連接到遠(yuǎn)程服務(wù)器,但使用vscode的remote-ssh時(shí),在「輸出」欄出現(xiàn)了一直報(bào) Waiting for server log… 的情況,這篇文章主要介紹了VSCODE使用ssh遠(yuǎn)程連接時(shí)啟動(dòng)服務(wù)器失敗問題及解決方法,感興趣的朋友一起看看吧2024-02-02
在idea打包并上傳到云服務(wù)項(xiàng)目流程分析
這篇文章主要介紹了在idea打包并上傳到云服務(wù)項(xiàng)目流程分析,本文給大家介紹的非常詳細(xì),對(duì)大家的工作或?qū)W習(xí)具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-03-03
Windows10 1903錯(cuò)誤0xc0000135解決方案【推薦】
windows10 1903是2019年Mircosoft推出的最新版windows操作系統(tǒng),但是有很多機(jī)器裝上之后會(huì)存在不支持.net framework的現(xiàn)象,因此在這里,筆者為大家?guī)砹私鉀Q該問題簡(jiǎn)單好用的方案,需要的朋友可以參考下2019-10-10
SVN使用教程_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要為大家詳細(xì)介紹了SVN使用教程和注意事項(xiàng),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-08-08
在Windows平臺(tái)下安裝與配置Memcached的方法分享
在Windows平臺(tái)下安裝與配置Memcached的方法,Memcached 是一個(gè)高性能的分布式內(nèi)存對(duì)象緩存系統(tǒng),用于動(dòng)態(tài)Web應(yīng)用以減輕數(shù)據(jù)庫負(fù)載2012-05-05

