具有多个聚合的 pyspark groupBy(如 pandas)

Posted

技术标签:

【中文标题】具有多个聚合的 pyspark groupBy(如 pandas)【英文标题】:pyspark groupBy with multiple aggregates (like pandas) 【发布时间】:2018-04-04 22:26:11 【问题描述】:

我对 pyspark 很陌生,我正在尝试将我的 pandas 代码转换为 pyspark。我遇到的一件事是聚合我的 groupby。

这里是熊猫代码:

df_trx_m = train1.groupby('CUSTOMER_NUMBER')['trx'].agg(['mean', 'var'])

我在 AnalyticsVidhya 上看到了这个示例,但我不确定如何将其应用到上面的代码中:

train.groupby('Age').agg('Purchase': 'mean').show()
Output:
+-----+-----------------+
|  Age|    avg(Purchase)|
+-----+-----------------+
|51-55|9534.808030960236|
|46-50|9208.625697468327|
| 0-17|8933.464640444974|
|36-45|9331.350694917874|
|26-35|9252.690632869888|
|  55+|9336.280459449405|
|18-25|9169.663606261289|
+-----+-----------------+

任何帮助将不胜感激

编辑:

这是另一个尝试:

from pyspark.sql.functions import avg, variance
train1.groupby("CUSTOMER_NUMBER")\
    .agg(
        avg('repatha_trx').alias("repatha_trx_avg"), 
        variance('repatha_trx').alias("repatha_trx_Var")
    )\
    .show(100)

但这只是给了我一个空数据框。

【问题讨论】:

您的第二次尝试看起来应该可以了。您能否提供重现此问题的minimal reproducible example?请提供一个小样本数据框。阅读更多关于how to make good reproducible apache spark dataframe examples。 【参考方案1】:

您可以导入pyspark functions进行聚合。

# load function
from pyspark.sql import functions as F

# aggregate data
df_trx_m = train.groupby('Age').agg(
    F.avg(F.col('repatha_trx')).alias('repatha_trx_avg'),
    F.variance(F.col('repatha_trx')).alias('repatha_trx_var')
)

请注意,pyspark.sql.functions.variance() 返回总体方差。无偏样本方差还有另一个函数pyspark.sql.functions.var_samp()

【讨论】:

以上是关于具有多个聚合的 pyspark groupBy(如 pandas)的主要内容,如果未能解决你的问题,请参考以下文章

pyspark:groupby 和聚合 avg 和 first 在多个列上

在 pyspark 中,是不是可以使用 1 个 groupBy 进行 2 个聚合?

在 groupby 操作 PySpark 中聚合列中的稀疏向量

与 groupBy 聚合后将 pyspark 数据帧保存为 csv 文件

如何使用 groupby 和聚合将 pyspark 数据框中的行与多列连接起来

如何将 groupBy 和聚合函数应用于 PySpark DataFrame 中的特定窗口?