pyspark groupby and apply a custom function


【Title】pyspark groupby and apply a custom function 【Posted】2021-12-29 10:53:17 【Question】:

I have a custom function that I use with pandas DataFrame groupby:

import pandas as pd
import statistics as math
from datetime import date, timedelta

def avg_df(df, weekss):
  """
  1. Take a data frame and the averaging window (in weeks).
  2. Use a forward-rolling window that starts from the data one year back and compute the average over the given window. E.g.: for a 6 Dec 2021 to 6 Dec 2022 prediction, start a 12-week rolling average from 6 Dec 2020 and roll toward 6 Dec 2021.
  3. Generate future dates of the same length.
  4. Return the prepared data frame.
  """
  future_date = []
  list1 = list(df.units)
  for i in range(1, 54):
    avg = math.mean(list1[-(53 + weekss) + i:])
    list1.append(avg)
  for i in range(53):
    future_date.append(date.today() + timedelta(days=7 - date.today().weekday()) + timedelta(weeks=i))
  data = pd.DataFrame({'date': list(df.date.dt.date) + future_date, 'units': list1})
  return data

It works when used with pandas, as shown here:

df = df11.groupby(['customer_name','upc']).apply(avg_df, weekss=12).reset_index(inplace=False)

However, I need to change it so that it works with pyspark. I tried making changes, but it doesn't work.

Passing the argument inside apply with pyspark gives the following error:

TypeError: apply() got an unexpected keyword argument 'weekss'

I looked for a similar solution; this answer is too simple to be used in my case.

Please use this to generate the data frame:

df = pd.DataFrame({'date': ['2021-1-6', '2021-3-13', '2021-6-20', '2021-10-27', '2021-1-6', '2021-3-13', '2021-6-6', '2021-10-6'],
                   'customer_name': ['a1', 'a1', 'a1', 'a1', 'a1', 'a2', 'a2', 'a2'],
                   'upc': ['b1', 'b1', 'b5', 'b5', 'b2', 'b2', 'b4', 'b4'],
                   'average_weekly_acv_distribution': [6, 0, 0, 7, 2, 9, 3, 8],
                   'units': [8, 0, 0, 8, 1, 9, 3, 8]})
df['date'] = pd.to_datetime(df['date'])
df = spark.createDataFrame(df)

I looked up pyspark's applyInPandas(), but it doesn't allow any arguments.
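For reference, a minimal sketch (using the Spark df created above; identity_group is a hypothetical name) of what applyInPandas expects: the function receives only the group as a pandas DataFrame, so there is no slot for an extra keyword argument such as weekss.

# Sketch only: applyInPandas hands the function just the group itself.
def identity_group(pdf: pd.DataFrame) -> pd.DataFrame:
    return pdf  # must return a pandas DataFrame matching the declared schema

df.groupBy('customer_name', 'upc').applyInPandas(identity_group, schema=df.schema).show()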

【Comments】:

【Answer 1】:

Building on @DileepKumar's answer, avg_df can be partially applied with partial to pass the weekss parameter. The resulting function accepts only the dataframe and can be used in applyInPandas.

from pyspark.sql.types import *

schema = StructType([ \
    StructField("units", IntegerType(), True), \
    StructField("date", DateType(), True), \
    StructField("upc", StringType(), True), \
    StructField("customer_name", StringType(), True), \
  ])

import statistics as math
from datetime import date, timedelta

def avg_df(df: pd.DataFrame, weekss) -> pd.DataFrame:
    upc = str(df["upc"].iloc[0])
    customer_name = str(df["customer_name"].iloc[0])
    future_date = []
    list1 = list(df.units)
    for i in range(1, 54):
        avg = math.mean(list1[-(53 + weekss) + i:])
        list1.append(avg)
    for i in range(53):
        future_date.append(date.today() + timedelta(days=7 - date.today().weekday()) + timedelta(weeks=i))
    df = pd.DataFrame(
        {'date': list(df.date.dt.date) + future_date, 'units': list1, 'customer_name': customer_name, 'upc': upc})
    return df

from functools import partial

df.groupBy('customer_name','upc').applyInPandas(partial(avg_df, weekss = 12), schema = schema).show()

"""
+-----+----------+---+-------------+
|units|      date|upc|customer_name|
+-----+----------+---+-------------+
|    8|2021-01-06| b1|           a1|
|    0|2021-03-13| b1|           a1|
|    4|2022-01-03| b1|           a1|
|    4|2022-01-10| b1|           a1|
|    4|2022-01-17| b1|           a1|
|    4|2022-01-24| b1|           a1|
|    4|2022-01-31| b1|           a1|
|    4|2022-02-07| b1|           a1|
|    4|2022-02-14| b1|           a1|
|    4|2022-02-21| b1|           a1|
|    4|2022-02-28| b1|           a1|
|    4|2022-03-07| b1|           a1|
|    4|2022-03-14| b1|           a1|
|    4|2022-03-21| b1|           a1|
|    4|2022-03-28| b1|           a1|
|    4|2022-04-04| b1|           a1|
|    4|2022-04-11| b1|           a1|
|    4|2022-04-18| b1|           a1|
|    4|2022-04-25| b1|           a1|
|    4|2022-05-02| b1|           a1|
+-----+----------+---+-------------+
only showing top 20 rows
"""

【Comments】:

【Answer 2】:

First, we need to define the output schema of the custom function:

from pyspark.sql.types import StructType, StructField, IntegerType, DateType, StringType

schema = StructType([ \
    StructField("units", IntegerType(), True), \
    StructField("date", DateType(), True), \
    StructField("upc", StringType(), True), \
    StructField("customer_name", StringType(), True), \
  ])

Then update the custom function:

from statistics import mean
from datetime import date, timedelta
import pandas as pd

def avg_df_12_weeks(df: pd.DataFrame) -> pd.DataFrame:
  weekss = 12
  upc = str(df["upc"].iloc[0])
  customer_name = str(df["customer_name"].iloc[0])
  future_date = []
  list1 = list(df.units)
  for i in range(1, 54):
    avg = mean(list1[-(53 + weekss) + i:])
    list1.append(avg)
  for i in range(53):
    future_date.append(date.today() + timedelta(days=7 - date.today().weekday()) + timedelta(weeks=i))
  df = pd.DataFrame({'date': list(df.date.dt.date) + future_date, 'units': list1, 'customer_name': customer_name, 'upc': upc})
  return df

Finally, groupBy, applyInPandas, and pass the schema as a parameter:

df_sales_grouped.groupBy('customer_name','upc').applyInPandas(avg_df_12_weeks, schema = schema)

Downside: it does not allow passing parameters to the custom function.
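One way around this limitation (a sketch, not part of the original answer; make_avg_df is a hypothetical helper name) is a small factory function that closes over the parameter and returns a one-argument function suitable for applyInPandas:

# Hypothetical helper: bakes weekss into a closure so applyInPandas
# still receives a function of a single pandas DataFrame.
def make_avg_df(weekss):
    def _avg_df(pdf: pd.DataFrame) -> pd.DataFrame:
        return avg_df(pdf, weekss)  # reuses the two-argument avg_df from Answer 1
    return _avg_df

df.groupBy('customer_name', 'upc').applyInPandas(make_avg_df(12), schema=schema).show()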

【Comments】:
