Spark GraphX实例

Posted 代码空间

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark GraphX实例相关的知识,希望对你有一定的参考价值。

7. 图的聚合操作

图的聚合操作主要的方法有:

(1) Graph.mapReduceTriplets():该方法有一个mapFunc和一个reduceFunc,mapFunc对图中的每一个EdgeTriplet进行处理,生成一个或者多个消息,并且将这些消息发送个Edge的一个或者两个顶点,reduceFunc对发送到每一个顶点上的消息进行合并,生成最终的消息,最后返回一个VertexRDD(不包括没有收到消息的顶点);

(2) Graph.pregel():该方法采用BSP模型,包括三个函数vprog、sendMsg和mergeMsg,vprog是运行在每个节点上的顶点更新函数,接收消息,然后对顶点属性更新,sendMsg生成发送给下一次迭代的消息,mergeMsg对同一个顶点接收到的多个消息进行合并,迭代一直进行到收敛,或者达到了设置的最大迭代次数为止。

代码:

    // 聚合操作
    println("*************************************************************")
    println("聚合操作")
    println("*************************************************************")
    println("找出年纪最大的追求者:")
    val oldestFollower:VertexRDD[(String,Int)] = userGraph.mapReduceTriplets[(String,Int)](
      // 将源顶点的属性发送给目标顶点,map过程
      edge => Iterator((edge.dstId,(edge.srcAttr.name,edge.srcAttr.age))),
      // 得到最大追求者,reduce过程
      (a,b) => if(a._2>b._2) a else b
    )
    userGraph.vertices.leftJoin(oldestFollower){(id,user,optOldestFollower) =>
      optOldestFollower match{
        case None => s"${user.name} does not have any followers."
        case Some(oldestAge) => s"The oldest age of ${user.name} \‘s followers is ${oldestAge._2}(${oldestAge._1})."
      }
    }.collect.foreach{case(id,str) => println(str)}
    println

    // 找出追求者的平均年龄
    println("找出追求者的平均年龄:")
    val averageAge:VertexRDD[Double] = userGraph.mapReduceTriplets[(Int,Double)](
      // 将源顶点的属性(1,Age)发送给目标顶点,map过程
      edge => Iterator((edge.dstId,(1,edge.srcAttr.age.toDouble))),
      // 得到追求者的数量和总年龄
      (a,b) => ((a._1+b._1),(a._2+b._2))
    ).mapValues((id,p) => p._2/p._1)

    userGraph.vertices.leftJoin(averageAge){(id,user,optAverageAge) =>
      optAverageAge match{
        case None => s"${user.name} does not have any followers."
        case Some(avgAge) => s"The average age of ${user.name} \‘s followers is $avgAge."
      }
    }.collect.foreach{case(id,str) => println(str)}
    println

    // 聚合操作2
    println("*************************************************************")
    println("聚合操作2")
    println("*************************************************************")
    println("找出3到各顶点的最短距离:")
    // 定义源点
    val sourceId:VertexId = 3L
    val initialGraph = graph.mapVertices((id,_) => if(id==sourceId) 0.0 else Double.PositiveInfinity)
    val sssp = initialGraph.pregel(Double.PositiveInfinity)(
      (id,dist,newDist) => math.min(dist,newDist),
      // 权重计算
      triplet=>{
        if(triplet.srcAttr + triplet.attr < triplet.dstAttr){
          Iterator((triplet.dstId, triplet.srcAttr+triplet.attr))
        } else{
          Iterator.empty
        }
      },
      // 最短距离
      (a,b) => math.min(a,b)
    )
    println(sssp.vertices.collect.mkString("\n"))

运行结果:

*************************************************************
聚合操作
*************************************************************
找出年纪最大的追求者:
The oldest age of Peter ‘s followers is 27(Henry).
The oldest age of Kate ‘s followers is 55(Charlie).
The oldest age of Henry ‘s followers is 55(Charlie).
The oldest age of Alice ‘s followers is 32(Peter).
The oldest age of Charlie ‘s followers is 35(Mike).
Mike does not have any followers.

找出追求者的平均年龄:
The average age of Peter ‘s followers is 27.0.
The average age of Kate ‘s followers is 45.0.
The average age of Henry ‘s followers is 45.0.
The average age of Alice ‘s followers is 29.5.
The average age of Charlie ‘s followers is 35.0.
Mike does not have any followers.

*************************************************************
聚合操作2
*************************************************************
找出3到各顶点的最短距离:
(4,9.0)
(6,3.0)
(2,7.0)
(1,10.0)
(3,0.0)
(5,Infinity)

以上是关于Spark GraphX实例的主要内容,如果未能解决你的问题,请参考以下文章

Spark GraphX实例

Spark GraphX实例

Spark GraphX图计算代码实现,源码分析

Spark中GraphX图运算pregel详解

大数据技术之_19_Spark学习_05_Spark GraphX 应用解析 + Spark GraphX 概述解析 + 计算模式 + Pregel API + 图算法参考代码 + PageRank(

Apache Spark Graphx - Java 实现