在 PySpark Dataframe 中结合旋转和分组聚合

Posted

技术标签:

【中文标题】在 PySpark Dataframe 中结合旋转和分组聚合【英文标题】:Combine pivoting and groupby aggregating in PySpark Dataframe 【发布时间】:2020-02-06 11:54:02 【问题描述】:

我正在重复使用Combine pivoted and aggregated column in PySpark Dataframe中的示例

假设我有一个 Spark 数据框

 date      | recipe | percent | volume
----------------------------------------
2019-01-01 |   A    |  0.03   |  53
2019-01-01 |   A    |  0.02   |  55
2019-01-01 |   B    |  0.05   |  60
2019-01-02 |   A    |  0.11   |  75
2019-01-02 |   B    |  0.06   |  64
2019-01-02 |   B    |  0.08   |  66

我如何以一列为中心并在另一列上聚合,如下面的伪代码:

df.groupBy('date').max('volume').alias('max_volume').pivot('recipe').agg(avg('percent').alias('percent')).show()

 date      | A_percent | B_percent | max_volume
--------------------------------------------------------
2019-01-01 |   0.025   |  0.05     |  60
2019-01-02 |   0.11    |  0.07     |  75

我想一步实现这一点,而不必稍后对列 A_volume 和 B_volume 执行 max 以避免命名这些列。

ps。 当我本机运行该伪代码时,我得到了

AttributeError: 'DataFrame' object has no attribute 'pivot'

【问题讨论】:

【参考方案1】:

试试这个:

from pyspark.sql.functions import *
from pyspark.sql import Window
var win = Window.partitionBy("date") 
data.withColumn("max_vol",max("volume").over(win)).groupBy("date","max_vol") .pivot("recipe") .agg(avg("percent")).show()

【讨论】:

以上是关于在 PySpark Dataframe 中结合旋转和分组聚合的主要内容,如果未能解决你的问题,请参考以下文章

在 Apache Spark 中拆分 DataFrame

如何在保留现有架构的同时从行中创建 DataFrame?

我应该在 PySpark 中选择 RDD 还是 DataFrame 之一?

如何为现有 DataFrame 创建新行?在 PySpark 或 Scala 中

在 Pyspark 中合并 DataFrame

在 PySpark DataFrame 中添加多个空列