Spark SQL 中分组依据和窗口函数如何交互?

Posted

技术标签:

【中文标题】Spark SQL 中分组依据和窗口函数如何交互?【英文标题】:How do group by and window functions interact in Spark SQL? 【发布时间】:2020-04-13 19:43:28 【问题描述】:

从this question,我了解到窗口函数是在 PostgresSQL 中按函数分组之后计算的。

我想知道当您在 Spark 的同一查询中使用 group by 和 window 函数时会发生什么。我和上一个问题的发帖人有同样的问题:

选择的行是先分组,再由窗口函数考虑? 还是先执行窗口函数,然后按group by对结果值进行分组? 还有别的吗?

【问题讨论】:

【参考方案1】:

如果您在同一查询中有 windowgroup by,那么

执行分组first,然后window函数将应用于分组数据集。

您可以查看查询解释计划以获取更多详细信息。

Example:

//sample data
spark.sql("select * from tmp").show()
//+-------+----+
//|trip_id|name|
//+-------+----+
//|      1|   a|
//|      2|   b|
//+-------+----+


spark.sql("select row_number() over(order by trip_id),trip_id,count(*) cnt from tmp group by trip_id").explain()
//== Physical Plan ==
//*(4) Project [row_number() OVER (ORDER BY trip_id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#150, trip_id#10, cnt#140L]
//+- Window [row_number() windowspecdefinition(trip_id#10 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number() OVER (ORDER BY //trip_id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#150], [trip_id#10 ASC NULLS FIRST]
//   +- *(3) Sort [trip_id#10 ASC NULLS FIRST], false, 0
//      +- Exchange SinglePartition
//         +- *(2) HashAggregate(keys=[trip_id#10], functions=[count(1)])
//            +- Exchange hashpartitioning(trip_id#10, 200)
//               +- *(1) HashAggregate(keys=[trip_id#10], functions=[partial_count(1)])
//                  +- LocalTableScan [trip_id#10]

*(2) groupby executed first

*(4) window function applied on the result of grouped dataset.

如果您有窗口子句 subquery 并且外部查询有分组,则首先执行子查询(窗口),然后执行外部查询(分组)。

Ex: spark.sql("select trip_id,count(*) from(select *,row_number() over(order by trip_id)rn from tmp)e group by trip_id ").explain()

【讨论】:

以上是关于Spark SQL 中分组依据和窗口函数如何交互?的主要内容,如果未能解决你的问题,请参考以下文章

R中的累积和、移动平均线和SQL“分组依据”等价物

SQL Server中的开窗函数是啥?

Windows 函数和分组依据

Flink SQL 分组窗口函数 Group Window 实战

如何在烧瓶SQL炼金术中使用分组依据

当按使用的时间窗口分组时,Spark 如何确定第一个窗口的 window.start?