Apache Spark 中的分层数据操作

Posted

技术标签:

【中文标题】Apache Spark 中的分层数据操作【英文标题】:Hierarchical data manipulation in Apache Spark 【发布时间】:2017-12-27 17:10:24 【问题描述】:

我在 Spark (v2.1.1) 中有一个包含分层数据的 3 列(如下所示)的数据集。

我的目标是根据父子层次结构为每一行分配递增编号。从图形上可以说,分层数据是树的集合。 根据下表,我已经根据“Global_ID”对行进行了分组。现在我想在 增量订单,但基于来自的数据层次结构 “父”和“子”列。

表格表示(值是所需的输出):

    +-----------+--------+-------+         +-----------+--------+-------+-------+
    |      Current Dataset       |         |      Desired Dataset (Output)      |
    +-----------+--------+-------+         +-----------+--------+-------+-------+
    | Global_ID | Parent | Child |         | Global_ID | Parent | Child | Value |
    +-----------+--------+-------+         +-----------+--------+-------+-------+
    |       111 |    111 |   123 |         |       111 |    111 |   111 |     1 |
    |       111 |    135 |   246 |         |       111 |    111 |   123 |     2 |
    |       111 |    123 |   456 |         |       111 |    123 |   789 |     3 |
    |       111 |    123 |   789 |         |       111 |    123 |   456 |     4 |
    |       111 |    111 |   111 |         |       111 |    111 |   135 |     5 |
    |       111 |    135 |   468 |         |       111 |    135 |   246 |     6 |
    |       111 |    135 |   268 |         |       111 |    135 |   468 |     7 |
    |       111 |    268 |   321 |         |       111 |    135 |   268 |     8 |
    |       111 |    138 |   139 |         |       111 |    268 |   321 |     9 |
    |       111 |    111 |   135 |         |       111 |    111 |   138 |    10 |
    |       111 |    111 |   138 |         |       111 |    138 |   139 |    11 |
    |       222 |    222 |   654 |         |       222 |    222 |   222 |    12 |
    |       222 |    654 |   721 |         |       222 |    222 |   987 |    13 |
    |       222 |    222 |   222 |         |       222 |    222 |   654 |    14 |
    |       222 |    721 |   127 |         |       222 |    654 |   721 |    15 |
    |       222 |    222 |   987 |         |       222 |    721 |   127 |    16 |
    |       333 |    333 |   398 |         |       333 |    333 |   333 |    17 |
    |       333 |    333 |   498 |         |       333 |    333 |   398 |    18 |
    |       333 |    333 |   333 |         |       333 |    333 |   498 |    19 |
    |       333 |    333 |   598 |         |       333 |    333 |   598 |    20 |
    +-----------+--------+-------+         +-----------+--------+-------+-------+

树表示(期望值在每个节点旁边表示):

                      +-----+                                           +-----+
                   1  | 111 |                                       17  | 333 |
                      +--+--+                                           +--+--+
                         |                                                 |
         +---------------+--------+-----------------+           +----------+----------+
         |                        |                 |           |          |          |
      +--v--+                  +--v--+           +--v--+     +--v--+    +--v--+    +--v--+
   2  | 123 |                5 | 135 |        10 | 138 |     | 398 |    | 498 |    | 598 |
      +--+--+                  +--+--+           +--+--+     +--+--+    +--+--+    +--+--+  
   +-----+-----+         +--------+--------+        |          18         19         20
   |           |         |        |        |        |  
+--v--+     +--v--+   +--v--+  +--v--+  +--v--+  +--v--+ 
| 789 |     | 456 |   | 246 |  | 468 |  | 268 |  | 139 |                 +-----+
+-----+     +-----+   +-----+  +-----+  +--+--+  +-----+             12  | 222 |
   3           4         6        7      8 |        11                   +--+--+
                                        +--v--+                             |
                                        | 321 |                      +------+-------+
                                        +--+--+                      |              |
                                           9                      +--v--+        +--v--+
                                                               13 | 987 |    14  | 654 |
                                                                  +--+--+        +--+--+
                                                                                    |
                                                                                 +--v--+
                                                                             15  | 721 |
                                                                                 +--+--+
                                                                                    |
                                                                                 +--v--+
                                                                             16  | 127 |
                                                                                 +--+--+

代码片段:

Dataset<Row> myDataset = spark
                .sql("select Global_ID, Parent, Child from RECORDS");

JavaPairRDD<Row,Long> finalDataset = myDataset.groupBy(new Column("Global_ID"))
    .agg(functions.sort_array(functions.collect_list(new Column("Parent").as("parent_col"))),
        functions.sort_array(functions.collect_list(new Column("Child").as("child_col"))))
    .orderBy(new Column("Global_ID"))
    .withColumn("vars", functions.explode(<Spark UDF>)
    .select(new Column("vars"),new Column("parent_col"),new Column("child_col"))
    .javaRDD().zipWithIndex();


// Sample UDF (TODO: Actual Implementation)   
spark.udf().register("computeValue",
                (<Column Names>) -> <functionality & implementation>,
                DataTypes.<xxx>);

经过大量研究和博客中的许多建议,我尝试了以下方法,但无济于事。

技术栈:

Apache Spark (v2.1.1)

Java 8

AWS EMR 集群(Spark 应用部署)


数据量:

数据集中大约有 2000 万行

尝试的方法:

    Spark GraphX + GraphFrames:

    使用这种组合,我只能实现顶点和边之间的关系,但它不适合我的用例。 参考:https://graphframes.github.io/user-guide.html

    Spark GraphX Pregel API:

    这是我可以达到的最接近预期结果的方法,但不幸的是我找不到相同的 Java 代码 sn-p。 其中一篇博客中提供的示例是在 Scala 中,但我不是 精通。 参考:https://dzone.com/articles/processing-hierarchical-data-using-spark-graphx-pr

对当前方法中的替代(或)修改的任何建议都会非常有帮助,因为我完全迷失在为这个用例找出解决方案的过程中。

感谢您的帮助!谢谢!

【问题讨论】:

【参考方案1】:

注意:以下解决方案在 scala spark 中。您可以轻松翻译成 java 代码。

看看这个。我尝试使用 Spark Sql 进行操作,您可以了解一下。基本上想法是在聚合和分组它们时对子、父和 globalid 进行排序。一旦按 globalid 分组和排序,就展开其余的。您将获得有序的结果表,稍后您可以zipWithIndex 添加排名(值)

   import org.apache.spark.sql.SQLContext
   import org.apache.spark.sql.functions._
   import org.apache.spark.sql.expressions.UserDefinedFunction
   import org.apache.spark.sql.functions.udf

   val sqlContext = new SQLContext(sc)
   import sqlContext.implicits._

   val t = Seq((111,111,123), (111,111,111), (111,123,789), (111,268,321), (222,222,654), (222,222,222), (222,721,127), (333,333,398), (333,333,333), (333,333,598))
   val ddd = sc.parallelize(t).toDF
   val zip = udf((xs: Seq[Int], ys: Seq[Int]) => xs zip ys)
   val dd1 = ddd
    .groupBy($"_1")
    .agg(sort_array(collect_list($"_2")).as("v"),
         sort_array(collect_list($"_3")).as("w"))
    .orderBy(asc("_1"))
    .withColumn("vars", explode(zip($"v", $"w")))
    .select($"_1", $"vars._1", $"vars._2").rdd.zipWithIndex

  dd1.collect

输出

    res24: Array[(org.apache.spark.sql.Row, Long)] = Array(([111,111,111],0), ([111,111,123],1), ([111,123,321],2),
([111,268,789],3), ([222,222,127],4), ([222,222,222],5), ([222,721,654],6),([333,333,333],7), ([333,333,398],8), ([333,333,598],9))

【讨论】:

这似乎是一个非常可行的解决方案。谢谢你的代码!我肯定会尝试一下,但只是有点担心“收集为列表”,因为我的数据大约有 2000 万行,但它应该很好。如果您能提供相同 Scala 代码的 Java 等价物,那就太好了,因为我是 Scala 的新手。再次感谢! @Sridher 我认为您可以轻松地将其转换为 java 代码。这里要注意的主要是您可以在 java 中复制的 spark 代码。 我将大部分代码转换为其 java 等效代码,但在您使用的 Spark UDF 方面遇到了一些问题。你能帮我解决一下UDF吗?请参阅我编辑的帖子以获取代码 sn-p。 ***.com/questions/39735864/… 这可能会有所帮助 @Sridher 你能做到吗?

以上是关于Apache Spark 中的分层数据操作的主要内容,如果未能解决你的问题,请参考以下文章

Spark-TFRecord:Apache Spark与TensorFlow TFRecord互操作示例

apache spark中的sortbykey

Apache Spark大数据分析入门

Apache Spark:Mllib之决策树的操作(java)

在 Apache Spark SQL 中对多行进行操作

Elasticsearch:Apache spark 大数据集成