有效地将聚合列激发到 Set

Posted

技术标签:

【中文标题】有效地将聚合列激发到 Set【英文标题】:spark aggregating column into Set efficiently 【发布时间】:2017-03-19 15:56:17 【问题描述】:

如何在 Spark 中有效地将一列聚合为一个 Set(唯一元素数组)?

case class Foo(a:String, b:String, c:Int, d:Array[String])

  val df = Seq(Foo("A", "A", 123, Array("A")),
    Foo("A", "A", 123, Array("B")),
    Foo("B", "B", 123, Array("C", "A")),
    Foo("B", "B", 123, Array("C", "E", "A")),
    Foo("B", "B", 123, Array("D"))
  ).toDS()

会导致

+---+---+---+---------+
|  a|  b|  c|        d|
+---+---+---+---------+
|  A|  A|123|      [A]|
|  A|  A|123|      [B]|
|  B|  B|123|   [C, A]|
|  B|  B|123|[C, E, A]|
|  B|  B|123|      [D]|
+---+---+---+---------+

我正在寻找的是(d 列的顺序并不重要):

+---+---+---+------------+
|  a|  b|  c|        d  |
+---+---+---+------------+
|  A|  A|123|   [A, B].  |
|  B|  B|123|[C, A, E, D]|
+---+---+---+------------+

这可能有点类似于How to aggregate values into collection after groupBy? 或来自HighPerformanceSpark 的https://github.com/high-performance-spark/high-performance-spark-examples/blob/57a6267fb77fae5a90109bfd034ae9c18d2edf22/src/main/scala/com/high-performance-spark-examples/transformations/SmartAggregations.scala#L33-L43 的示例

使用以下代码:

import org.apache.spark.sql.functions.udf
val flatten = udf((xs: Seq[Seq[String]]) => xs.flatten.distinct)
val d = flatten(collect_list($"d")).alias("d")
df.groupBy($"a", $"b", $"c").agg(d).show

会产生预期的结果,但我想知道是否有可能使用书中概述的 RDD API 来提高性能。并且想知道如何使用数据集 API 来制定它。

关于这个最小示例的执行细节如下:

== Optimized Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
   +- Aggregate [a#45, b#46, c#47], [a#45, b#46, c#47, UDF(collect_list(d#48, 0, 0)) AS d#82]
      +- LocalRelation [a#45, b#46, c#47, d#48]

== Physical Plan ==
CollectLimit 21
+- SortAggregate(key=[a#45, b#46, c#47], functions=[collect_list(d#48, 0, 0)], output=[a#45, b#46, c#47, d#82])
   +- *Sort [a#45 ASC NULLS FIRST, b#46 ASC NULLS FIRST, c#47 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#45, b#46, c#47, 200)
         +- LocalTableScan [a#45, b#46, c#47, d#48]

编辑

这个操作的问题概括的很好https://github.com/awesome-spark/spark-gotchas/blob/master/04_rdd_actions_and_transformations_by_example.md#be-smart-about-groupbykey

编辑2

正如您所见,下面建议的 dataSet 查询的 DAG 更复杂,而不是 0.4 似乎需要 2 秒。

【问题讨论】:

【参考方案1】:

试试这个

df.groupByKey(foo => (foo.a, foo.b, foo.c)).
  reduceGroups
     (foo1, foo2) => 
        foo1.copy(d = (foo1.d ++ foo2.d).distinct )
  .map(_._2)

【讨论】:

但是由于 reduceGroups 中使用的不可变 scala 集合,这不会创建很多不必要的对象吗? 请参阅上面的编辑,您的 SQL DAG 比 datafFame 方法复杂/慢得多。 这个解决方案试图避免洗牌重复的数据,如果在同一个键中没有太多的重复可能不值得这样做 这个解决方案应该比其他解决方案更快,因为它只涉及在 d 字段中包含大量重复记录的大数据集进行改组 我明白了。将在 2morrow 上尝试一些更大的数据。无论如何,您认为使用 reduceByKey 的 RDD 在这里会是一个不错的选择,即更快吗?

以上是关于有效地将聚合列激发到 Set的主要内容,如果未能解决你的问题,请参考以下文章

如何有效地将稀疏矩阵列添加到另一个稀疏矩阵中的每一列?

如何有效地将大型 .tsv 文件上传到 pyspark 中具有拆分列的 Hive 表?

将 SHAP 值聚合到特征集是不是有效?

pandas 有效地将 DataFrames 与不匹配的分类列和 MultiIndex 级别连接起来

如何有效地将大文件加载到 IndexedDB 存储中?我的应用程序在超过 100,000 行时崩溃

有效地将给定令牌与数据库中的散列,盐渍令牌进行比较