Apache Spark 中的分层数据操作
Posted
技术标签:
【中文标题】Apache Spark 中的分层数据操作【英文标题】:Hierarchical data manipulation in Apache Spark 【发布时间】:2017-12-27 17:10:24 【问题描述】:我在 Spark (v2.1.1) 中有一个包含分层数据的 3 列(如下所示)的数据集。
我的目标是根据父子层次结构为每一行分配递增编号。从图形上可以说,分层数据是树的集合。 根据下表,我已经根据“Global_ID”对行进行了分组。现在我想在 增量订单,但基于来自的数据层次结构 “父”和“子”列。
表格表示(值是所需的输出):
+-----------+--------+-------+ +-----------+--------+-------+-------+
| Current Dataset | | Desired Dataset (Output) |
+-----------+--------+-------+ +-----------+--------+-------+-------+
| Global_ID | Parent | Child | | Global_ID | Parent | Child | Value |
+-----------+--------+-------+ +-----------+--------+-------+-------+
| 111 | 111 | 123 | | 111 | 111 | 111 | 1 |
| 111 | 135 | 246 | | 111 | 111 | 123 | 2 |
| 111 | 123 | 456 | | 111 | 123 | 789 | 3 |
| 111 | 123 | 789 | | 111 | 123 | 456 | 4 |
| 111 | 111 | 111 | | 111 | 111 | 135 | 5 |
| 111 | 135 | 468 | | 111 | 135 | 246 | 6 |
| 111 | 135 | 268 | | 111 | 135 | 468 | 7 |
| 111 | 268 | 321 | | 111 | 135 | 268 | 8 |
| 111 | 138 | 139 | | 111 | 268 | 321 | 9 |
| 111 | 111 | 135 | | 111 | 111 | 138 | 10 |
| 111 | 111 | 138 | | 111 | 138 | 139 | 11 |
| 222 | 222 | 654 | | 222 | 222 | 222 | 12 |
| 222 | 654 | 721 | | 222 | 222 | 987 | 13 |
| 222 | 222 | 222 | | 222 | 222 | 654 | 14 |
| 222 | 721 | 127 | | 222 | 654 | 721 | 15 |
| 222 | 222 | 987 | | 222 | 721 | 127 | 16 |
| 333 | 333 | 398 | | 333 | 333 | 333 | 17 |
| 333 | 333 | 498 | | 333 | 333 | 398 | 18 |
| 333 | 333 | 333 | | 333 | 333 | 498 | 19 |
| 333 | 333 | 598 | | 333 | 333 | 598 | 20 |
+-----------+--------+-------+ +-----------+--------+-------+-------+
树表示(期望值在每个节点旁边表示):
+-----+ +-----+
1 | 111 | 17 | 333 |
+--+--+ +--+--+
| |
+---------------+--------+-----------------+ +----------+----------+
| | | | | |
+--v--+ +--v--+ +--v--+ +--v--+ +--v--+ +--v--+
2 | 123 | 5 | 135 | 10 | 138 | | 398 | | 498 | | 598 |
+--+--+ +--+--+ +--+--+ +--+--+ +--+--+ +--+--+
+-----+-----+ +--------+--------+ | 18 19 20
| | | | | |
+--v--+ +--v--+ +--v--+ +--v--+ +--v--+ +--v--+
| 789 | | 456 | | 246 | | 468 | | 268 | | 139 | +-----+
+-----+ +-----+ +-----+ +-----+ +--+--+ +-----+ 12 | 222 |
3 4 6 7 8 | 11 +--+--+
+--v--+ |
| 321 | +------+-------+
+--+--+ | |
9 +--v--+ +--v--+
13 | 987 | 14 | 654 |
+--+--+ +--+--+
|
+--v--+
15 | 721 |
+--+--+
|
+--v--+
16 | 127 |
+--+--+
代码片段:
Dataset<Row> myDataset = spark
.sql("select Global_ID, Parent, Child from RECORDS");
JavaPairRDD<Row,Long> finalDataset = myDataset.groupBy(new Column("Global_ID"))
.agg(functions.sort_array(functions.collect_list(new Column("Parent").as("parent_col"))),
functions.sort_array(functions.collect_list(new Column("Child").as("child_col"))))
.orderBy(new Column("Global_ID"))
.withColumn("vars", functions.explode(<Spark UDF>)
.select(new Column("vars"),new Column("parent_col"),new Column("child_col"))
.javaRDD().zipWithIndex();
// Sample UDF (TODO: Actual Implementation)
spark.udf().register("computeValue",
(<Column Names>) -> <functionality & implementation>,
DataTypes.<xxx>);
经过大量研究和博客中的许多建议,我尝试了以下方法,但无济于事。
技术栈:
Apache Spark (v2.1.1)
Java 8
AWS EMR 集群(Spark 应用部署)
数据量:
数据集中大约有 2000 万行
尝试的方法:
Spark GraphX + GraphFrames:
使用这种组合,我只能实现顶点和边之间的关系,但它不适合我的用例。 参考:https://graphframes.github.io/user-guide.htmlSpark GraphX Pregel API:
这是我可以达到的最接近预期结果的方法,但不幸的是我找不到相同的 Java 代码 sn-p。 其中一篇博客中提供的示例是在 Scala 中,但我不是 精通。 参考:https://dzone.com/articles/processing-hierarchical-data-using-spark-graphx-pr
对当前方法中的替代(或)修改的任何建议都会非常有帮助,因为我完全迷失在为这个用例找出解决方案的过程中。
感谢您的帮助!谢谢!
【问题讨论】:
【参考方案1】:注意:以下解决方案在 scala spark 中。您可以轻松翻译成 java 代码。
看看这个。我尝试使用 Spark Sql 进行操作,您可以了解一下。基本上想法是在聚合和分组它们时对子、父和 globalid 进行排序。一旦按 globalid 分组和排序,就展开其余的。您将获得有序的结果表,稍后您可以zipWithIndex
添加排名(值)
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val t = Seq((111,111,123), (111,111,111), (111,123,789), (111,268,321), (222,222,654), (222,222,222), (222,721,127), (333,333,398), (333,333,333), (333,333,598))
val ddd = sc.parallelize(t).toDF
val zip = udf((xs: Seq[Int], ys: Seq[Int]) => xs zip ys)
val dd1 = ddd
.groupBy($"_1")
.agg(sort_array(collect_list($"_2")).as("v"),
sort_array(collect_list($"_3")).as("w"))
.orderBy(asc("_1"))
.withColumn("vars", explode(zip($"v", $"w")))
.select($"_1", $"vars._1", $"vars._2").rdd.zipWithIndex
dd1.collect
输出
res24: Array[(org.apache.spark.sql.Row, Long)] = Array(([111,111,111],0), ([111,111,123],1), ([111,123,321],2),
([111,268,789],3), ([222,222,127],4), ([222,222,222],5), ([222,721,654],6),([333,333,333],7), ([333,333,398],8), ([333,333,598],9))
【讨论】:
这似乎是一个非常可行的解决方案。谢谢你的代码!我肯定会尝试一下,但只是有点担心“收集为列表”,因为我的数据大约有 2000 万行,但它应该很好。如果您能提供相同 Scala 代码的 Java 等价物,那就太好了,因为我是 Scala 的新手。再次感谢! @Sridher 我认为您可以轻松地将其转换为 java 代码。这里要注意的主要是您可以在 java 中复制的 spark 代码。 我将大部分代码转换为其 java 等效代码,但在您使用的 Spark UDF 方面遇到了一些问题。你能帮我解决一下UDF吗?请参阅我编辑的帖子以获取代码 sn-p。 ***.com/questions/39735864/… 这可能会有所帮助 @Sridher 你能做到吗?以上是关于Apache Spark 中的分层数据操作的主要内容,如果未能解决你的问题,请参考以下文章
Spark-TFRecord:Apache Spark与TensorFlow TFRecord互操作示例