Issue with Spark Window over with Group By
Posted: 2020-05-06 02:26:10

Question: I want to compute window aggregates over a window whose granularity differs from that of the SELECT's GROUP BY, using Spark SQL from Scala.
SELECT c1, c2, c3, MAX(c4), MAX(c5),
       MAX(c4) OVER (PARTITION BY c1, c2, c3),
       AVG(c5) OVER (PARTITION BY c1, c2, c3)
FROM temp_view
GROUP BY c1, c2, c3
This fails with an error saying:
c4 and c5 not being part of Group by or use first().
Comments:

Nikk: You don't need to use GROUP BY when using PARTITION BY. Keep it as: Select c1,c2,c3,max(c4),max(c5), Max(c4) over (partition by c1,c2,c3), Avg(c5) over (partition by c1,c2,c3) From temp_view

OP: Thanks @Nikk ... trying to understand the reason behind this, why GROUP BY is not allowed ... could you explain?

Nikk: GroupBy and PartitionBy serve the same purpose in several respects. If you use GroupBy, all aggregations work only on those GroupBy columns. The same thing happens when you use PARTITION BY. The only major difference between the two is that groupBy reduces the number of records, and in the SELECT we can only use the columns that appear in the GROUP BY, whereas with PartitionBy the number of records is not reduced; instead an extra column is added, and in the SELECT we can use any number of columns, with no restriction.

OP: Thanks Nikk.. could you post this as an answer instead of a comment?

Nikk: Kim, please check my answer.

Answer 1:

As I said in the comments, GroupBy and PartitionBy serve the same purpose in some respects. If you use GroupBy, all aggregations work only on those GroupBy columns. The same thing happens when you use PARTITION BY. The only major difference between the two is that groupBy reduces the number of records, and in the SELECT we can only use the columns used in the GROUP BY, whereas with PartitionBy the number of records is not reduced; instead, an extra aggregate column is added, and in the SELECT we can use any number of columns, with no restriction.
For your query: you use columns c1, c2, c3 in the GROUP BY while also applying Max(c4) and Avg(c5) with PARTITION BY over ungrouped columns, which is why it gives you that error. For your use case you can use either of the following queries:
SELECT c1, c2, c3, MAX(c4), MAX(c5)
FROM temp_view
GROUP BY c1, c2, c3
Or
SELECT c1, c2, c3,
       MAX(c4) OVER (PARTITION BY c1, c2, c3),
       AVG(c5) OVER (PARTITION BY c1, c2, c3)
FROM temp_view
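If the goal really is to keep the GROUP BY aggregates and also compute window aggregates at a different granularity in the same result, one common workaround (a sketch only, not part of the original answer; the coarser partition key c1, c2 is an assumed example) is to aggregate in a subquery first and then apply the window over the already-aggregated rows:

SELECT c1, c2, c3, max_c4, max_c5,
       MAX(max_c4) OVER (PARTITION BY c1, c2) AS max_c4_per_c1_c2,  -- coarser key (c1, c2) is an assumption
       AVG(max_c5) OVER (PARTITION BY c1, c2) AS avg_c5_per_c1_c2
FROM (
    SELECT c1, c2, c3, MAX(c4) AS max_c4, MAX(c5) AS max_c5
    FROM temp_view
    GROUP BY c1, c2, c3
) grouped

Because the inner query has already reduced the data to one row per (c1, c2, c3), the outer window functions no longer reference ungrouped columns, so the analyzer error goes away.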
Below is an example that should give you a clear picture:
scala> spark.sql("""SELECT * from table""").show()
+---+----------------+-------+------+
| ID| NAME|COMPANY|SALARY|
+---+----------------+-------+------+
| 1| Gannon Chang| ABC|440993|
| 2| Hashim Morris| XYZ| 49140|
| 3| Samson Le| ABC|413890|
| 4| Brandon Doyle| XYZ|384118|
| 5| Jacob Coffey| BCD|504819|
| 6| Dillon Holder| ABC|734086|
| 7|Salvador Vazquez| NGO|895082|
| 8| Paki Simpson| BCD|305046|
| 9| Laith Stewart| ABC|943750|
| 10| Simon Whitaker| NGO|561896|
| 11| Denton Torres| BCD| 10442|
| 12|Garrison Sellers| ABC| 53024|
| 13| Theodore Bolton| TTT|881521|
| 14| Kamal Roberts| TTT|817422|
+---+----------------+-------+------+
//You can only select columns that appear in the group by
scala> spark.sql("""SELECT COMPANY, max(SALARY) from table group by COMPANY""").show()
+-------+-----------+
|COMPANY|max(SALARY)|
+-------+-----------+
| NGO| 895082|
| BCD| 504819|
| XYZ| 384118|
| TTT| 881521|
| ABC| 943750|
+-------+-----------+
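For reference, the equivalent of this GROUP BY query in the DataFrame API would look roughly like the following; a minimal sketch, assuming the data is registered as the table "table":

scala> import org.apache.spark.sql.functions.max
scala> val df = spark.table("table")
scala> df.groupBy("COMPANY").agg(max("SALARY")).show()  // produces the same five rows as above, one per COMPANY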
//It will give an error if you select all columns, or any column other than the Group By columns
scala> spark.sql("""SELECT *, max(SALARY) from table group by COMPANY""").show()
org.apache.spark.sql.AnalysisException: expression 'table.`ID`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
Aggregate [COMPANY#94], [ID#92, NAME#93, COMPANY#94, SALARY#95L, max(SALARY#95L) AS max(SALARY)#213L]
+- SubqueryAlias table
+- Relation[ID#92,NAME#93,COMPANY#94,SALARY#95L] parquet
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:92)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1(CheckAnalysis.scala:187)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$9.apply(CheckAnalysis.scala:220)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$9.apply(CheckAnalysis.scala:220)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:220)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
... 49 elided
//But you can select all columns with partition by
scala> spark.sql("""SELECT *, Max(SALARY) over (PARTITION BY COMPANY) as Max_Salary from table""").show()
+---+----------------+-------+------+----------+
| ID| NAME|COMPANY|SALARY|Max_Salary|
+---+----------------+-------+------+----------+
| 7|Salvador Vazquez| NGO|895082| 895082|
| 10| Simon Whitaker| NGO|561896| 895082|
| 5| Jacob Coffey| BCD|504819| 504819|
| 8| Paki Simpson| BCD|305046| 504819|
| 11| Denton Torres| BCD| 10442| 504819|
| 2| Hashim Morris| XYZ| 49140| 384118|
| 4| Brandon Doyle| XYZ|384118| 384118|
| 13| Theodore Bolton| TTT|881521| 881521|
| 14| Kamal Roberts| TTT|817422| 881521|
| 1| Gannon Chang| ABC|440993| 943750|
| 3| Samson Le| ABC|413890| 943750|
| 6| Dillon Holder| ABC|734086| 943750|
| 9| Laith Stewart| ABC|943750| 943750|
| 12|Garrison Sellers| ABC| 53024| 943750|
+---+----------------+-------+------+----------+
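For completeness, the same window aggregate can be expressed with the DataFrame API as well; a minimal sketch, again assuming the data is registered as the table "table":

scala> import org.apache.spark.sql.expressions.Window
scala> import org.apache.spark.sql.functions.max
scala> val df = spark.table("table")
scala> df.withColumn("Max_Salary", max("SALARY").over(Window.partitionBy("COMPANY"))).show()
// every row is kept; Max_Salary repeats the per-COMPANY maximum, matching the output above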