有效地将聚合列激发到 Set
Posted
技术标签:
【中文标题】有效地将聚合列激发到 Set【英文标题】:spark aggregating column into Set efficiently 【发布时间】:2017-03-19 15:56:17 【问题描述】:如何在 Spark 中有效地将一列聚合为一个 Set(唯一元素数组)?
case class Foo(a:String, b:String, c:Int, d:Array[String])
val df = Seq(Foo("A", "A", 123, Array("A")),
Foo("A", "A", 123, Array("B")),
Foo("B", "B", 123, Array("C", "A")),
Foo("B", "B", 123, Array("C", "E", "A")),
Foo("B", "B", 123, Array("D"))
).toDS()
会导致
+---+---+---+---------+
| a| b| c| d|
+---+---+---+---------+
| A| A|123| [A]|
| A| A|123| [B]|
| B| B|123| [C, A]|
| B| B|123|[C, E, A]|
| B| B|123| [D]|
+---+---+---+---------+
我正在寻找的是(d 列的顺序并不重要):
+---+---+---+------------+
| a| b| c| d |
+---+---+---+------------+
| A| A|123| [A, B]. |
| B| B|123|[C, A, E, D]|
+---+---+---+------------+
这可能有点类似于How to aggregate values into collection after groupBy? 或来自HighPerformanceSpark
的https://github.com/high-performance-spark/high-performance-spark-examples/blob/57a6267fb77fae5a90109bfd034ae9c18d2edf22/src/main/scala/com/high-performance-spark-examples/transformations/SmartAggregations.scala#L33-L43 的示例
使用以下代码:
import org.apache.spark.sql.functions.udf
val flatten = udf((xs: Seq[Seq[String]]) => xs.flatten.distinct)
val d = flatten(collect_list($"d")).alias("d")
df.groupBy($"a", $"b", $"c").agg(d).show
会产生预期的结果,但我想知道是否有可能使用书中概述的 RDD API 来提高性能。并且想知道如何使用数据集 API 来制定它。
关于这个最小示例的执行细节如下:
== Optimized Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
+- Aggregate [a#45, b#46, c#47], [a#45, b#46, c#47, UDF(collect_list(d#48, 0, 0)) AS d#82]
+- LocalRelation [a#45, b#46, c#47, d#48]
== Physical Plan ==
CollectLimit 21
+- SortAggregate(key=[a#45, b#46, c#47], functions=[collect_list(d#48, 0, 0)], output=[a#45, b#46, c#47, d#82])
+- *Sort [a#45 ASC NULLS FIRST, b#46 ASC NULLS FIRST, c#47 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(a#45, b#46, c#47, 200)
+- LocalTableScan [a#45, b#46, c#47, d#48]
编辑
这个操作的问题概括的很好https://github.com/awesome-spark/spark-gotchas/blob/master/04_rdd_actions_and_transformations_by_example.md#be-smart-about-groupbykey
编辑2
正如您所见,下面建议的 dataSet 查询的 DAG 更复杂,而不是 0.4 似乎需要 2 秒。
【问题讨论】:
【参考方案1】:试试这个
df.groupByKey(foo => (foo.a, foo.b, foo.c)).
reduceGroups
(foo1, foo2) =>
foo1.copy(d = (foo1.d ++ foo2.d).distinct )
.map(_._2)
【讨论】:
但是由于 reduceGroups 中使用的不可变 scala 集合,这不会创建很多不必要的对象吗? 请参阅上面的编辑,您的 SQL DAG 比 datafFame 方法复杂/慢得多。 这个解决方案试图避免洗牌重复的数据,如果在同一个键中没有太多的重复可能不值得这样做 这个解决方案应该比其他解决方案更快,因为它只涉及在 d 字段中包含大量重复记录的大数据集进行改组 我明白了。将在 2morrow 上尝试一些更大的数据。无论如何,您认为使用 reduceByKey 的 RDD 在这里会是一个不错的选择,即更快吗?以上是关于有效地将聚合列激发到 Set的主要内容,如果未能解决你的问题,请参考以下文章
如何有效地将大型 .tsv 文件上传到 pyspark 中具有拆分列的 Hive 表?
pandas 有效地将 DataFrames 与不匹配的分类列和 MultiIndex 级别连接起来