Pyspark 将列列表转换为聚合函数

Posted

技术标签:

【中文标题】Pyspark 将列列表转换为聚合函数【英文标题】:Pyspark getting column list into aggregation function 【发布时间】:2020-11-26 13:52:14 【问题描述】:

我对 Pyspark 中的 udfs 和一个具体案例有疑问。 我正在尝试制作一个简单的可重用函数来聚合不同级别和组的值。 输入应该是:

    现有数据框 分组依据的变量(单列或列表) 要聚合的变量(同上) 要应用的函数(一个特定的函数或它们的列表)。我保持简单的求和、平均、最小值、最大值等......

当我有一个函数或一个列表时,我让它工作,但是当涉及到聚合变量时,我被困在将它们的列表引入函数中

def aggregate(dataframe,grouping,aggregation,functions):
   
   **First part works ok on single functions and single columns**

   if hasattr(aggregation,'__iter__') == False and hasattr(functions,'__iter__') == False:
    if functions == sum:
      df = dataframe.groupby(grouping).sum(aggregation)
    elif functions == avg:
       df = dataframe.groupby(grouping).avg(aggregation)
    elif functions == min:
       df = dataframe.groupby(grouping).min(aggregation)
    elif functions == max:
       df = dataframe.groupby(grouping).max(aggregation)
    elif functions == count:
       df = dataframe.groupby(grouping).count(aggregation)
    elif functions == countDistinct:
      df = dataframe.groupby(grouping).countDistinct(aggregation)

  **Here is where I got into the part I struggle with, if aggregation == [some list] it will not work

  elif hasattr(aggregation,'__iter__') == True and hasattr(functions,'__iter__') == False:
    if functions == sum:
      df = dataframe.groupby(grouping).sum(aggregation)
    elif functions == avg:
       df = dataframe.groupby(grouping).avg(aggregation)
    elif functions == min:
       df = dataframe.groupby(grouping).min(aggregation)
    elif functions == max:
       df = dataframe.groupby(grouping).max(aggregation)
    elif functions == count:
       df = dataframe.groupby(grouping).count(aggregation)
    elif functions == countDistinct:
      df = dataframe.groupby(grouping).countDistinct(aggregation)
  

  **Expression to get inputs as lists works too**     
  else:
    expression_def = [f(col(c)) for f in functions for c in aggregation]
    df = dataframe.groupby(grouping).agg(*expression_def)
  return df

【问题讨论】:

【参考方案1】:

你可以使用agg方法:

def aggregate(dataframe, grouping, aggregation, functions):
    if hasattr(aggregation, "__iter__"):
        return dataframe.groupBy(grouping).agg(f"item": f"functions" for item in aggregation)
    else:
        return dataframe.groupBy(grouping).agg(f"aggregation": f"functions")

【讨论】:

【参考方案2】:

只需使用 agg() 函数,您就可以节省大量的写作时间。

例子:

grouping = ["A", "B", "C"]
aggregation = "A": "max", "B": "avg"
df = dataframe.groupBy(grouping).agg(agregation)

这也将帮助您在多列上进行聚合,因为您可以将其作为列上聚合函数的字典传递。

在你的情况下,看起来像:

aggregation = "A": "max", "B": "max": "C":"max"

参考: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=agg#pyspark.sql.DataFrame.agg

【讨论】:

以上是关于Pyspark 将列列表转换为聚合函数的主要内容,如果未能解决你的问题,请参考以下文章

将列值转换为行值

如何通过对其他函数的字段执行聚合函数来将列添加到表中?

如何在 pyspark 中对需要在聚合中聚合的分组数据应用窗口函数?

PySpark 中是不是有与 Pandas 聚合函数 any() 等效的函数?

在 pyspark 中应用用户定义的聚合函数的替代方法

PySpark - 运行 Count() / 聚合函数(平均等)时出现不一致