Spark Project --- Solving Airline Flight Problems with GraphX

Posted by Shall潇


【Prerequisites】

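Pregel: a graph computation model and system that Google began describing publicly in 2009. It targets large-scale graph problems that cannot be computed on a single machine, and it is often used for problems such as shortest paths.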

Source of the pregel operator (from GraphX's GraphOps)

/**
   * Execute a Pregel-like iterative vertex-parallel abstraction.  The
   * user-defined vertex-program `vprog` is executed in parallel on
   * each vertex receiving any inbound messages and computing a new
   * value for the vertex.  The `sendMsg` function is then invoked on
   * all out-edges and is used to compute an optional message to the
   * destination vertex. The `mergeMsg` function is a commutative
   * associative function used to combine messages destined to the
   * same vertex.
   *
   * On the first iteration all vertices receive the `initialMsg` and
   * on subsequent iterations if a vertex does not receive a message
   * then the vertex-program is not invoked.
   *
   * This function iterates until there are no remaining messages, or
   * for `maxIterations` iterations.
   *
   * @tparam A the Pregel message type
   *
   * @param initialMsg the message each vertex will receive at the on
   * the first iteration
   *
   * @param maxIterations the maximum number of iterations to run for
   *
   * @param activeDirection the direction of edges incident to a vertex that received a message in
   * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
   * out-edges of vertices that received a message in the previous round will run.
   *
   * @param vprog the user-defined vertex program which runs on each
   * vertex and receives the inbound message and computes a new vertex
   * value.  On the first iteration the vertex program is invoked on
   * all vertices and is passed the default message.  On subsequent
   * iterations the vertex program is only invoked on those vertices
   * that receive messages.
   *
   * @param sendMsg a user supplied function that is applied to out
   * edges of vertices that received messages in the current
   * iteration
   *
   * @param mergeMsg a user supplied function that takes two incoming
   * messages of type A and merges them into a single message of type
   * A.  ''This function must be commutative and associative and
   * ideally the size of A should not increase.''
   *
   * @return the resulting graph at the end of the computation
   *
   */
  def pregel[A: ClassTag](
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
  }

Parameters of the pregel function

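  • initialMsg: the message every vertex receives in the first iteration; it is used to initialize the vertices
  • maxIterations: the maximum number of iterations
  • activeDirection: the direction of the edges, relative to vertices that received a message in the previous round, on which sendMsg runs
  • vprog: the vertex program; a vertex calls it to merge the already aggregated incoming message into its own attribute
  • sendMsg: called for active vertices to send messages to their neighbors
  • mergeMsg: if a vertex receives several messages, mergeMsg first combines them into a single message; if the vertex receives only one message, this function is not called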
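
To show how these parameters interact, here is a minimal single-machine sketch of the Pregel loop in plain Scala. It is not GraphX's implementation: `MiniPregel`, its `Edge` case class, `run`, and `shortestPaths` are hypothetical names invented for this illustration, and the sketch only models the `EdgeDirection.Out` behavior (messages travel along out-edges of vertices that received a message in the previous round).

```scala
// Illustrative sketch only: MiniPregel, Edge, run and shortestPaths are
// hypothetical names, not part of GraphX.
object MiniPregel {
  final case class Edge(src: Long, dst: Long, attr: Double)

  def run(
      vertices: Map[Long, Double],
      edges: Seq[Edge],
      initialMsg: Double,
      maxIterations: Int = Int.MaxValue)(
      vprog: (Long, Double, Double) => Double,
      sendMsg: (Edge, Double, Double) => Iterator[(Long, Double)],
      mergeMsg: (Double, Double) => Double): Map[Long, Double] = {
    // First iteration: every vertex receives initialMsg and runs vprog.
    var state = vertices.map { case (id, attr) => id -> vprog(id, attr, initialMsg) }
    var active: Set[Long] = state.keySet
    var i = 0
    while (active.nonEmpty && i < maxIterations) {
      // sendMsg runs on the out-edges of vertices that were active last round.
      val sent = edges.iterator
        .filter(e => active.contains(e.src))
        .flatMap(e => sendMsg(e, state(e.src), state(e.dst)))
        .toSeq
      // mergeMsg combines all messages addressed to the same vertex.
      val inbox = sent.groupBy(_._1).map { case (id, msgs) =>
        id -> msgs.map(_._2).reduce(mergeMsg)
      }
      // vprog runs only on vertices that actually received a message.
      state = state ++ inbox.map { case (id, msg) => id -> vprog(id, state(id), msg) }
      active = inbox.keySet
      i += 1
    }
    state
  }

  // Single-source shortest paths expressed with the three user functions.
  def shortestPaths(source: Long, ids: Set[Long], edges: Seq[Edge]): Map[Long, Double] = {
    val init = ids.map(id => id -> (if (id == source) 0.0 else Double.PositiveInfinity)).toMap
    run(init, edges, Double.PositiveInfinity)(
      (_, dist, msg) => math.min(dist, msg),
      (e, srcDist, dstDist) =>
        if (srcDist + e.attr < dstDist) Iterator((e.dst, srcDist + e.attr))
        else Iterator.empty,
      (a, b) => math.min(a, b)
    )
  }
}
```

The loop stops as soon as no vertex receives a message, which is why `maxIterations` can safely stay at `Int.MaxValue` for shortest paths on a graph with positive edge weights.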

Requirements analysis

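  1. Count the airports in the flight network graph
  2. Count the routes in the flight network graph
  3. Find the longest flight route
  4. Find the busiest airport
  5. Find the most important flight routes
  6. Find the cheapest flight routes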
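
As a rough illustration of what the first four requirements compute, the sketch below answers them on a tiny hand-made route list using plain Scala collections instead of GraphX. The `Route` case class, the `RequirementsSketch` object, and the sample data are all made up for this example; "busiest" is interpreted here as highest in-degree, which is one of the two measures the GraphX code in this post tries.

```scala
// Illustrative sketch only: Route, RequirementsSketch and the sample
// data are hypothetical and do not come from flight.csv.
object RequirementsSketch {
  final case class Route(src: Long, dst: Long, distance: Int)

  val routes: Seq[Route] = Seq(
    Route(1L, 2L, 500),
    Route(2L, 1L, 500),
    Route(1L, 3L, 1200),
    Route(3L, 2L, 800)
  )

  // 1. Airports are the distinct endpoints of all routes.
  def airportCount(rs: Seq[Route]): Int =
    rs.flatMap(r => Seq(r.src, r.dst)).distinct.size

  // 2. Routes are the distinct directed edges.
  def routeCount(rs: Seq[Route]): Int = rs.distinct.size

  // 3. The longest route is the edge with the largest distance attribute.
  def longest(rs: Seq[Route]): Route = rs.maxBy(_.distance)

  // 4. In-degree per airport: the number of routes arriving there.
  def inDegrees(rs: Seq[Route]): Map[Long, Int] =
    rs.groupBy(_.dst).map { case (id, in) => id -> in.size }

  def busiest(rs: Seq[Route]): (Long, Int) = inDegrees(rs).maxBy(_._2)
}
```

In GraphX the same questions map onto `numVertices`, `numEdges`, a sort over `triplets`, and `inDegrees`, as the full program further down shows.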

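Steps:
Load the CSV into an RDD, treating each airport as a vertex. Key fields: origin airport ID, origin airport, destination airport ID, destination airport, flight distance
Initialize the vertex set airPort: RDD[(VertexId, String)]; the vertex attribute is the airport name
Initialize the edge set lines: RDD[Edge]; the edge attribute is the flight distance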

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

/*
 * Graph: GraphX Graph(vertices, edges)
 * Vertex: a node (here, an airport)
 * Edge: a directed edge (here, a flight route)
 */

object FlightDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ETL")
    val sc = SparkContext.getOrCreate(conf)
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    val rdd = sc.textFile("in/flight.csv")
//    rdd.foreach(println)

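    // Define the vertices: (airport ID, airport name)
    val airPort: RDD[(VertexId, String)] = rdd.map(_.split(","))
      // The first element of each pair must be a VertexId, which GraphX defines as an alias for Long
      .flatMap(x => Array((x(5).toLong, x(6)), (x(7).toLong, x(8))))
      .distinct()

    // Define the edges: (origin airport ID, destination airport ID, flight distance)
    // Edge[Int] replaces Edge[PartitionID]; PartitionID is just an alias for Int and was misleading here
    val lines: RDD[Edge[Int]] = rdd.map(_.split(","))
      .map(x => (x(5).toLong, x(7).toLong, x(16).toInt))
      .distinct()
      .map(x => Edge(x._1, x._2, x._3))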

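    val graph = Graph(airPort, lines)          // Build the graph
//    graph.triplets.foreach(println)

//    println("Number of airports: " + graph.numVertices)   // 1. Number of airports
//    println("Number of routes: " + graph.numEdges)        // 2. Number of routes

    // 3. Longest routes by distance
//    graph.triplets.sortBy(_.attr,false).take(3).foreach(println)

    // 4. Busiest airports
//    graph.inDegrees.sortBy(_._2,false).take(1).foreach(println)  // in-degree (arriving routes)

//    graph.degrees.sortBy(_._2,false).take(3).foreach(println)    // total degree (in + out); use graph.outDegrees for out-degree only

    // 5. Most important flight routes
    // pageRank function
//    graph.pageRank(0.05).edges.sortBy(_.attr,false).take(3).foreach(println)  // tolerance: iteration continues while the change between two iterations exceeds this value

    // Most important airports
//    graph.pageRank(0.05).vertices.sortBy(_._2,false).take(3).foreach(println)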

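    // 6. Cheapest routes from one airport to every other airport

    // price = 180 + distance * 0.15
    // Use pregel to solve this as a shortest-path problem

    // Pick one random airport as the source. takeSample(false, 1) returns exactly one element
    // for a non-empty RDD, whereas sample(false, 1.0 / count) can return an empty RDD and make first() fail.
    val (srcAirportID, srcAirportName) = airPort.takeSample(false, 1).head

    // 1. Initialize: the source vertex costs 0.0, all others infinity; convert edge distance to price
    val initGraph = graph.mapVertices((id, _) => {
      if (id == srcAirportID) 0.0 else Double.PositiveInfinity
    }).mapEdges(e => 180 + 0.15 * e.attr.toDouble)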


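    val pregGraph = initGraph.pregel(
      Double.PositiveInfinity,      // initial message sent to every vertex
      Int.MaxValue,                 // maximum number of iterations
      EdgeDirection.Out             // run sendMsg on out-edges of vertices that received a message
    )(
      // vprog: merge the incoming message into the vertex's current cost
      (id: VertexId, dist: Double, newDist: Double) => math.min(dist, newDist),
      // sendMsg: an active vertex offers its neighbor a cheaper cost, if it has one
      triple => {
        if (triple.attr + triple.srcAttr < triple.dstAttr)
          Iterator((triple.dstId, triple.attr + triple.srcAttr))
        else
          Iterator.empty
      },
      // mergeMsg: combine multiple messages for one vertex into a single message;
      // not called when the vertex receives only one message
      (dist: Double, newDist: Double) => math.min(dist, newDist)
    )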
//    pregGraph.triplets.foreach(println)

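    // Drop the source itself (cost 0.0) and unreachable airports (cost infinity),
    // which would otherwise dominate both rankings
    val reachable = pregGraph.vertices.filter { case (id, cost) => id != srcAirportID && !cost.isPosInfinity }
    // The three most expensive destinations
    reachable.sortBy(_._2, false).take(3).foreach(println)
    // The cheapest destination
    reachable.sortBy(_._2).take(1).foreach(println)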
  }
}
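
The fare formula in the code comment, price = 180 + distance * 0.15, is applied per edge, so every extra leg adds another fixed 180. A small worked sketch makes the consequence visible: the cheapest route is not necessarily the one with the shortest total distance. `FareSketch`, `legPrice`, and `routePrice` are hypothetical helper names, and the distances are made-up numbers in whatever unit the CSV uses.

```scala
// Illustrative sketch only: FareSketch and its methods are hypothetical helpers.
object FareSketch {
  // Price of a single leg, using the formula from the post: 180 + distance * 0.15
  def legPrice(distance: Int): Double = 180 + 0.15 * distance

  // Price of a multi-leg route: each leg is priced independently and summed,
  // which is exactly what summing edge attributes along a path does in pregel.
  def routePrice(legDistances: Seq[Int]): Double = legDistances.map(legPrice).sum
}
```

With these definitions a direct leg of 1000 costs about 330, while two legs of 400 each cost about 480, even though they cover only 800 in total. This is why the program converts distances to prices with `mapEdges` before running the shortest-path computation instead of minimizing distance directly.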
