Spark SQL 中分组依据和窗口函数如何交互?
Posted
技术标签:
【中文标题】Spark SQL 中分组依据和窗口函数如何交互?【英文标题】:How do group by and window functions interact in Spark SQL? 【发布时间】:2020-04-13 19:43:28 【问题描述】:从this question,我了解到窗口函数是在 PostgresSQL 中按函数分组之后计算的。
我想知道当您在 Spark 的同一查询中使用 group by 和 window 函数时会发生什么。我和上一个问题的发帖人有同样的问题:
选择的行是先分组,再由窗口函数考虑? 还是先执行窗口函数,然后按group by对结果值进行分组? 还有别的吗?【问题讨论】:
【参考方案1】:如果您在同一查询中有 window 和 group by,那么
执行分组first
,然后window
函数将应用于分组数据集。
您可以查看查询解释计划以获取更多详细信息。
Example:
//sample data
spark.sql("select * from tmp").show()
//+-------+----+
//|trip_id|name|
//+-------+----+
//| 1| a|
//| 2| b|
//+-------+----+
spark.sql("select row_number() over(order by trip_id),trip_id,count(*) cnt from tmp group by trip_id").explain()
//== Physical Plan ==
//*(4) Project [row_number() OVER (ORDER BY trip_id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#150, trip_id#10, cnt#140L]
//+- Window [row_number() windowspecdefinition(trip_id#10 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number() OVER (ORDER BY //trip_id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#150], [trip_id#10 ASC NULLS FIRST]
// +- *(3) Sort [trip_id#10 ASC NULLS FIRST], false, 0
// +- Exchange SinglePartition
// +- *(2) HashAggregate(keys=[trip_id#10], functions=[count(1)])
// +- Exchange hashpartitioning(trip_id#10, 200)
// +- *(1) HashAggregate(keys=[trip_id#10], functions=[partial_count(1)])
// +- LocalTableScan [trip_id#10]
*(2) groupby executed first
*(4) window function applied on the result of grouped dataset.
如果您有窗口子句 subquery
并且外部查询有分组,则首先执行子查询(窗口),然后执行外部查询(分组)。
Ex:
spark.sql("select trip_id,count(*) from(select *,row_number() over(order by trip_id)rn from tmp)e group by trip_id ").explain()
【讨论】:
以上是关于Spark SQL 中分组依据和窗口函数如何交互?的主要内容,如果未能解决你的问题,请参考以下文章