Spark GraphX企业运用

Posted huanghanyu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark GraphX企业运用相关的知识,希望对你有一定的参考价值。

========== Spark GraphX 概述 ==========
1、Spark GraphX是什么?
  (1)Spark GraphX 是 Spark 的一个模块,主要用于进行以图为核心的计算还有分布式图的计算。
  (2)GraphX 他的底层计算也是 RDD 计算,它和 RDD 共用一种存储形态,在展示形态上可以以数据集来表示,也可以图的形式来表示。

2、Spark GraphX 有哪些抽象?
(1)顶点。
  顶点的表示用 RDD[(VertexId, VD)] 来表示,(VertexId, VD) 这个元组用来具体表示一个顶点,VertexID 表示顶点的 ID,是 Long 类型的别名,VD 是顶点的属性,是一个类型参数,可以是任何类型。
(2)边。
  边的表示用 RDD[Edge[ED]] 来表示,Edge 用来具体表示一个边,Edge 里面包含一个 ED 类型参数来设定的属性,ED 类型中包括 一个源顶点的 ID 和一个目标顶点的 ID。
(3)三元组。
  三元组结构用 RDD[EdgeTriplet[VD, ED]] 来表示,EdgeTriplet[VD, ED] 来表示一个三元组,三元组包含了一个边、边的属性、源顶点 ID、源顶点属性、目标顶点 ID、目标顶点属性。VD 和 ED 是类型参数,VD 表示顶点的属性,ED 表示边的属性。
(4)图。
  图在 Spark 中用 Graph[VD, ED] 来表示,可以通过顶点和边来构建。

========== Spark GraphX 图的构建 ==========
1、对于 Vertex 顶点的构建:
(1)对于 RDD[(VertexId, VD)] 这种版本:

val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin""student")), (7L, ("jgonzal""postdoc")),(5L, ("franklin""prof")), (2L, ("istoica""prof"))))

(2)对于 VertexRDD[VD] 这种版本:是顶点的构建的优化版本。说白了,就是对上面版本的包装,包装中进行了一些优化!

val users1: VertexRDD[(String, String)] = VertexRDD[(String, String)](users)

2、对于 Edge 边的构建:
(1)对于 RDD[Edge[ED]] 这种版本:

val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L7L"collab"), Edge(5L3L"advisor"), Edge(2L5L"colleague"), Edge(5L7L"pi")))

(2)对于 EdgeRDD[ED] 这种版本:是边的构建的优化版本。说白了,就是对上面版本的包装,包装中进行了一些优化!

val relationships1: EdgeRDD[String] = EdgeRDD.fromEdges(relationships)

3、对于 Graph 图的构建:
Graph[VD: ClassTag, ED: ClassTag]
(1)通过 Graph 类的 apply 方法来构建。

val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin""student")), (7L, ("jgonzal""postdoc")),(5L, ("franklin""prof")), (2L, ("istoica""prof"))))
val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L7L"collab"), Edge(5L3L"advisor"), Edge(2L5L"colleague"), Edge(5L7L"pi")))
val defaultUser = ("John Doe""Missing")
val graph = Graph(users, relationships) 

def apply[VD: ClassTag, ED: ClassTag](
    vertices: RDD[(VertexId, VD)],
    edges: RDD[Edge[ED]],
    defaultVertexAttr: VD = null.asInstanceOf[VD],
    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

测试代码:

scala> val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin""student")), (7L, ("jgonzal""postdoc")),(5L, ("franklin""prof")), (2L, ("istoica""prof"))))
users: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, (String, String))] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L7L"collab"), Edge(5L3L"advisor"), Edge(2L5L"colleague"), Edge(5L7L"pi")))
relationships: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = ParallelCollectionRDD[1] at parallelize at <console>:26

scala> val defaultUser = ("John Doe""Missing")
defaultUser: (String, String) = (John Doe,Missing)

scala> val graph = Graph(users, relationships)
graph: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@4285b1bd

scala> graph.vertices.collect.foreach(println _)
(5,(franklin,prof))                                                             
(2,(istoica,prof))
(3,(rxin,student))
(7,(jgonzal,postdoc))

scala> graph.edges.collect.foreach(println _)
Edge(3,7,collab)
Edge(5,3,advisor)
Edge(2,5,colleague)
Edge(5,7,pi)

(2)通过 Graph 类提供 fromEdges 方法来构建。注意:对于顶点的属性是使用提供的默认属性。

val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L7L"collab"), Edge(5L3L"advisor"), Edge(2L5L"colleague"), Edge(5L7L"pi")))
val defaultUser = ("aaa""bbb")
val graph2 = Graph.fromEdges(relationships, defaultUser)

def fromEdges[VD: ClassTag, ED: ClassTag](
    edges: RDD[Edge[ED]],
    defaultValue: VD,
    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

测试代码:

scala> val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L7L"collab"), Edge(5L3L"advisor"), Edge(2L5L"colleague"), Edge(5L7L"pi")))
relationships: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = ParallelCollectionRDD[15] at parallelize at <console>:26

scala> val defaultUser = ("aaa""bbb")
defaultUser: (String, String) = (aaa,bbb)

scala> val graph2 = Graph.fromEdges(relationships, defaultUser)
graph2: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@52fb37d7

scala> graph2.vertices.collect.foreach(println _)
(5,(aaa,bbb))
(2,(aaa,bbb))
(3,(aaa,bbb))
(7,(aaa,bbb))

scala> graph2.edges.collect.foreach(println _)
Edge(3,7,collab)
Edge(5,3,advisor)
Edge(2,5,colleague)
Edge(5,7,pi)

(3)通过 Graph 类提供的 fromEdgeTuples 方法来构建。注意:对于顶点的属性是使用提供的默认属性,对于边的属性是相同边的数量。

val relationships: RDD[(VertexId, VertexId)] = sc.parallelize(Array((3L7L), (5L3L), (2L5L), (5L7L)))
val defaultUser = ("haha""heihei")
val graph3 = Graph.fromEdgeTuples[(String, String)](relationships, defaultUser)

def fromEdgeTuples[VD: ClassTag](
    rawEdges: RDD[(VertexId, VertexId)],
    defaultValue: VD,
    uniqueEdges: Option[PartitionStrategy] = None,
    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int]

测试代码:

scala> val relationships: RDD[(VertexId, VertexId)] = sc.parallelize(Array((3L7L), (5L3L), (2L5L), (5L7L)))
relationships: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.VertexId)] = ParallelCollectionRDD[26] at parallelize at <console>:26

scala> val defaultUser = ("haha""heihei")
defaultUser: (String, String) = (haha,heihei)

scala> val graph3 = Graph.fromEdgeTuples[(String, String)](relationships, defaultUser)
graph3: org.apache.spark.graphx.Graph[(String, String),Int] = org.apache.spark.graphx.impl.GraphImpl@5cb7311b

scala> graph3.vertices.collect.foreach(println _)
(5,(haha,heihei))
(2,(haha,heihei))
(3,(haha,heihei))
(7,(haha,heihei))

scala> graph3.edges.collect.foreach(println _)
Edge(3,7,1)     第三个元素“1”表示的是相同边的数量
Edge(5,3,1)
Edge(2,5,1)
Edge(5,7,1)

========== Spark GraphX 图的基本信息转换 ==========
1、graph.numEdges 返回当前图的边的数量
2、graph.numVertices 返回当前图的顶点的数量
3、graph.inDegrees 返回当前图每个顶点入度的数量,返回类型为 VertexRDD[Int]
4、graph.outDegrees 返回当前图每个顶点出度的数量,返回的类型为 VertexRDD[Int]
5、graph.degrees 返回当前图每个顶点入度和出度的和,返回的类型为 VertexRDD[Int]

========== Spark GraphX 图的转换操作 ==========
1、def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2) (implicit eq: VD =:= VD2 = null): Graph[VD2, ED]
  对当前图每一个顶点应用提供的 map 函数来修改顶点的属性,返回一个新的图。
2、def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
  对当前图每一条边应用提供的 map 函数来修改边的属性,返回一个新图。
3、def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  对当前图每一个三元组应用提供的 map 函数来修改边的属性,返回一个新图。

========== Spark GraphX 图的结构操作 ==========
1、def reverse: Graph[VD, ED]
  该操作反转一个图,产生一个新图,新图中的每条边的方向和原图每条边的方向相反。
2、def subgraph(epred: EdgeTriplet[VD, ED] => Boolean = (x => true), vpred: (VertexId, VD) => Boolean = ((v, d) => true)) : Graph[VD, ED]
  该操作返回一个当前图的子图,通过传入 epred 函数来过滤边,通过传入 vpred 函数来过滤顶点,返回满足 epred 函数值为 true 的边和满足 vpred 函数值为 true 顶点组成子图。
3、def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]
  mask 函数用于求一张图和 other 这张图的交集,该交集的判别条件指的是:1、对于顶点,只对比顶点的 ID。2、对于边,只对比边的 srcID、dstID,如果 other 和当前图的交集中的边、顶点的属性不一致,那么 mask 产生的图默认采用当前图的属性。
4、def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  该操作实现将当前图中的两条相同边(边的 srcID 和 dstID 相同)合并。你需要传入一个 merge 函数,用于合并这两边的属性返回一个新的属性。注意:合并两条边的前提是,两条边在一个分区。

========== Spark GraphX 顶点关联操作 ==========
1、def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  该操作通过 mapFunc 函数将 table 中提供的数据更新到相同 VertexId 的属性里。
2、def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, Option[U]) => VD2)(implicit eq: VD =:= VD2 = null): Graph[VD2, ED]
  该操作和 joinVertices 提供了相同的功能,但是,如果 table 中不存在相对应的顶点(也就是不存 VertexId),这个时候 U 默认是 None。

========== Spark GraphX 聚合操作 ==========
1、def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  该操作返回 EdgeDirection 定义的方向中相邻顶点的 ID 和属性的集合。
2、def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  改操作返回 EdgeDirection 定义的方向中相邻顶点的 ID 的集合。
3、def aggregateMessages[A: ClassTag](sendMsg: EdgeContext[VD, ED, A] => Unit,mergeMsg: (A, A) => A,tripletFields: TripletFields = TripletFields.All): VertexRDD[A]
  该函数用于聚合发送到顶点的信息,A 是发送的信息的类型,sendMsg 是每一条边都会自动触发,到底有没有消息能够发送到顶点,使用 EdgeContext 里面的 sendToSrc和sendToDst 来实现。mergeMsg
是每一个顶点都会在接受到所有消息之后调用,主要用于所有接收到的消息的聚合。然后整个函数返回消息的顶点集合 VertexRDD[A]。

以上是关于Spark GraphX企业运用的主要内容,如果未能解决你的问题,请参考以下文章

GraphX 实现K-Core

Spark GraphX图计算代码实现,源码分析

Spark中GraphX图运算pregel详解

大数据技术之_19_Spark学习_05_Spark GraphX 应用解析 + Spark GraphX 概述解析 + 计算模式 + Pregel API + 图算法参考代码 + PageRank(

Apache Spark Graphx - Java 实现

Spark GraphX实例