pyspark:groupby 和聚合 avg 和 first 在多个列上
Posted
技术标签:
【中文标题】pyspark:groupby 和聚合 avg 和 first 在多个列上【英文标题】:pyspark: groupby and aggregate avg and first on multiple columns 【发布时间】:2020-06-28 09:09:57 【问题描述】:我有以下示例 pyspark 数据框,在 groupby 之后我想计算平均值,以及多列中的第一列,实际上我有 100 列,所以我不能单独计算
sp = spark.createDataFrame([['a',2,4,'cc','anc'], ['a',4,7,'cd','abc'], ['b',6,0,'as','asd'], ['b', 2, 4, 'ad','acb'],
['c', 4, 4, 'sd','acc']], ['id', 'col1', 'col2','col3', 'col4'])
+---+----+----+----+----+
| id|col1|col2|col3|col4|
+---+----+----+----+----+
| a| 2| 4| cc| anc|
| a| 4| 7| cd| abc|
| b| 6| 0| as| asd|
| b| 2| 4| ad| acb|
| c| 4| 4| sd| acc|
+---+----+----+----+----+
这就是我正在尝试的
mean_cols = ['col1', 'col2']
first_cols = ['col3', 'col4']
sc.groupby('id').agg(*[ f.mean for col in mean_cols], *[f.first for col in first_cols])
但它不起作用。我怎么能用 pyspark 做到这一点
【问题讨论】:
你没有在聚合函数中调用列名,你可以尝试f.mean(col)
而不是f.mean
,它可以工作sp.groupBy('id').agg(*[f.mean(col) for col in mean_cols], *[f.first(col) for col in first_cols]).show()
【参考方案1】:
多个列上的多个函数的最佳方式是使用 .agg(*expr) 格式。
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import numpy as np
#Test data
tst = sqlContext.createDataFrame([(1,2,3,4),(3,4,5,1),(5,6,7,8),(7,8,9,2)],schema=['col1','col2','col3','col4'])
fn_l = [F.min,F.max,F.mean,F.first]
col_l=['col1','col2','col3']
expr = [fn(coln).alias(str(fn.__name__)+'_'+str(coln)) for fn in fn_l for coln in col_l]
tst_r = tst.groupby('col4').agg(*expr)
结果是
tst_r.show()
+----+--------+--------+--------+--------+--------+--------+---------+---------+---------+----------+----------+----------+
|col4|min_col1|min_col2|min_col3|max_col1|max_col2|max_col3|mean_col1|mean_col2|mean_col3|first_col1|first_col2|first_col3|
+----+--------+--------+--------+--------+--------+--------+---------+---------+---------+----------+----------+----------+
| 5| 5| 6| 7| 7| 8| 9| 6.0| 7.0| 8.0| 5| 6| 7|
| 4| 1| 2| 3| 3| 4| 5| 2.0| 3.0| 4.0| 1| 2| 3|
+----+--------+--------+--------+--------+--------+--------+---------+---------+---------+----------+----------+----------+
为了有选择地在列上应用函数,您可以拥有多个表达式数组并将它们连接到聚合中。
fn_l = [F.min,F.max]
fn_2=[F.mean,F.first]
col_l=['col1','col2']
col_2=['col1','col3','col4']
expr1 = [fn(coln).alias(str(fn.__name__)+'_'+str(coln)) for fn in fn_l for coln in col_l]
expr2 = [fn(coln).alias(str(fn.__name__)+'_'+str(coln)) for fn in fn_2 for coln in col_2]
tst_r = tst.groupby('col4').agg(*(expr1+expr2))
【讨论】:
在您的解决方案中,您将在所有列上应用所有函数,但我特别希望在少数列上使用很少的函数,我该如何使用 expr 来做到这一点 @ManuSharma - 已更新答案,检查它是否有效。您可以定义多个数组并在 agg 表达式中连接它们以上是关于pyspark:groupby 和聚合 avg 和 first 在多个列上的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 groupby 和聚合将 pyspark 数据框中的行与多列连接起来
如何将 groupBy 和聚合函数应用于 PySpark DataFrame 中的特定窗口?