Spark GraphX Graph Computation: Code Implementation and Source Analysis

Posted by yszd

I. Introduction

  Reference: https://www.cnblogs.com/yszd/p/10186556.html

II. Code Implementation

    package big.data.analyse.graphx

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession

    class VertexProperty()
    case class UserProperty(name: String) extends VertexProperty
    case class ProductProperty(name: String, price: Double) extends VertexProperty

    /*class Graph[VD, ED] {
      val vertices: VertexRDD[VD]
      val edges: EdgeRDD[ED]
    }*/

    /**
      * Created by zhen on 2019/10/4.
      */
    object GraphXTest {
      /**
        * Set the log level
        */
      Logger.getLogger("org").setLevel(Level.WARN)

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("GraphXTest").master("local[2]").getOrCreate()
        val sc = spark.sparkContext

        /**
          * Create the RDD of vertices
          */
        val users: RDD[(VertexId, (String, String))] = sc.parallelize(
          Array((3L, ("Spark", "GraphX")), (7L, ("Hadoop", "Java")),
                (5L, ("HBase", "mysql")), (2L, ("Hive", "Mysql"))))

        /**
          * Create the RDD of edges
          */
        val relationships: RDD[Edge[String]] = sc.parallelize(
          Array(Edge(3L, 7L, "Fast"), Edge(5L, 3L, "Relation"),
                Edge(2L, 5L, "colleague"), Edge(5L, 7L, "colleague")))

        /**
          * Define the default user, used for vertex IDs that appear in an edge
          * but are missing from the vertex RDD
          */
        val defaultUser = ("Machical", "Missing")

        /**
          * Build the initial graph
          */
        val graph = Graph(users, relationships, defaultUser)

        /**
          * Use the triplet view to describe the relationships between vertices
          */
        val facts: RDD[String] = graph.triplets.map(triplet =>
          triplet.srcAttr._1 + " is the " + triplet.attr + " with " + triplet.dstAttr._1)
        facts.collect().foreach(println)

        graph.vertices.foreach(println) // vertices
        graph.edges.foreach(println) // edges
        graph.ops.degrees.foreach(println) // degree of each vertex
        graph.triplets.foreach(println) // triplets: vertices, edges, and their relationships
        println(graph.ops.numEdges) // number of edges
        println(graph.ops.numVertices) // number of vertices

        spark.stop()
      }
    }
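The default user passed as the third argument to Graph(...) only matters when an edge refers to a vertex ID that is absent from the vertex RDD. In the listing above every edge endpoint exists, so the default is never used. The following minimal sketch adds a hypothetical edge to vertex 0L (not part of the original data) to show GraphX filling in the default attribute. The object name DefaultVertexDemo and the helper vertexAttrs are illustrative names, not part of the original post.

```scala
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object DefaultVertexDemo {
  // Builds a small graph and returns the attribute of every vertex, collected to the driver.
  def vertexAttrs(spark: SparkSession): Map[VertexId, (String, String)] = {
    val sc = spark.sparkContext
    val users: RDD[(VertexId, (String, String))] = sc.parallelize(Seq(
      (3L, ("Spark", "GraphX")), (7L, ("Hadoop", "Java"))))
    // Hypothetical data: vertex 0L appears in an edge but not in `users`.
    val relationships: RDD[Edge[String]] = sc.parallelize(Seq(
      Edge(3L, 7L, "Fast"), Edge(3L, 0L, "unknown")))
    val defaultUser = ("Machical", "Missing")
    val graph = Graph(users, relationships, defaultUser)
    graph.vertices.collect().toMap
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DefaultVertexDemo").master("local[2]").getOrCreate()
    try {
      // Vertex 0L is expected to carry the default attribute.
      vertexAttrs(spark).toSeq.sortBy(_._1).foreach(println)
    } finally {
      spark.stop()
    }
  }
}
```

In this sketch the graph ends up with three vertices: 3L and 7L keep their own attributes, while 0L receives ("Machical", "Missing").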

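III. Results

  1. Triplet view (facts)

    [image]

  2. Vertices

    [image]

  3. Edges

    [image]

  4. Degree of each vertex

    [image]

  5. Triplets (graph.triplets)

    [image]

  6. Number of edges / vertices

    [image]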
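The quantities printed by the program can be cross-checked without Spark. The sketch below recomputes the fact strings, the vertex degrees, and the two counts from the same vertex and edge lists using plain Scala collections. The object name ExpectedResults is an illustrative assumption, and the code mirrors the GraphX calls rather than reproducing them.

```scala
object ExpectedResults {
  // Vertex ID -> attribute, same data as the GraphX listing.
  val users: Map[Long, (String, String)] = Map(
    (3L, ("Spark", "GraphX")), (7L, ("Hadoop", "Java")),
    (5L, ("HBase", "mysql")), (2L, ("Hive", "Mysql")))

  // (source ID, destination ID, edge attribute)
  val relationships: Seq[(Long, Long, String)] = Seq(
    (3L, 7L, "Fast"), (5L, 3L, "Relation"),
    (2L, 5L, "colleague"), (5L, 7L, "colleague"))

  // Same string that the listing builds from each triplet.
  def facts: Seq[String] = relationships.map { case (src, dst, attr) =>
    users(src)._1 + " is the " + attr + " with " + users(dst)._1
  }

  // Total degree of a vertex = number of edge endpoints that touch it (in + out).
  def degrees: Map[Long, Int] =
    relationships
      .flatMap { case (src, dst, _) => Seq(src, dst) }
      .groupBy(identity)
      .map { case (id, hits) => (id, hits.size) }

  def main(args: Array[String]): Unit = {
    facts.foreach(println)
    degrees.toSeq.sortBy(_._1).foreach(println)
    println(relationships.size) // number of edges
    println(users.size)         // number of vertices
  }
}
```

For this data the facts are "Spark is the Fast with Hadoop", "HBase is the Relation with Spark", "Hive is the colleague with HBase", and "HBase is the colleague with Hadoop". The degrees are 1 for vertex 2, 2 for vertex 3, 3 for vertex 5, and 2 for vertex 7, and there are 4 edges and 4 vertices. When the Spark program calls foreach(println) directly on an RDD, the lines may appear in a different order because partitions are processed in parallel.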

IV. Source Analysis

    class Graph[VD, ED] {
      // Information about the Graph
      val numEdges: Long
      val numVertices: Long
      val inDegrees: VertexRDD[Int]
      val outDegrees: VertexRDD[Int]
      val degrees: VertexRDD[Int]

      // Views of the graph as collections
      val vertices: VertexRDD[VD]
      val edges: EdgeRDD[ED]
      val triplets: RDD[EdgeTriplet[VD, ED]]

      // Functions for caching graphs
      def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] // default storage level is MEMORY_ONLY
      def cache(): Graph[VD, ED]
      def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]

      // Change the partitioning heuristic
      def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]

      // Transform vertex and edge attributes
      def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
      def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
      def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
      def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
      def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2]

      // Modify the graph structure
      def reverse: Graph[VD, ED]
      def subgraph(epred: EdgeTriplet[VD, ED] => Boolean, vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
      def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED] // returns the subgraph common to this graph and the other graph
      def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

      // Join RDDs with the graph
      def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
      def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED]

      // Aggregate information about adjacent triplets
      def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
      def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
      def aggregateMessages[Msg: ClassTag](sendMsg: EdgeContext[VD, ED, Msg] => Unit, mergeMsg: (Msg, Msg) => Msg, tripletFields: TripletFields = TripletFields.All): VertexRDD[Msg]

      // Iterative graph-parallel computation
      def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A): Graph[VD, ED]

      // Basic graph algorithms
      def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
      def connectedComponents(): Graph[VertexId, ED]
      def triangleCount(): Graph[Int, ED]
      def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
    }
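A few of the operators summarized above can be exercised on the same graph used in the code implementation section. The sketch below is illustrative only: the object name OperatorsDemo and its helper methods are assumptions introduced here, and it touches just four operators (mapVertices, subgraph, aggregateMessages, connectedComponents).

```scala
import org.apache.spark.graphx.{Edge, Graph, TripletFields, VertexId}
import org.apache.spark.sql.SparkSession

object OperatorsDemo {
  type UserGraph = Graph[(String, String), String]

  // Rebuilds the graph from the code implementation section.
  def build(spark: SparkSession): UserGraph = {
    val sc = spark.sparkContext
    val users = sc.parallelize(Seq[(VertexId, (String, String))](
      (3L, ("Spark", "GraphX")), (7L, ("Hadoop", "Java")),
      (5L, ("HBase", "mysql")), (2L, ("Hive", "Mysql"))))
    val relationships = sc.parallelize(Seq(
      Edge(3L, 7L, "Fast"), Edge(5L, 3L, "Relation"),
      Edge(2L, 5L, "colleague"), Edge(5L, 7L, "colleague")))
    Graph(users, relationships, ("Machical", "Missing"))
  }

  // mapVertices: keep only the first component of each vertex attribute.
  def names(g: UserGraph): Map[VertexId, String] =
    g.mapVertices[String]((_, attr) => attr._1).vertices.collect().toMap

  // subgraph: keep only edges labelled "colleague"; all vertices are retained.
  def colleagueEdges(g: UserGraph): Long =
    g.subgraph(epred = t => t.attr == "colleague").numEdges

  // aggregateMessages: every edge sends 1 to its destination and the messages
  // are summed, giving in-degrees. Vertices with no incoming edge are absent.
  def inDegrees(g: UserGraph): Map[VertexId, Int] =
    g.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _, TripletFields.None)
      .collect().toMap

  // connectedComponents: label each vertex with the lowest vertex ID
  // in its connected component.
  def components(g: UserGraph): Map[VertexId, VertexId] =
    g.connectedComponents().vertices.collect().toMap

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("OperatorsDemo").master("local[2]").getOrCreate()
    try {
      val g = build(spark).cache()
      println(names(g))
      println(colleagueEdges(g))
      println(inDegrees(g))
      println(components(g))
    } finally {
      spark.stop()
    }
  }
}
```

On this data, two edges carry the "colleague" label, vertex 7 has in-degree 2 while vertices 3 and 5 have in-degree 1, and all four vertices fall into a single component labelled 2. TripletFields.None is passed to aggregateMessages because the send function reads neither vertex attributes nor the edge attribute, which lets GraphX skip shipping them.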

 
