pyspark:groupby 和聚合 avg 和 first 在多个列上

Posted

技术标签:

【中文标题】pyspark:groupby 和聚合 avg 和 first 在多个列上【英文标题】:pyspark: groupby and aggregate avg and first on multiple columns 【发布时间】:2020-06-28 09:09:57 【问题描述】:

我有以下示例 pyspark 数据框,在 groupby 之后我想计算平均值,以及多列中的第一列,实际上我有 100 列,所以我不能单独计算

sp = spark.createDataFrame([['a',2,4,'cc','anc'], ['a',4,7,'cd','abc'], ['b',6,0,'as','asd'], ['b', 2, 4, 'ad','acb'],
                        ['c', 4, 4, 'sd','acc']], ['id', 'col1', 'col2','col3', 'col4'])

+---+----+----+----+----+
| id|col1|col2|col3|col4|
+---+----+----+----+----+
|  a|   2|   4|  cc| anc|
|  a|   4|   7|  cd| abc|
|  b|   6|   0|  as| asd|
|  b|   2|   4|  ad| acb|
|  c|   4|   4|  sd| acc|
+---+----+----+----+----+

这就是我正在尝试的

mean_cols = ['col1', 'col2']
first_cols = ['col3', 'col4']
sc.groupby('id').agg(*[ f.mean for col in mean_cols], *[f.first for col in first_cols])

但它不起作用。我怎么能用 pyspark 做到这一点

【问题讨论】:

你没有在聚合函数中调用列名,你可以尝试f.mean(col)而不是f.mean,它可以工作sp.groupBy('id').agg(*[f.mean(col) for col in mean_cols], *[f.first(col) for col in first_cols]).show() 【参考方案1】:

多个列上的多个函数的最佳方式是使用 .agg(*expr) 格式。

import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import numpy as np
#Test data
tst = sqlContext.createDataFrame([(1,2,3,4),(3,4,5,1),(5,6,7,8),(7,8,9,2)],schema=['col1','col2','col3','col4'])
fn_l = [F.min,F.max,F.mean,F.first]
col_l=['col1','col2','col3']
expr = [fn(coln).alias(str(fn.__name__)+'_'+str(coln)) for fn in fn_l for coln in col_l]
tst_r = tst.groupby('col4').agg(*expr)

结果是

tst_r.show()
+----+--------+--------+--------+--------+--------+--------+---------+---------+---------+----------+----------+----------+
|col4|min_col1|min_col2|min_col3|max_col1|max_col2|max_col3|mean_col1|mean_col2|mean_col3|first_col1|first_col2|first_col3|
+----+--------+--------+--------+--------+--------+--------+---------+---------+---------+----------+----------+----------+
|   5|       5|       6|       7|       7|       8|       9|      6.0|      7.0|      8.0|         5|         6|         7|
|   4|       1|       2|       3|       3|       4|       5|      2.0|      3.0|      4.0|         1|         2|         3|
+----+--------+--------+--------+--------+--------+--------+---------+---------+---------+----------+----------+----------+

为了有选择地在列上应用函数,您可以拥有多个表达式数组并将它们连接到聚合中。

fn_l = [F.min,F.max]
fn_2=[F.mean,F.first]
col_l=['col1','col2']
col_2=['col1','col3','col4']
expr1 = [fn(coln).alias(str(fn.__name__)+'_'+str(coln)) for fn in fn_l for coln in col_l]
expr2 = [fn(coln).alias(str(fn.__name__)+'_'+str(coln)) for fn in fn_2 for coln in col_2]
tst_r = tst.groupby('col4').agg(*(expr1+expr2))

【讨论】:

在您的解决方案中,您将在所有列上应用所有函数,但我特别希望在少数列上使用很少的函数,我该如何使用 expr 来做到这一点 @ManuSharma - 已更新答案,检查它是否有效。您可以定义多个数组并在 agg 表达式中连接它们

以上是关于pyspark:groupby 和聚合 avg 和 first 在多个列上的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 groupby 和聚合将 pyspark 数据框中的行与多列连接起来

如何将 groupBy 和聚合函数应用于 PySpark DataFrame 中的特定窗口?

多列上的多个聚合

具有多个聚合的 pyspark groupBy(如 pandas)

PySpark中pandas_udf的隐式模式?

在 pyspark 中,是不是可以使用 1 个 groupBy 进行 2 个聚合?