Pyspark getting column list into aggregation function
【Posted】: 2020-11-26 13:52:14
【Question】: I have a question about udfs in Pyspark and one specific case. I'm trying to make a simple reusable function to aggregate values at different levels and for different groupings. The inputs should be:
- the existing dataframe
- the variable(s) to group by (a single column or a list)
- the variable(s) to aggregate (same as above)
- the function(s) to apply (one specific function or a list of them). I'm keeping it to simple sums, averages, mins, maxes, and so on...
I have it working for a single function or a list of functions, but when it comes to the aggregation variables I'm stuck on getting a list of them into the function:
from pyspark.sql.functions import col, sum, avg, min, max, count, countDistinct

def aggregate(dataframe, grouping, aggregation, functions):
    # First part works ok on single functions and single columns
    if not hasattr(aggregation, '__iter__') and not hasattr(functions, '__iter__'):
        if functions == sum:
            df = dataframe.groupby(grouping).sum(aggregation)
        elif functions == avg:
            df = dataframe.groupby(grouping).avg(aggregation)
        elif functions == min:
            df = dataframe.groupby(grouping).min(aggregation)
        elif functions == max:
            df = dataframe.groupby(grouping).max(aggregation)
        elif functions == count:
            df = dataframe.groupby(grouping).count()
        elif functions == countDistinct:
            df = dataframe.groupby(grouping).agg(countDistinct(aggregation))
    # Here is where I got stuck: if aggregation == [some list] it will not work
    elif hasattr(aggregation, '__iter__') and not hasattr(functions, '__iter__'):
        if functions == sum:
            df = dataframe.groupby(grouping).sum(aggregation)
        elif functions == avg:
            df = dataframe.groupby(grouping).avg(aggregation)
        elif functions == min:
            df = dataframe.groupby(grouping).min(aggregation)
        elif functions == max:
            df = dataframe.groupby(grouping).max(aggregation)
        elif functions == count:
            df = dataframe.groupby(grouping).count()
        elif functions == countDistinct:
            df = dataframe.groupby(grouping).agg(countDistinct(aggregation))
    # Expression to get inputs as lists works too
    else:
        expression_def = [f(col(c)) for f in functions for c in aggregation]
        df = dataframe.groupby(grouping).agg(*expression_def)
    return df
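One note on the branch that fails: GroupedData's sum/avg/min/max take column names as separate arguments rather than a single list, so a minimal sketch of a fix (assuming aggregation is a list of column-name strings) is to unpack the list. Note also that Python strings themselves have __iter__, so isinstance(aggregation, list) is a more reliable dispatch test than hasattr:

# Hypothetical fix for the list branch: unpack the column list into varargs
if isinstance(aggregation, list) and functions == sum:
    df = dataframe.groupby(grouping).sum(*aggregation)  # i.e. .sum("a", "b") rather than .sum(["a", "b"])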
【Comments】:
【Answer 1】: You can use the agg method:
def aggregate(dataframe, grouping, aggregation, functions):
    if hasattr(aggregation, "__iter__"):
        return dataframe.groupBy(grouping).agg({item: functions for item in aggregation})
    else:
        return dataframe.groupBy(grouping).agg({aggregation: functions})
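For reference, a usage sketch under the assumption that functions is passed as the name of a built-in aggregate (e.g. "sum") and df is an existing dataframe; since plain strings are also iterable, a single column is safest passed wrapped in a list:

# Hypothetical call: group by "A" and sum both "B" and "C"
result = aggregate(df, "A", ["B", "C"], "sum")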
【Discussion】:
【Answer 2】: Just use the agg() function and you will save yourself a lot of typing.
Example:
grouping = ["A", "B", "C"]
aggregation = "A": "max", "B": "avg"
df = dataframe.groupBy(grouping).agg(agregation)
This also helps when you aggregate over multiple columns, since you can pass a dict mapping each column to its aggregation function.
In your case it would look like:
aggregation = {"A": "max", "B": "max", "C": "max"}
Reference: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=agg#pyspark.sql.DataFrame.agg
【Discussion】: