Spark 结合了 DataFrames 和 GraphX
Posted
技术标签:
【中文标题】Spark 结合了 DataFrames 和 GraphX【英文标题】:Spark combine DataFrames and GraphX 【发布时间】:2016-09-19 00:10:09 【问题描述】:是否可以将 GraphX 和 DataFrames 结合起来?我希望图中的每个节点都有一个自己的 DataFrame。我知道 GraphX 和 DataFrame 扩展了 RDD 并且嵌套的 RDD 是不可能的,并且 SparkContext 不是可序列化的。但在 Spark 2.0.0 中,我看到 SparkSession 是可序列化的。我已经尝试过了,但它仍然无法正常工作。 我还尝试将 DataFrames 全局存储在一个数组中。但我无法访问工作节点中的数组。忽略 sendMsg 和 merge 方法:
object Main
def main(args: Array[String]) : Unit =
val spark = SparkSession
.builder
.appName("ScalaGraphX_SQL")
.master("spark://home:7077")
.enableHiveSupport()
.getOrCreate()
val sc = spark.sparkContext
val node_pair : RDD[(Array[String],Long)] = sc.textFile(args(0)).map(l=>l.split(" ")).zipWithIndex()
//set array size
Tables.tables = new Array[Dataset[Row]](node_pair.count().toInt)
//insert dataframe inside array tables
node_pair.collect().foreach case (arr,l) =>
val fields = arr.takeRight(arr.length-2).map(fieldName => StructField(fieldName, BooleanType, nullable = true))
val schema = StructType(fields)
val rows = new util.ArrayList[Row]
Tables.tablesl.toInt = spark.createDataFrame(rows, schema)
//val f =
//create vertices
val vertices : RDD[(VertexId,TreeNode)]= node_pair.map case (arr,l) =>
(l,new TreeNode(l,false))
//create edges
val edges : RDD[Edge[Boolean]] = node_pair
.filter case (arr,l) => arr(0).toLong != -1
.map case (arr,l) => Edge(l,arr(0).toLong,true)
var init_node : TreeNode = new TreeNode(-1,false)
val graph = Graph(vertices,edges,init_node)
val graph_pregel = Pregel(graph,init_node,Int.MaxValue,EdgeDirection.Out)(vProg,sendMsg,merge)
graph_pregel.vertices.collect().foreach(v => println(v._2.index))
def vProg(id:VertexId, act: TreeNode, other: TreeNode): TreeNode =
println(Tables.tablesact.index.toInt)
act
def sendMsg(et : EdgeTriplet[TreeNode,Boolean]) : Iterator[(VertexId, TreeNode)] =
if(et.srcAttr.v)
println(et.srcId + "--->" + et.dstId)
Iterator((et.dstId,et.srcAttr))
else
//println(et.srcId + "-/->" + et.dstId)
Iterator.empty
def merge(n1:TreeNode, n2:TreeNode): TreeNode =
n1
object Tables extends Serializable
var tables : scala.Array[Dataset[Row]] = null
class TreeNode(val index:Long, var v: Boolean) extends Serializable
也许有可能使用 RDD 访问全局数组?或者有人对此问题有其他解决方案?
【问题讨论】:
问题不是,也从来不是序列化。不可序列化在这里只是一个提示,它指出了 Spark 架构不适合嵌套处理而不显着限制编程模型的主要问题。所以仅仅因为你可以序列化SparkSession
(你可以在1.x中以同样的方式序列化SQLContext
)并不意味着任何改变。
【参考方案1】:
请查看GraphFrames - 这是一个为 GraphX 提供 DataFrame API 的包。一旦 GraphFrames 提供了在 GraphX 中很重要的分区等功能,并且当对 API 进行了更详尽的测试时,就会考虑将其包含在 Spark 中。
对于下面评论中描述的问题,您有一个带有节点的 DataFrame,即 Airports:
val airports = sqlContext.createDataFrame(List(
("A1", "Wrocław"),
("A2", "London"),
("A3", "NYC")
)).toDF("id", "name")
ID 是唯一的。您可以创建其他 DataFrame,即 detailsDF,其结构如下:
ID | AirPortID | other data
。然后你有一对多和一个机场(所以 GraphFrame verticle)你有很多 detailsDF 条目。现在可以查询:
spark.sql("select a.name, d.id as detailID from airports a join detailsDF d on a.id = d.airportID");
。如果您想在其中存储其他信息,您还可以在 Airports DataFrame 中包含许多列
【讨论】:
谢谢,但不是 GraphFrames,Graphs 被构造为 DataFrames 吗?我需要一个在节点内带有 DataFrames 的图表。它就像每个节点的表。还是我误解了 GraphFrames? 是和不是 :) 在 GraphFrames 中,每个节点都有一个表(DataFrame)。然而,这个节点可以有一些 ID,然后可以有另一个 DataFrame,即 NodeDetails,它将有列“baseNodeId”。那么一个节点可以有很多行 对不起,我不明白。那不是嵌套的DataFrame吗?你能给我一个简短的例子吗?非常感谢! @VitaliD。我已经更新了答案-如果现在清楚了,请告诉我:)以上是关于Spark 结合了 DataFrames 和 GraphX的主要内容,如果未能解决你的问题,请参考以下文章
大数据(spark sql 和 spark dataframes 连接)
Spark PairRDDs 和 DataFrames 是不是被索引?
在 Spark RDD 和/或 Spark DataFrames 中重塑/透视数据
在 Spark RDD 和/或 Spark DataFrames 中重塑/透视数据