使用 Group By 结束 Spark 窗口的问题

Posted

技术标签:

【中文标题】使用 Group By 结束 Spark 窗口的问题【英文标题】:Issue with Spark Window over with Group By 【发布时间】:2020-05-06 02:26:10 【问题描述】:

我想在窗口上填充 agg,该窗口的粒度与 select group by 不同。 使用 Scala sql。

Select c1,c2,c3,max(c4),max(c5),
Max(c4) over (partition by c1,c2,c3),
Avg(c5) over (partition by c1,c2,c3)
From temp_view 
Group by c1,c2,c3

得到错误说:

c4 and c5 not being part of Group by or use first().

【问题讨论】:

在使用partition by时不需要使用Group by。保持为Select c1,c2,c3,max(c4),max(c5), Max(c4) over (按 c1,c2,c3 分区,Avg(c5) over(按 c1,c2,c3 分区)来自 temp_view 谢谢@Nikk ...试图了解这背后的原因不允许分组...你能解释一下吗? GroupBy 和 PartitionBy 在几个方面有相同的目的。如果您使用 GroupBy,则所有聚合仅在这些 GroupBy 列上工作。当您使用分区依据时也会发生同样的事情。两者之间唯一的主要区别是 groupBy 减少了编号。记录数和在 select 中我们只需要使用 group by 但在 ParitionBy 中使用的列,记录数不会减少。取而代之的是,它将添加一个额外的列,并且在选择中我们可以使用 N no。的列。没有限制。 谢谢 Nikk.. 你能回答这个问题而不是评论吗? 金,请检查我的回答 【参考方案1】:

正如我在评论中所说,GroupByPartitionBy 在某些方面具有相同的目的。如果您使用GroupBy,则所有聚合仅对这些GroupBy 列起作用。当您使用partition by 时,也会发生同样的事情。两者之间唯一的主要区别是groupBy 减少了编号。记录数和在选择中,我们只需要使用分组中使用的列,但在ParitionBy 中,记录数不会减少。取而代之的是,它将添加一个额外的聚合列,并且在选择中我们可以使用 N no。的列。

对于您的问题,您在 Group By 中使用列 c1、c2、c3 并将 Max(c4)、AVG(c5) 与 partition by 一起使用,因此它会给您错误。 对于您的用例,您可以使用以下任一查询:

Select c1,c2,c3,max(c4),max(c5)
From temp_view 
Group by c1,c2,c3

Select c1,c2,c3,
Max(c4) over (partition by c1,c2,c3),
Avg(c5) over (partition by c1,c2,c3)
From temp_view

下面是一个例子,它会给你一个清晰的画面,

scala> spark.sql("""SELECT * from table""").show()
+---+----------------+-------+------+
| ID|            NAME|COMPANY|SALARY|
+---+----------------+-------+------+
|  1|    Gannon Chang|    ABC|440993|
|  2|   Hashim Morris|    XYZ| 49140|
|  3|       Samson Le|    ABC|413890|
|  4|   Brandon Doyle|    XYZ|384118|
|  5|    Jacob Coffey|    BCD|504819|
|  6|   Dillon Holder|    ABC|734086|
|  7|Salvador Vazquez|    NGO|895082|
|  8|    Paki Simpson|    BCD|305046|
|  9|   Laith Stewart|    ABC|943750|
| 10|  Simon Whitaker|    NGO|561896|
| 11|   Denton Torres|    BCD| 10442|
| 12|Garrison Sellers|    ABC| 53024|
| 13| Theodore Bolton|    TTT|881521|
| 14|   Kamal Roberts|    TTT|817422|
+---+----------------+-------+------+

//You can only use column to select that is in group by
scala> spark.sql("""SELECT COMPANY, max(SALARY) from table group by COMPANY""").show()
+-------+-----------+
|COMPANY|max(SALARY)|
+-------+-----------+
|    NGO|     895082|
|    BCD|     504819|
|    XYZ|     384118|
|    TTT|     881521|
|    ABC|     943750|
+-------+-----------+

//It will give error if you select all column or column other than Group By

scala> spark.sql("""SELECT *, max(SALARY) from table group by COMPANY""").show()
org.apache.spark.sql.AnalysisException: expression 'table.`ID`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
Aggregate [COMPANY#94], [ID#92, NAME#93, COMPANY#94, SALARY#95L, max(SALARY#95L) AS max(SALARY)#213L]
+- SubqueryAlias table
   +- Relation[ID#92,NAME#93,COMPANY#94,SALARY#95L] parquet

  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:92)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1(CheckAnalysis.scala:187)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$9.apply(CheckAnalysis.scala:220)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$9.apply(CheckAnalysis.scala:220)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:220)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
  ... 49 elided


//But you can select all columns with partition by
scala> spark.sql("""SELECT *, Max(SALARY) over (PARTITION BY COMPANY) as Max_Salary from table""").show()
+---+----------------+-------+------+----------+
| ID|            NAME|COMPANY|SALARY|Max_Salary|
+---+----------------+-------+------+----------+
|  7|Salvador Vazquez|    NGO|895082|    895082|
| 10|  Simon Whitaker|    NGO|561896|    895082|
|  5|    Jacob Coffey|    BCD|504819|    504819|
|  8|    Paki Simpson|    BCD|305046|    504819|
| 11|   Denton Torres|    BCD| 10442|    504819|
|  2|   Hashim Morris|    XYZ| 49140|    384118|
|  4|   Brandon Doyle|    XYZ|384118|    384118|
| 13| Theodore Bolton|    TTT|881521|    881521|
| 14|   Kamal Roberts|    TTT|817422|    881521|
|  1|    Gannon Chang|    ABC|440993|    943750|
|  3|       Samson Le|    ABC|413890|    943750|
|  6|   Dillon Holder|    ABC|734086|    943750|
|  9|   Laith Stewart|    ABC|943750|    943750|
| 12|Garrison Sellers|    ABC| 53024|    943750|
+---+----------------+-------+------+----------+

【讨论】:

以上是关于使用 Group By 结束 Spark 窗口的问题的主要内容,如果未能解决你的问题,请参考以下文章

Spark Window 聚合与 Group By/Join 性能

如何在 spark 数据集上使用 group by

如何使用group by聚合spark中的结构数组

Spark SQL 中 Group By 子句的底层实现

Spark CTAS 上的 Hive 使用 Straight SELECT 失败,但使用 SELECT GROUP BY 成功

spark Group By数据框列没有聚合[重复]