GraphX实现N度关系
Posted 张包峰
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了GraphX实现N度关系相关的知识,希望对你有一定的参考价值。
背景
本文给出了一个简单的计算图中每个点的N度关系点集合的算法,也就是N跳关系。
之前通过官方文档学习和理解了一下GraphX的计算接口。
N度关系
目标:
在N轮里,找到某一个点的N度关系的点集合。
实现思路:
1. 准备好边数据集,即”1 3”, “4, 1” 这样的点关系。使用GraphLoader 的接口load成Graph
2. 初始化每个Vertice的属性为空Map
3. 使用aggregateMessages把VerticeID和totalRounds传播出度点上,出度点把收集到的信息合成一个大Map
4. 更新后的Vertice与原图进行”Join”,更新图中的变化过的点属性
5. 重复步骤3和4,最后输出更新了N轮之后的有关系的Vertice
spark-shell下可执行的代码:
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val friendsGraph = GraphLoader.edgeListFile(sc, "data/friends.txt")
val totalRounds: Int = 3 // total N round
var targetVerticeID: Long = 6 // target vertice
// round one
var roundGraph = friendsGraph.mapVertices((id, vd) => Map())
var roundVertices = roundGraph.aggregateMessages[Map[Long, Integer]](
ctx =>
if (targetVerticeID == ctx.srcId)
// only the edge has target vertice should send msg
ctx.sendToDst(Map(ctx.srcId -> totalRounds))
,
_ ++ _
)
for (i <- 2 to totalRounds)
val thisRoundGraph = roundGraph.outerJoinVertices(roundVertices) (vid, data, opt) => opt.getOrElse(Map[Long, Integer]())
roundVertices = thisRoundGraph.aggregateMessages[Map[Long, Integer]](
ctx =>
val iterator = ctx.srcAttr.iterator
while (iterator.hasNext)
val (k, v) = iterator.next
if (v > 1)
val newV = v - 1
ctx.sendToDst(Map(k -> newV))
ctx.srcAttr.updated(k, newV)
else
// do output and remove this entry
,
(newAttr, oldAttr) =>
if (oldAttr.contains(newAttr.head._1)) // optimization to reduce msg
oldAttr.updated(newAttr.head._1, 1) // stop sending this ever
else
oldAttr ++ newAttr
)
val result = roundVertices.map(_._1).collect
数据和输出
2 1
4 1
1 2
6 3
7 3
7 6
6 7
3 7
4 3
1 6
6 1
Array(6, 1, 3, 7)
总结
实现的比较naive,还有许多可以优化的地方。
全文完 :)
以上是关于GraphX实现N度关系的主要内容,如果未能解决你的问题,请参考以下文章