Spark GraphX学习笔记

Posted 千寻千梦

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark GraphX学习笔记相关的知识,希望对你有一定的参考价值。

概述

  • GraphX是 Spark中用于图(如Web-Graphs and Social Networks)和图并行计算(如 PageRank and Collaborative Filtering)的API,可以认为是GraphLab(C++)和Pregel(C++)在Spark(Scala)上的重写及优化,跟其他分布式 图计算框架相比,GraphX最大的贡献是,在Spark之上提供一站式数据解决方案,可以方便且高效地完成图计算的一整套流水作业。
  • Graphx是Spark生态中的非常重要的组件,融合了图并行以及数据并行的优势,虽然在单纯的计算机段的性能相比不如GraphLab等计算框架,但是如果从整个图处理流水线的视角(图构建,图合并,最终结果的查询)看,那么性能就非常具有竞争性了。
    这里写图片描述

图计算应用场景

    “图计算”是以“图论”为基础的对现实世界的一种“图”结构的抽象表达,以及在这种数据结构上的计算模式。通常,在图计算中,基本的数据结构表达就是:G = (V,E,D) V = vertex (顶点或者节点) E = edge (边) D = data (权重)。
    图数据结构很好的表达了数据之间的关联性,因此,很多应用中出现的问题都可以抽象成图来表示,以图论的思想或者以图为基础建立模型来解决问题。
下面是一些图计算的应用场景:
PageRank让链接来”投票”
基于GraphX的社区发现算法FastUnfolding分布式实现
http://bbs.pinggu.org/thread-3614747-1-1.html
社交网络分析
如基于Louvian社区发现的新浪微博社交网络分析
社交网络最适合用图来表达和计算了,图的“顶点”表示社交中的人,“边”表示人与人之间的关系。
基于三角形计数的关系衡量
基于随机游走的用户属性传播
推荐应用
如淘宝推荐商品,腾讯推荐好友等等(同样是基于社交网络这个大数据,可以很好构建一张大图)
淘宝应用
度分布、二跳邻居数、连通图、多图合并、能量传播模型
所有的关系都可以从“图”的角度来看待和处理,但到底一个关系的价值多大?健康与否?适合用于什么场景?
快刀初试:Spark GraphX在淘宝的实践
http://www.csdn.net/article/2014-08-07/2821097

Spark中图的建立及图的基本操作

图的构建

          首先利用“顶点”和“边”RDD建立一个简单的属性图,通过这个例子,了解完整的GraphX图构建的基本流程。
          如下图所示,顶点的属性包含用户的姓名和职业,带标注的边表示不同用户之间的关系。
这里写图片描述

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object myGraphX {

  def main(args:Array[String]){

    // Create the context  
    val sparkConf = new SparkConf().setAppName("myGraphpractice").setMaster("local[2]")
    val sc=new SparkContext(sparkConf) 

    // 顶点RDD[顶点的id,顶点的属性值]
    val users: RDD[(VertexId, (String, String))] =
      sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
    // 边RDD[起始点id,终点id,边的属性(边的标注,边的权重等)]
    val relationships: RDD[Edge[String]] =
      sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
    // 默认(缺失)用户
    //Define a default user in case there are relationship with missing user
    val defaultUser = ("John Doe", "Missing")

    //使用RDDs建立一个Graph(有许多建立Graph的数据来源和方法,后面会详细介绍)
    val graph = Graph(users, relationships, defaultUser)     
  }
}

          上面是一个简单的例子,说明如何建立一个属性图,那么建立一个图主要有哪些方法呢?我们先看图的定义:

object Graph {
  def apply[VD, ED](
      vertices: RDD[(VertexId, VD)],
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD = null)
    : Graph[VD, ED]

  def fromEdges[VD, ED](
      edges: RDD[Edge[ED]],
      defaultValue: VD): Graph[VD, ED]

  def fromEdgeTuples[VD](
      rawEdges: RDD[(VertexId, VertexId)],
      defaultValue: VD,
      uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]

}

          由上面的定义我们可以看到,GraphX主要有三种方法可以建立图:
          (1)在构造图的时候,会自动使用apply方法,因此前面那个例子中实际上就是使用apply方法。相当于Java/C++语言的构造函数。有三个参数,分别是:vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]], defaultVertexAttr: VD = null),前两个必须有,最后一个可选择。“顶点“和”边“的RDD来自不同的数据源,与Spark中其他RDD的建立并没有区别。
          这里再举读取文件,产生RDD,然后利用RDD建立图的例子:

(1)读取文件,建立顶点和边的RRD,然后利用RDD建立属性图

//读入数据文件
val articles: RDD[String] = sc.textFile("E:/data/graphx/graphx-wiki-vertices.txt")
val links: RDD[String] = sc.textFile("E:/data/graphx/graphx-wiki-edges.txt")

//装载“顶点”和“边”RDD
val vertices = articles.map { line =>
    val fields = line.split('\\t')
      (fields(0).toLong, fields(1))
    }//注意第一列为vertexId,必须为Long,第二列为顶点属性,可以为任意类型,包括Map等序列。

val edges = links.map { line =>
    val fields = line.split('\\t')
      Edge(fields(0).toLong, fields(1).toLong, 1L)//起始点ID必须为Long,最后一个是属性,可以为任意类型
    }
//建立图
val graph = Graph(vertices, edges, "").persist()//自动使用apply方法建立图

(2)Graph.fromEdges方法:这种方法相对而言最为简单,只是由”边”RDD建立图,由边RDD中出现所有“顶点”(无论是起始点src还是终点dst)自动产生顶点vertextId,顶点的属性将被设置为一个默认值。
      Graph.fromEdges allows creating a graph from only an RDD of edges, automatically creating any vertices mentioned by edges and assigning them the default value.
          举例如下:

//读入数据文件        
val records: RDD[String] = sc.textFile("/microblogPCU/microblogPCU/follower_followee")   
//微博数据:000000261066,郜振博585,3044070630,redashuaicheng,1929305865,1994,229,3472,male,first
// 第三列是粉丝Id:3044070630,第五列是用户Id:1929305865
val followers=records.map {case x => val fields=x.split(",")
          Edge(fields(2).toLong, fields(4).toLong,1L )       
      }    
val graph=Graph.fromEdges(followers, 1L)

(3)Graph.fromEdgeTuples方法
          Graph.fromEdgeTuples allows creating a graph from only an RDD of edge tuples, assigning the edges the value 1, and automatically creating any vertices mentioned by edges and assigning them the default value. It also supports deduplicating the edges; to deduplicate, pass Some of a PartitionStrategy as the uniqueEdges parameter (for example, uniqueEdges = Some(PartitionStrategy.RandomVertexCut)). A partition strategy is necessary to colocate identical edges on the same partition so they can be deduplicated.

          除了三种方法,还可以用GraphLoader构建图。如下面GraphLoader.edgeListFile:
(4)GraphLoader.edgeListFile建立图的基本结构,然后Join属性
(a)首先建立图的基本结构:
          利用GraphLoader.edgeListFile函数从边List文件中建立图的基本结构(所有“顶点”+“边”),且顶点和边的属性都默认为1。

object GraphLoader {
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      minEdgePartitions: Int = 1)
    : Graph[Int, Int]
}

使用方法如下:

val graph=GraphLoader.edgeListFile(sc, "/data/graphx/followers.txt") 
//文件的格式如下:
//2 1
//4 1
//1 2  依次为第一个顶点和第二个顶点

(b)然后读取属性文件,获得RDD后和(1)中得到的基本结构图join在一起,就可以组合成完整的属性图。

三种视图及操作

  Spark中图有以下三种视图可以访问,分别通过graph.vertices,graph.edges,graph.triplets来访问。
这里写图片描述

          在Scala语言中,可以用case语句进行形式简单、功能强大的模式匹配

//假设graph顶点属性(String,Int)-(name,age),边有一个权重(int)
val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)
用case匹配可以很方便访问顶点和边的属性及id
graph.vertices.map{
      case (id,(name,age))=>//利用case进行匹配
        (age,name)//可以在这里加上自己想要的任何转换
    }

graph.edges.map{
      case Edge(srcid,dstid,weight)=>//利用case进行匹配
        (dstid,weight*0.01)//可以在这里加上自己想要的任何转换
    }

          也可以通过下标访问

graph.vertices.map{
      v=>(v._1,v._2._1,v._2._2)//v._1,v._2._1,v._2._2分别对应Id,name,age
}

graph.edges.map {
      e=>(e.attr,e.srcId,e.dstId)
}

graph.triplets.map{
      triplet=>(triplet.srcAttr._1,triplet.dstAttr._2,triplet.srcId,triplet.dstId)
    }

     可以不用graph.vertices先提取顶点再map的方法,也可以通过graph.mapVertices直接对顶点进行map,返回是相同结构的另一个Graph,访问属性的方法和上述方法是一模一样的。如下:

graph.mapVertices{
      case (id,(name,age))=>//利用case进行匹配
        (age,name)//可以在这里加上自己想要的任何转换
}

graph.mapEdges(e=>(e.attr,e.srcId,e.dstId))

graph.mapTriplets(triplet=>(triplet.srcAttr._1))

Spark GraphX中的图的函数大全

/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
  // Information about the Graph 
  //图的基本信息统计
===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]

  // Views of the graph as collections 
  // 图的三种视图
=============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]

  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
  // Change the partitioning heuristic  ============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]

  // Transform vertex and edge attributes 
  // 基本的转换操作
==========================================================
  def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]

  // Modify the graph structure 
  //图的结构操作(仅给出四种基本的操作,子图提取是比较重要的操作)
====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: (VertexID, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

  // Join RDDs with the graph 
  // 两种聚合方式,可以完成各种图的聚合操作  ======================================================================
  def joinVertices[U](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexID, U)])
      (mapFunc: (VertexID, VD, Option[U]) => VD2)

  // Aggregate information about adjacent triplets 
  //图的邻边信息聚合,collectNeighborIds都是效率不高的操作,优先使用aggregateMessages,这也是GraphX最重要的操作之一。
  =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[A]

  // Iterative graph-parallel computation ==========================================================
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexID, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]

  // Basic graph algorithms 
  //图的算法API(目前给出了三类四个API)  ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexID, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED]
}

结构操作

Structural Operators
      Spark2.0版本中,仅仅有四种最基本的结构操作,未来将开发更多的结构操作。

class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}

子图subgraph

      子图(subgraph)是图论的基本概念之一。子图是指节点集和边集分别是某一图的节点集的子集和边集的子集的图。
  Spark API–subgraph利用EdgeTriplet(epred)或/和顶点(vpred)满足一定条件,来提取子图。利用这个操作可以使顶点和边被限制在感兴趣的范围内,比如删除失效的链接。
        The subgraph operator takes vertex and edge predicates and returns the graph containing only the vertices that satisfy the vertex predicate (evaluate to true) and edges that satisfy the edge predicate and connect vertices that satisfy the vertex predicate. The subgraph operator can be used in number of situations to restrict the graph to the vertices and edges of interest or eliminate broken links. For example in the following code we remove broken links:

//假设graph有如下的顶点和边 顶点RDD(id,(name,age) 边上有一个Int权重(属性)
(4,(David,42))(6,(Fran,50))(2,(Bob,27)) (1,(Alice,28))(3,(Charlie,65))(5,(Ed,55))
Edge(5,3,8)Edge(2,1,7)Edge(3,2,4) Edge(5,6,3)Edge(3,6,3)

//可以使用以下三种操作方法获取满足条件的子图
//方法1,对顶点进行操作
val subGraph1=graph.subgraph(vpred=(id,attr)=>attr._2>30)
//vpred=(id,attr)=>attr._2>30 顶点vpred第二个属性(age)>30岁
subGraph1.vertices.foreach(print)
println
subGraph1.edges.foreach {print}
println
输出结果:
顶点:(4,(David,42))(6,(Fran,50))(3,(Charlie,65))(5,(Ed,55))
边:Edge(3,6,3)Edge(5,3,8)Edge(5,6,3)

//方法2--对EdgeTriplet进行操作
val subGraph2=graph.subgraph(epred=>epred.attr>2)
//epred(边)的属性(权重)大于2
输出结果:
顶点:(4,(David,42))(6,(Fran,50))(2,(Bob,27))(1,(Alice,28)) (3,(Charlie,65))(5,(Ed,55))
边:Edge(5,3,8)Edge(5,6,3)Edge(2,1,7)Edge(3,2,4) Edge(3,6,3)
//也可以定义如下的操作
val subGraph2=graph.subgraph(epred=>pred.srcAttr._2<epred.dstAttr._2))
//起始顶点的年龄小于终点顶点年龄
顶点:1,(Alice,28))(4,(David,42))(3,(Charlie,65))(6,(Fran,50)) (2,(Bob,27))(5,(Ed,55))
边 :Edge(5,3,8)Edge(2,1,7)Edge(2,4,2)

//方法3--对顶点和边Triplet两种同时操作“,”号隔开epred和vpred
val subGraph3=graph.subgraph(epred=>epred.attr>3,vpred=(id,attr)=>attr._2>30)
输出结果:
顶点:(3,(Charlie,65))(5,(Ed,55))(4,(David,42))(6,(Fran,50))
边:Edge(5,3,8)

图的基本信息统计-度计算

度分布:这是一个图最基础和重要的指标。度分布检测的目的,主要是了解图中“超级节点”的个数和规模,以及所有节点度的分布曲线。超级节点的存在对各种传播算法都会有重大的影响(不论是正面助力还是反面阻力),因此要预先对这些数据量有个预估。借助GraphX最基本的图信息接口degrees: VertexRDD[Int](包括inDegrees和outDegrees),这个指标可以轻松计算出来,并进行各种各样的统计(摘自《快刀初试:Spark GraphX在淘宝的实践》。

//-----------------度的Reduce,统计度的最大值-----------------
def max(a:(VertexId,Int),b:(VertexId,Int)):(VertexId,Int)={
            if (a._2>b._2) a  else b }

val totalDegree=graph.degrees.reduce((a,b)=>max(a, b))
val inDegree=graph.inDegrees.reduce((a,b)=>max(a,b))
val outDegree=graph.outDegrees.reduce((a,b)=>max(a,b))

print("max total Degree = "+totalDegree)
print("max in Degree = "+inDegree)
print("max out Degree = "+outDegree)
//小技巧:如何知道ab的类型为(VertexId,Int)?
//当你敲完graph.degrees.reduce((a,b)=>,再将鼠标点到a和b上查看,
//就会发现a和b是(VertexId,Int),当然reduce后的返回值也是(VertexId,Int)
//这样就很清楚自己该如何定义max函数了  

//平均度
val sumOfDegree=graph.degrees.map(x=>(x._2.toLong)).reduce((a,b)=>a+b)    
val meanDegree=sumOfDegree.toDouble/graph.vertices.count().toDouble
print("meanDegree "+meanDegree)
println     

//------------------使用RDD自带的统计函数进行度分布分析--------
//度的统计分析
//最大,最小
val degree2=graph.degrees.map(a=>(a._2,a._1))
//graph.degrees是VertexRDD[Int],即(VertexID,Int)。
//通过上面map调换成map(a=>(a._2,a._1)),即RDD[(Int,VetexId)]
//这样下面就可以将度(Int)当作键值(key)来操作了,
//包括下面的min,max,sortByKey,top等等,因为这些函数都是对第一个值也就是key操作的
//max degree
print("max degree = " + (degree2.max()._2,degree2.max()._1))
println

//min degree
print("min degree =" +(degree2.min()._2,degree2.min()._1))
println

//top(N) degree"超级节点"
print("top 3 degrees:\\n")   
degree2.sortByKey(true, 1).top(3).foreach(x=>print(x._2,x._1))
println

/*输出结果:
 * max degree = (2,4)//(Vetext,degree)
 * min degree =(1,2)
 * top 3 degrees:
 * (2,4)(5,3)(3,3)
 */ 

相邻聚合—消息聚合

       相邻聚合(Neighborhood Aggregation)
       图分析任务的一个关键步骤是汇总每个顶点附近的信息。例如我们可能想知道每个用户的追随者的数量或者每个用户的追随者的平均年龄。许多迭代图算法(如PageRank,最短路径和连通体) 多次聚合相邻顶点的属性。
       聚合消息(aggregateMessages)
 GraphX中的核心聚合操作是 aggregateMessages,它主要功能是向邻边发消息,合并邻边收到的消息,返回messageRDD。这个操作将用户定义的sendMsg函数应用到图的每个边三元组(edge triplet),然后应用mergeMsg函数在其目的顶点聚合这些消息。

class Graph[VD, ED] {
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,//(1)--sendMsg:向邻边发消息,相当与MR中的Map函数
      mergeMsg: (Msg, Msg) => Msg,//(2)--mergeMsg:合并邻边收到的消息,相当于Reduce函数
      tripletFields: TripletFields = TripletFields.All)//(3)可选项,TripletFields.Src/Dst/All
    : VertexRDD[Msg]//(4)--返回messageRDD
}

(1)sendMsg:
        将sendMsg函数看做map-reduce过程中的map函数,向邻边发消息,应用到图的每个边三元组(edge triplet),即函数的左侧为每个边三元组(edge triplet)。
    The user defined sendMsg function takes an EdgeContext, which exposes the source and destination attributes along with the edge attribute and functions (sendToSrc, and sendToDst) to send messages to the source and destination attributes. Think of sendMsg as the map function in map-reduce.

//关键数据结构EdgeContext源码解析

package org.apache.spark.graphx

/**
 * Represents an edge along with its neighboring vertices and allows sending messages along the
 * edge. Used in [[Graph#aggregateMessages]].
 */
abstract class EdgeContext[VD, ED, A] {//三个类型分别是:顶点、边、自定义发送消息的类型(返回值的类型)
  /** The vertex id of the edge's source vertex. */
  def srcId: VertexId
  /** The vertex id of the edge's destination vertex. */
  def dstId: VertexId
  /** The vertex attribute of the edge's source vertex. */
  def srcAttr: VD
  /** The vertex attribute of the edge's destination vertex. */
  def dstAttr: VD
  /** The attribute associated with the edge. */
  def attr: ED

  /** Sends a message to the source vertex. */
  def sendToSrc(msg: A): Unit
  /** Sends a message to the destination vertex. */
  def sendToDst(msg: A): Unit

  /** Converts the edge and vertex properties into an [[EdgeTriplet]] for convenience. */
  def toEdgeTriplet: EdgeTriplet[VD, ED] = {
    val et = new EdgeTriplet[VD, ED]
    et.srcId = srcId
    et.srcAttr = srcAttr
    et.dstId = dstId
    et.dstAttr = dstAttr
    et.attr = attr
    et
  }
}

(2)mergeMsg :
        用户自定义的mergeMsg函数指定两个消息到相同的顶点并保存为一个消息。可以将mergeMsg函数看做map-reduce过程中的reduce函数。

    The user defined mergeMsg function takes two messages destined to the same vertex and yields a single message. Think of mergeMsg as the reduce function in map-reduce.

这里写代码片

(3)TripletFields可选项
        它指出哪些数据将被访问(源顶点特征,目的顶点特征或者两者同时,即有三种可选择的值:TripletFields.Src,TripletFieldsDst,TripletFields.All。
      因此这个参数的作用是通知GraphX仅仅只需要EdgeContext的一部分参与计算,是一个优化的连接策略。例如,如果我们想计算每个用户的追随者的平均年龄,我们仅仅只需要源字段。 所以我们用TripletFields.Src表示我们仅仅只需要源字段。
     takes an optional tripletsFields which indicates what data is accessed in the EdgeContext (i.e., the source vertex attribute but not the destination vertex attribute). The possible options for the tripletsFields are defined in TripletFields and the default value is TripletFields.All which indicates that the user defined sendMsg function may access any of the fields in the EdgeContext. The tripletFields argument can be used to notify GraphX that only part of the EdgeContext will be needed allowing GraphX to select an optimized join strategy. For example if we are computing the average age of the followers of each user we would only require the source field and so we would use TripletFields.Src to indicate that we only require the source field

(4)返回值:
    The aggregateMessages operator returns a VertexRDD[Msg] containing the aggregate message (of type Msg) destined to each vertex. Vertices that did not receive a message are not included in the returned VertexRDD.

//假设已经定义好如下图:
//顶点:[Id,(name,age)]
//(4,(David,18))(1,(Alice,28))(6,(Fran,40))(3,(Charlie,30))(2,(Bob,70))(5,Ed,55))
//边:Edge(4,2,2)Edge(2,1,7)Edge(4,5,8)Edge(2,4,2)Edge(5,6,3)Edge(3,2,4)
//    Edge(6,1,2)Edge(3,6,3)Edge(6,2,8)Edge(4,1,1)Edge(6,4,3)(4,(2,110))

//定义一个相邻聚合,统计比自己年纪大的粉丝数(count)及其平均年龄(totalAge/count)
val olderFollowers=graph.aggregateMessages[(Int,Int)](
//方括号内的元组(Int,Int)是函数返回值的类型,也就是Reduce函数(mergeMsg )右侧得到的值(count,totalAge)
        triplet=> {
          if(triplet.srcAttr._2>triplet.dstAttr._2){            
              triplet.sendToDst((1,triplet.srcAttr._2))
          }
        },//(1)--函数左侧是边三元组,也就是对边三元组进行操作,有两种发送方式sendToSrc和 sendToDst
        (a,b)=>(a._1+b._1,a._2+b._2),//(2)相当于Reduce函数,a,b各代表一个元组(count,Age)
        //对count和Age不断相加(reduce),最终得到总的count和totalAge
        TripletFields.All)//(3)可选项,TripletFields.All/Src/Dst
olderFollowers.collect().foreach(println)
输出结果:
(4,(2,110))//顶点Id=4的用户,有2个年龄比自己大的粉丝,同年龄是110岁
(6,(1,55))
(1,(2,110))

//计算平均年龄
val averageOfOlderFollowers=olderFollowers.mapValues((id,value)=>value match{
      case (count,totalAge) =>(count,totalAge/count)//由于不是所有顶点都有结果,所以用match-case语句
    })    

averageOfOlderFollowers.foreach(print)  
输出结果:
(1,(2,55))(4,(2,55))(6,(1,55))//Id=1的用户,有2个粉丝,平均年龄是55岁

Spark Join连接操作

         许多情况下,需要将图与外部获取的RDDs进行连接。比如将一个额外的属性添加到一个已经存在的图上,或者将顶点属性从一个图导出到另一图中(在自己编写图计算API 时,往往需要多次进行aggregateMessages和Join操作,因此这两个操作可以说是Graphx中非常重要的操作,需要非常熟练地掌握,在本文最后的实例中,有更多的例子可供学习)
         In many cases it is necessary to join data from external collections (RDDs) with graphs. For example, we might have extra user properties that we want to merge with an existing graph or we might want to pull vertex properties from one graph into another.

有两个join API可供使用:

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]

  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}

         两个连接方式差别非常大。下面分别来说明

joinVertices连接

          返回值的类型就是graph顶点属性的类型,不能新增,也不可以减少(即不能改变原始graph顶点属性类型和个数)。
         经常会遇到这样的情形,”一个额外的费用(extraCost)增加到老的费用(oldCost)中”,oldCost为graph的顶点属性值,extraCost来自外部RDD,这时候就要用到joinVertices:
         extraCosts: RDD[(VertexID, Double)]//额外的费用
         graph:Graph[Double,Long]//oldCost
         val totlCosts = graph.joinVertices(extraCosts)( (id, oldCost, extraCost) => oldCost + extraCost)
         //extraCost和oldCost数据类型一致,且返回时无需改变原始graph顶点属性的类型。

再举一个例子:

// 假设graph的顶点如下[id,(user_name,initial_energy)]
//(6,(Fran,0))(2,(Bob,3))(4,(David,3))(3,(Charlie,1))(1,(Alice,2))(5,(Ed,2))

// graph边如下:
//Edge(2,1,1)Edge(2,4,1)Edge(4,1,1)Edge(5,2,1)Edge(5,3,1)Edge(5,6,1)Edge(3,2,1)Edge(3,6,1)

// 每个src向dst邻居发送生命值为2能量
val energys=graph.aggregateMessages[Long](
            triplet=>triplet.sendToDst(2), (a,b)=>a+b)      

// 输出结果:
// (1,4)(4,2)(3,2)(6,4)(2,4)
val energys_name=graph.joinVertices(energys){
              case(id,(name,initialEnergy),energy)=>(name,initialEnergy+energy)
              }
//输出结果:
// (3,(Charlie,3))(1,(Alice,6))(5,(Ed,2))(4,(David,5))(6,(Fran,4))(2,(Bob,7))

// 我们注意到,如果energys:RDD中没有graph某些顶点对应的值,则graph不进行任何改变,如(5,(Ed,2))。

         从上面的例子我们知道:将外部RDD joinvertices到graph中,对应于graph某些顶点,RDD中无对应的属性,则保留graph原有属性值不进行任何改变。
         而与之相反的是另一种情况,对应于graph某一些顶点,RDD中的值不止一个,这种情况下将只有一个值在join时起作用。可以先使用aggregateUsingIndex的进行reduce操作,然后再join graph。

val nonUniqueCosts: RDD[(VertexID, Double)]
val uniqueCosts: VertexRDD[Double] =
  graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)(
  (id, oldCost, extraCost) => oldCost + extraCost)

         If the RDD contains more than one value for a given vertex only one will be used. It is therefore recommended that the input RDD be made unique using the following which will also pre-index the resulting values to substantially accelerate the subsequent join.

(2)outerJoinVertices

         更为常用,使用起来也更加自由的是outerJoinVertices,至于为什么后面会详细分析。
         The more general outerJoinVertices behaves similarly to joinVertices except that the user defined map function is applied to all vertices and can change the vertex property type. Because not all vertices may have a matching value in the input RDD the map function takes an Option type.

         从下面函数的定义我们注意到,与前面JoinVertices不同之处在于map函数右侧类型是VD2,不再是VD,因此不受原图graph顶点属性类型VD的限制,在outerJoinVertices中使用者可以随意定义自己想要的返回类型,从而可以完全改变图的顶点属性值的类型和属性的个数。

class Graph[VD, ED] {

  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]

}

用上面例子中的graph和energys数据:

 val graph_energy_total=graph.outerJoinVertices(energys){
      case(id,(name,initialEnergy),Some(energy))=>(name,initialEnergy,energy,initialEnergy+energy)
      case(id,(name,initialEnergy),None)=>(name,initialEnergy,0,initialEnergy)
    }

// 输出结果:
// (3,(Charlie,1,2,3))(1,(Alice,2,4,6))(5,(Ed,2,0,2))
// (4,(David,3,2,5))(6,(Fran,0,4,4))(2,(Bob,3,4,7))

Spark Scala几个语法问题

(1)遇到null怎么处理?
可参考【Scala】使用Option、Some、None,避免使用null
http://www.jianshu.com/p/95896d06a94d

         大多数语言都有一个特殊的关键字或者对象来表示一个对象引用的是“无”,在Java,它是null。
         Scala鼓励你在变量和函数返回值可能不会引用任何值的时候使用Option类型。在没有值的时候,使用None,这是Option的一个子类。如果有值可以引用,就使用Some来包含这个值。Some也是Option的子类。
         通过模式匹配分离可选值,如果匹配的值是Some的话,将Some里的值抽出赋给x变量。举一个综合的例子:

def showCapital(x: Option[String]) = x match {
    case Some(s) => s
    case None => "?"
}

/*
Option用法:Scala推荐使用Option类型来代表一些可选值。使用Option类型,读者一眼就可以看出这种类型的值可能为None。
如上面:x: Option[String])参数,就是因为参数可能是String,也可能为null,这样程序不会在为null时抛出异常
*/

Spark中,经常使用在map中使用case语句进行匹配None和Some,再举一个例子

//假设graph.Vertice:(id,(name,weight))如下:
//(4,(David,Some(2)))(3,(Charlie,Some(2)))(6,(Fran,Some(4)))(2,(Bob,Some(4)))(1,(Alice,Some(4)))(5,(Ed,None))
//id=5时,weight=None,其他的为Some

val weights=graph.vertices.map{
      case (id,(name,Some(weight)))=>(id,weight)
      case (id,(name,None))=>(id,0)
    }    
weights.foreach(print)
println

//输出结果如下(id,weight):
//(3,2)(6,4)(2,4)(4,2)(1,4)(5,0)

在上面的例子中,其实我们也可以选用另外一个方法,getOrElse。这个方法在这个Option是Some的实例时返回对应的值,而在是None的实例时返函数参数。
上面例子可以用下面的语句获得同样的结果:

val weights=graph.vertices.map{
      attr=>(attr._1,attr._2._2.getOrElse(0))
      //如果attr._2._2!=None,返回attr._2._2(weight)的值,
      //否则(即attr._2._2==None),返回自己设置的函数参数(0)
    }

//输出同样的结果:
//(id,weight)
(4,2)(6,4)(2,4)(3,2)(1,4)(5,0)

图算法工具包

1.数三角形

TriangleCount主要用途之一是用于社区发现,如下图所示:
这里写图片描述
例如说在微博上你关注的人也互相关注,大家的关注关系中就会有很多三角形,这说明社区很强很稳定,大家的联系都比较紧密;如果说只是你一个人关注很多人,这说明你的社交群体是非常小的。(摘自《大数据Spark企业级实战》一书)

graph.triangleCount().vertices.foreach(x=>print(x+"\\n"))
    /*输出结果
     * (1,1)//顶点1有1个三角形
     * (3,2)//顶点3有2个三角形
     * (5,2)
     * (4,1)
     * (6,1)
     * (2,2)
     */

2.连通图

        现实生活中存在各种各样的网络,诸如人际关系网、交易网、运输网等等。对这些网络进行社区发现具有极大的意义,如在人际关系网中,可以发现出具有不同兴趣、背景的社会团体,方便进行不同的宣传策略;在交易网中,不同的社区代表不同购买力的客户群体,方便运营为他

以上是关于Spark GraphX学习笔记的主要内容,如果未能解决你的问题,请参考以下文章

GraphX学习笔记——Programming Guide

大数据技术之_19_Spark学习_05_Spark GraphX 应用解析 + Spark GraphX 概述解析 + 计算模式 + Pregel API + 图算法参考代码 + PageRank(

GraphX 实现K-Core

Spark学习笔记

大数据技术之_19_Spark学习_05_Spark GraphX 应用解析小结

Spark GraphX图计算代码实现,源码分析