Spark Window 聚合与 Group By/Join 性能
Posted
技术标签:
【中文标题】Spark Window 聚合与 Group By/Join 性能【英文标题】:Spark Window aggregation vs. Group By/Join performance 【发布时间】:2020-06-17 13:44:48 【问题描述】:与 group by/join 相比,我对在窗口上运行聚合函数的性能特征感兴趣。在这种情况下,我对具有自定义框架边界或排序的窗口函数不感兴趣,而只是作为运行聚合函数的一种方式。
请注意,我只对大量数据的批处理(非流式传输)性能感兴趣,因此我禁用了以下广播连接。
例如,假设我们从以下 DataFrame 开始:
val df = Seq(("bob", 10), ("sally", 32), ("mike", 9), ("bob", 18)).toDF("name", "age")
df.show(false)
+-----+---+
|name |age|
+-----+---+
|bob |10 |
|sally|32 |
|mike |9 |
|bob |18 |
+-----+---+
假设我们要计算每个名称出现的次数,然后在具有匹配名称的行上提供该计数。
分组/加入
val joinResult = df.join(
df.groupBy($"name").count,
Seq("name"),
"inner"
)
joinResult.show(false)
+-----+---+-----+
|name |age|count|
+-----+---+-----+
|sally|32 |1 |
|mike |9 |1 |
|bob |18 |2 |
|bob |10 |2 |
+-----+---+-----+
joinResult.explain
== Physical Plan ==
*(4) Project [name#5, age#6, count#12L]
+- *(4) SortMergeJoin [name#5], [name#15], Inner
:- *(1) Sort [name#5 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#5, 200)
: +- LocalTableScan [name#5, age#6]
+- *(3) Sort [name#15 ASC NULLS FIRST], false, 0
+- *(3) HashAggregate(keys=[name#15], functions=[count(1)])
+- Exchange hashpartitioning(name#15, 200)
+- *(2) HashAggregate(keys=[name#15], functions=[partial_count(1)])
+- LocalTableScan [name#15]
窗口
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions => f
val windowResult = df.withColumn("count", f.count($"*").over(Window.partitionBy($"name")))
windowResult.show(false)
+-----+---+-----+
|name |age|count|
+-----+---+-----+
|sally|32 |1 |
|mike |9 |1 |
|bob |10 |2 |
|bob |18 |2 |
+-----+---+-----+
windowResult.explain
== Physical Plan ==
Window [count(1) windowspecdefinition(name#5, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS count#34L], [name#5]
+- *(1) Sort [name#5 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#5, 200)
+- LocalTableScan [name#5, age#6]
根据执行计划,窗口化似乎更有效(阶段更少)。所以我的问题是,是否总是这样——我应该总是使用窗口函数来进行这种聚合吗?随着数据的增长,这两种方法是否会以类似的方式扩展?如果出现极端偏差(即某些名称比其他名称更常见)怎么办?
【问题讨论】:
【参考方案1】:这取决于数据。更具体地说,它取决于name
列的基数。如果基数小,聚合后数据小,聚合结果可以在join中广播。在这种情况下,连接将比window
更快。反之,如果基数大,聚合后数据大,那么join会用SortMergeJoin
来规划,使用window
效率会更高。
在window
的情况下,我们总共有 1 次随机播放 + 一种排序。在SortMergeJoin
的情况下,我们在左分支中具有相同的(总洗牌+排序)加上在右分支中额外减少的洗牌和排序(通过减少我的意思是首先聚合数据)。在连接的右分支中,我们还对数据进行了额外的扫描。
另外,您可以查看我在 Spark 峰会上的 video,我在其中分析了类似示例。
【讨论】:
谢谢——我真的只对聚合输出对于广播连接来说太大的情况感兴趣。【参考方案2】:按照您的说明禁用广播并使用定时方法为随机生成的 1M 和 2M 名称生成一些数据,也就是体面的大小,计划 2 的执行时间似乎确实更好。 databricks 集群(社区)上有 8、8、200 个分区大小。
生成的计划具有通过窗口进行排序和计数的智能,正如您所说的更少阶段。这似乎是关键。在规模上,您可以拥有更多分区,但证据使我倾向于接近 2。
我尝试了随机的姓名样本(忽略了年龄)并得到了这个:
加入 48.361 秒 vs 22.028 秒的 1M 记录 for.count 的窗口
集群重启后 .count 的 2M 记录的窗口加入时间为 85.814 秒与 50.566 秒
加入 96.295 秒 vs 43.875 秒的 .count 2M 记录窗口
使用的代码:
import scala.collection.mutable.ListBuffer
import scala.util.Random
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions => f
val alpha = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
val size = alpha.size
def randStr(n:Int) = (1 to n).map(_ => alpha(Random.nextInt(size))).mkString
def timeIt[T](op: => T): Float =
val start = System.currentTimeMillis
val res = op
val end = System.currentTimeMillis
(end - start) / 1000f
var names = new ListBuffer[String]()
for (i <- 1 to 2000000 )
names += randStr(10)
val namesList = names.toSeq
val df = namesList.toDF("name")
val joinResult = df.join(df.groupBy($"name").count, Seq("name"), "inner")
val windowResult = df.withColumn("count", f.count($"*").over(Window.partitionBy($"name")))
val time1 = timeIt(joinResult.count)
val time2 = timeIt(windowResult.count)
println(s"join in $time1 seconds vs $time2 seconds for window")
此外,这个问题还证明了 Spark Optimizer 的不成熟。
【讨论】:
以上是关于Spark Window 聚合与 Group By/Join 性能的主要内容,如果未能解决你的问题,请参考以下文章
Spark Scala数据框具有单个Group By的多个聚合[重复]
org.apache.spark.sql.AnalysisException:表达式 't2.`sum_click_passed`' 既不在 group by 中,也不是聚合函数
Spark SQL 可以在 GROUP BY 聚合中使用 FIRST_VALUE 和 LAST_VALUE(但这不是标准的)