Spark Project --- Solving Flight Route Problems with GraphX
Posted by Shall潇
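【Prerequisites】
Pregel: a graph computation model and system that Google has described publicly since 2009. It is designed for large-scale graph problems that cannot be computed on a single machine, and is commonly used for problems such as shortest paths. GraphX provides a Pregel-style operator, pregel.
pregel source code (GraphOps.pregel in Spark GraphX):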
/**
* Execute a Pregel-like iterative vertex-parallel abstraction. The
* user-defined vertex-program `vprog` is executed in parallel on
* each vertex receiving any inbound messages and computing a new
* value for the vertex. The `sendMsg` function is then invoked on
* all out-edges and is used to compute an optional message to the
* destination vertex. The `mergeMsg` function is a commutative
* associative function used to combine messages destined to the
* same vertex.
*
* On the first iteration all vertices receive the `initialMsg` and
* on subsequent iterations if a vertex does not receive a message
* then the vertex-program is not invoked.
*
* This function iterates until there are no remaining messages, or
* for `maxIterations` iterations.
*
* @tparam A the Pregel message type
*
* @param initialMsg the message each vertex will receive at the on
* the first iteration
*
* @param maxIterations the maximum number of iterations to run for
*
* @param activeDirection the direction of edges incident to a vertex that received a message in
* the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
* out-edges of vertices that received a message in the previous round will run.
*
* @param vprog the user-defined vertex program which runs on each
* vertex and receives the inbound message and computes a new vertex
* value. On the first iteration the vertex program is invoked on
* all vertices and is passed the default message. On subsequent
* iterations the vertex program is only invoked on those vertices
* that receive messages.
*
* @param sendMsg a user supplied function that is applied to out
* edges of vertices that received messages in the current
* iteration
*
* @param mergeMsg a user supplied function that takes two incoming
* messages of type A and merges them into a single message of type
* A. ''This function must be commutative and associative and
* ideally the size of A should not increase.''
*
* @return the resulting graph at the end of the computation
*
*/
def pregel[A: ClassTag](
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)(
vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
}
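Parameters of the pregel function
- initialMsg: the message every vertex receives in the first iteration; vprog uses it to initialize each vertex's value
- maxIterations: the maximum number of iterations
- activeDirection: which edges of a vertex that received a message in the previous round run sendMsg (for example, EdgeDirection.Out means only its out-edges)
- vprog: the vertex program; a vertex that received a message combines the (merged) message with its own attribute to compute its new attribute
- sendMsg: invoked on the edges of active vertices (those that received a message in the previous round) to send messages to neighboring vertices
- mergeMsg: if a vertex receives several messages, mergeMsg first combines them into a single message; if the vertex receives only one message, this function is not called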
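To make the roles of these parameters concrete, here is a minimal single-machine sketch of the Pregel loop in plain Scala. It is not GraphX's implementation: `MiniPregel`, its `Edge` type and the `run` signature are hypothetical, the loop always sends along out-edges (like `EdgeDirection.Out`), and it assumes every edge endpoint appears in the vertex map.

```scala
// Hypothetical single-machine sketch of Pregel's superstep loop (NOT the GraphX API).
object MiniPregel {
  // Hypothetical edge type: src -> dst carrying an attribute.
  final case class Edge[E](src: Long, dst: Long, attr: E)

  def run[V, E, A](vertices: Map[Long, V], edges: Seq[Edge[E]], initialMsg: A, maxIterations: Int)(
      vprog: (Long, V, A) => V,
      sendMsg: (Edge[E], V, V) => Iterator[(Long, A)], // receives (edge, srcAttr, dstAttr)
      mergeMsg: (A, A) => A
  ): Map[Long, V] = {
    // First round: every vertex runs vprog on initialMsg and is active.
    var state: Map[Long, V] = vertices.map { case (id, v) => id -> vprog(id, v, initialMsg) }
    var active: Set[Long] = state.keySet
    var iteration = 0
    while (active.nonEmpty && iteration < maxIterations) {
      // As with EdgeDirection.Out: only out-edges of active vertices send messages.
      // reduce calls mergeMsg only when a vertex received two or more messages.
      val messages: Map[Long, A] = edges
        .filter(e => active.contains(e.src))
        .flatMap(e => sendMsg(e, state(e.src), state(e.dst)))
        .groupBy(_._1)
        .map { case (id, msgs) => id -> msgs.map(_._2).reduce(mergeMsg) }
      // Only vertices that received a message run vprog and stay active.
      state = state ++ messages.map { case (id, m) => id -> vprog(id, state(id), m) }
      active = messages.keySet
      iteration += 1
    }
    state
  }
}

object MiniPregelDemo {
  import MiniPregel.Edge
  // Hypothetical 3-airport graph; edge attributes are already prices.
  val edges = Seq(Edge(1L, 2L, 100.0), Edge(2L, 3L, 50.0), Edge(1L, 3L, 200.0))
  val init = Map(1L -> 0.0, 2L -> Double.PositiveInfinity, 3L -> Double.PositiveInfinity)
  // Single-source cheapest path, with the same three functions as the article's pregel call.
  val cheapest: Map[Long, Double] =
    MiniPregel.run(init, edges, Double.PositiveInfinity, Int.MaxValue)(
      (_, dist, msg) => math.min(dist, msg),
      (e, srcDist, dstDist) =>
        if (srcDist + e.attr < dstDist) Iterator((e.dst, srcDist + e.attr)) else Iterator.empty,
      (a, b) => math.min(a, b)
    )
}
```

With these hypothetical prices, `MiniPregelDemo.cheapest` ends as 1 → 0.0, 2 → 100.0, 3 → 150.0: vertex 3 first receives 200.0 over the direct edge, then 150.0 via vertex 2, and vprog keeps the minimum.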
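Requirements analysis
- Count the number of airports in the flight network graph
- Count the number of routes in the flight network graph
- Find the longest flight routes
- Find the busiest airport
- Find the most important flight routes
- Find the cheapest flight routes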
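As a point of reference, the first four requirements reduce to simple counts and sorts over the route list. The following is a plain-Scala sketch without Spark; `FlightMetrics`, `Route` and the sample routes are hypothetical, and "busiest" is taken to mean highest total degree, which is one of the two measures the article's code tries.

```scala
// Plain-Scala sketch (no Spark) of requirements 1-4 on an in-memory route list.
object FlightMetrics {
  // Hypothetical route record: origin ID, destination ID, distance.
  final case class Route(src: Long, dst: Long, distance: Int)

  // Hypothetical sample data, for illustration only.
  val sample: Seq[Route] = Seq(
    Route(1, 2, 500), Route(2, 1, 500), Route(1, 3, 1200), Route(3, 2, 800), Route(1, 4, 300))

  // 1. Number of airports = distinct endpoints (like graph.numVertices when vertices come from routes).
  def airportCount(routes: Seq[Route]): Int = routes.flatMap(r => Seq(r.src, r.dst)).distinct.size

  // 2. Number of routes (like graph.numEdges after distinct()).
  def routeCount(routes: Seq[Route]): Int = routes.distinct.size

  // 3. The n longest routes (like sorting graph.triplets by attr, descending).
  def longest(routes: Seq[Route], n: Int): Seq[Route] = routes.sortBy(r => -r.distance).take(n)

  // In-degree: number of routes arriving at each airport (like graph.inDegrees).
  def inDegrees(routes: Seq[Route]): Map[Long, Int] =
    routes.groupBy(_.dst).map { case (id, rs) => id -> rs.size }

  // Total degree: arriving + departing routes (like graph.degrees).
  def degrees(routes: Seq[Route]): Map[Long, Int] =
    routes.flatMap(r => Seq(r.src, r.dst)).groupBy(identity).map { case (id, xs) => id -> xs.size }

  // 4. Busiest airport = highest total degree (ties broken arbitrarily).
  def busiest(routes: Seq[Route]): (Long, Int) = degrees(routes).maxBy(_._2)
}
```

On the sample, airport 1 is busiest with total degree 4 (three departures, one arrival), while airport 2 has the highest in-degree (2), which shows why the choice of degree measure matters.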
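Steps:
Load the CSV into an RDD; each airport becomes a vertex. Key fields: origin airport ID, origin airport, destination airport ID, destination airport, flight distance
Initialize the vertex set airPort: RDD[(VertexId, String)]; the vertex attribute is the airport name
Initialize the edge set lines: RDD[Edge[Int]]; the edge attribute is the flight distance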
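The loading step can be sketched in plain Scala, without Spark, to show how each CSV row yields two vertices and one edge. It assumes the same column indices as the article's code (5, 6, 7, 8, 16); the real layout of flight.csv is not shown, and the header/malformed-row filter and the `sampleRow` helper are hypothetical additions.

```scala
// Plain-Scala sketch (no Spark) of the loading step. Assumption: the column layout used
// by the article's code (5 = origin ID, 6 = origin airport, 7 = destination ID,
// 8 = destination airport, 16 = distance); verify it against your own flight.csv.
object FlightParsing {
  final case class Edge(srcId: Long, dstId: Long, distance: Int)

  // Added here (not in the article): skip a header or malformed rows instead of failing in toLong.
  private def isId(s: String): Boolean = s.nonEmpty && s.forall(_.isDigit)

  def parse(lines: Seq[String]): (Seq[(Long, String)], Seq[Edge]) = {
    val rows = lines.map(_.split(",")).filter(x => x.length > 16 && isId(x(5)) && isId(x(7)))
    // Each row contributes two vertices: its origin and its destination airport.
    val vertices = rows.flatMap(x => Seq((x(5).toLong, x(6)), (x(7).toLong, x(8)))).distinct
    // Edge attribute = flight distance; duplicate routes are removed, as in the article.
    val edges = rows.map(x => Edge(x(5).toLong, x(7).toLong, x(16).toInt)).distinct
    (vertices, edges)
  }

  // Hypothetical helper that builds a 17-column CSV line for testing; other columns are "-".
  def sampleRow(originId: Long, origin: String, destId: Long, dest: String, distance: Int): String = {
    val cols = Array.fill(17)("-")
    cols(5) = originId.toString
    cols(6) = origin
    cols(7) = destId.toString
    cols(8) = dest
    cols(16) = distance.toString
    cols.mkString(",")
  }
}
```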
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
/*
* Graph in GraphX: Graph(Vertex, Edge)
* Vertex: a node (here, an airport)
* Edge: a directed edge (here, a flight route)
* */
object FlightDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("ETL")
val sc = SparkContext.getOrCreate(conf)
val spark = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
val rdd = sc.textFile("in/flight.csv")
// rdd.foreach(println)
//Define the vertices: (airport ID, airport name)
val airPort:RDD[(VertexId,String)] = rdd.map(_.split(","))
.flatMap(x => Array((x(5).toLong, x(6)), (x(7).toLong, x(8)))) //Note: the first element of each pair must be a VertexId, which GraphX defines as an alias for Long
.distinct()
//Define the edges: (origin airport ID, destination airport ID, flight distance)
val lines:RDD[Edge[Int]] = rdd.map(_.split(","))
.map(x => (x(5).toLong, x(7).toLong, x(16).toInt))
.distinct()
.map(x => Edge(x._1, x._2, x._3))
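val graph = Graph(airPort,lines) //Build the graph
// graph.triplets.foreach(println)
// println("Number of airports: "+graph.numVertices) //1. Number of airports
// println("Number of routes: "+graph.numEdges) //2. Number of routes
//3. Longest routes (top 3 by distance)
// graph.triplets.sortBy(_.attr,false).take(3).foreach(println)
//4. Busiest airports
// graph.inDegrees.sortBy(_._2,false).take(1).foreach(println) //in-degree: number of arriving routes
// graph.degrees.sortBy(_._2,false).take(3).foreach(println) //total degree (in + out); use graph.outDegrees for out-degree only
//5. Most important routes
// PageRank with tol = 0.05: iteration continues while ranks still change by more than tol between two iterations
// graph.pageRank(0.05).edges.sortBy(_.attr,false).take(3).foreach(println) //caution: these edge attributes are normalized weights (1 / out-degree of the source), not PageRank scores
//Most important airports
// graph.pageRank(0.05).vertices.sortBy(_._2,false).take(3).foreach(println)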
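//6. Cheapest routes from one airport to all other airports
// price = 180 + distance * 0.15
// Use pregel to solve the single-source shortest (cheapest) path problem
//Pick one random source airport; takeSample always returns one element for a non-empty RDD,
//whereas sample(false, 1.0/count) can return an empty RDD and make first() throw
val (srcAirportID, srcAirportName) = airPort.takeSample(withReplacement = false, num = 1).head
println("Source airport: " + srcAirportName)
//1. Initialize: source vertex 0.0, all other vertices +Infinity; convert edge distances to prices
val initGraph = graph.mapVertices((id, _) => {
if (id == srcAirportID) 0.0 else Double.PositiveInfinity
}).mapEdges(e=>180 + 0.15*e.attr.toDouble)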
val pregGraph = initGraph.pregel(
Double.PositiveInfinity, //initial message
Int.MaxValue, //maximum number of iterations
EdgeDirection.Out //send messages along out-edges
)(
(id: Long, dist: Double, newDist: Double) => math.min(dist, newDist), //vprog: merge the incoming message with the vertex's current price
triple=>{
if(triple.attr+triple.srcAttr<triple.dstAttr) //sendMsg: runs on edges of active vertices; send only if it lowers the destination's price
Iterator((triple.dstId,triple.attr+triple.srcAttr))
else
Iterator.empty
}
,
(dist: Double, newDist: Double) => math.min(dist, newDist) //mergeMsg: combine several messages to one vertex into one; not called if the vertex receives a single message
)
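// pregGraph.triplets.foreach(println)
//The three most expensive reachable destinations (unreachable airports stay at +Infinity, so filter them out)
pregGraph.vertices.filter(_._2 != Double.PositiveInfinity).sortBy(_._2,false).take(3).foreach(println)
//The cheapest destination (excluding the source airport itself, whose price is 0.0)
pregGraph.vertices.filter(_._1 != srcAirportID).sortBy(_._2).take(1).foreach(println)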
}
}
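To sanity-check the pregel result on small data, the same cheapest-fare problem can be solved with Dijkstra's algorithm. This is a different technique from the article's pregel approach; it applies here because every fare is positive. Below is a plain-Scala sketch using the article's price formula; the names `CheapestFares` and `cheapest` and the route data are hypothetical.

```scala
import scala.collection.mutable

// Plain-Scala cross-check of requirement 6 using Dijkstra's algorithm, a swapped-in technique
// (the article uses GraphX pregel). Fares use the article's formula price = 180 + 0.15 * distance.
object CheapestFares {
  def price(distance: Int): Double = 180 + 0.15 * distance

  // routes: (originId, destId, distance). Returns the cheapest total fare from src to every
  // reachable airport (src itself maps to 0.0); unreachable airports are simply absent.
  def cheapest(routes: Seq[(Long, Long, Int)], src: Long): Map[Long, Double] = {
    val adjacency: Map[Long, Seq[(Long, Double)]] =
      routes.groupBy(_._1).map { case (from, rs) => from -> rs.map(r => (r._2, price(r._3))) }
    val best = mutable.Map(src -> 0.0)
    // PriorityQueue dequeues its "largest" element, so define "larger" as "cheaper" to get a min-heap.
    val byCheapest = Ordering.fromLessThan[(Double, Long)]((a, b) => a._1 > b._1)
    val queue = mutable.PriorityQueue((0.0, src))(byCheapest)
    while (queue.nonEmpty) {
      val (cost, airport) = queue.dequeue()
      // Skip stale queue entries superseded by a cheaper fare found later.
      if (cost <= best.getOrElse(airport, Double.PositiveInfinity)) {
        for ((next, fare) <- adjacency.getOrElse(airport, Seq.empty)) {
          val candidate = cost + fare
          if (candidate < best.getOrElse(next, Double.PositiveInfinity)) {
            best(next) = candidate
            queue.enqueue((candidate, next))
          }
        }
      }
    }
    best.toMap
  }
}
```

For hypothetical routes 1→2 (400), 2→3 (200), 1→3 (1000) and 3→4 (100), the fares are 240, 210, 330 and 195, so from airport 1 the result is 2 → 240.0, 3 → 330.0 (the direct flight beats 240 + 210 = 450) and 4 → 525.0. Unlike the GraphX graph, where unreachable airports keep Double.PositiveInfinity, this sketch leaves them out of the map.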