pyspark groupby and apply a custom function
Posted: 2021-12-29 10:53:17

Question: I have a custom function that I use with a pandas DataFrame groupby:
def avg_df(df, weekss):
    """
    1. Take a data frame and the averaging window (in weeks).
    2. Roll a forward-looking window that starts from the data one year back and compute the average over the given window. E.g., for a 6 Dec 2021 to 6 Dec 2022 prediction, start the 12-week rolling average from 6 Dec 2020 and roll toward 6 Dec 2021.
    3. Generate future dates of the same length.
    4. Return the prepared data frame.
    """
    future_date = []
    list1 = list(df.units)
    for i in range(1, 54):
        avg = math.mean(list1[-(53 + weekss) + i:])  # assumes statistics imported as math (the math module has no mean)
        list1.append(avg)
    for i in range(53):
        # next Monday, then one week per future point
        future_date.append(date.today() + timedelta(days=7 - date.today().weekday()) + timedelta(weeks=i))
    data = pd.DataFrame({'date': list(df.date.dt.date) + future_date, 'units': list1})
    return data
This works when used with pandas, as shown:
df = df11.groupby(['customer_name','upc']).apply(avg_df, weekss=12).reset_index(inplace=False)
However, I need to change it to work with PySpark. I tried adapting it, but it doesn't work: passing the argument through apply in PySpark raises the following error.
TypeError: apply() got an unexpected keyword argument 'weekss'
I looked for a similar solution; this answer is too simple to apply to my case.
Please use this to generate the data frame:
df = pd.DataFrame({'date': ['2021-1-6', '2021-3-13', '2021-6-20', '2021-10-27', '2021-1-6', '2021-3-13', '2021-6-6', '2021-10-6'],
                   'customer_name': ['a1', 'a1', 'a1', 'a1', 'a1', 'a2', 'a2', 'a2'],
                   'upc': ['b1', 'b1', 'b5', 'b5', 'b2', 'b2', 'b4', 'b4'],
                   'average_weekly_acv_distribution': [6, 0, 0, 7, 2, 9, 3, 8],
                   'units': [8, 0, 0, 8, 1, 9, 3, 8]})
df['date'] = pd.to_datetime(df['date'])
df = spark.createDataFrame(df)
I looked into PySpark's applyInPandas(), but it doesn't accept any extra arguments.
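For context, applyInPandas takes only the function and an output schema, and the grouped-map function must accept a single pandas DataFrame (or a grouping-key tuple plus a DataFrame); there is no *args/**kwargs pass-through the way pandas' GroupBy.apply has. A minimal sketch of the expected call shape (identity_per_group is a hypothetical name, just to illustrate):

import pandas as pd

# applyInPandas expects func(pdf) -> pd.DataFrame (or func(key, pdf));
# extra keyword arguments cannot be forwarded to func.
def identity_per_group(pdf: pd.DataFrame) -> pd.DataFrame:
    return pdf  # each group arrives as a plain pandas DataFrame

df.groupBy('customer_name', 'upc').applyInPandas(identity_per_group, schema=df.schema).show()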
Answer 1: Building on @DileepKumar's answer, avg_df can be partially applied with functools.partial to bind the weekss parameter. The resulting function accepts only the dataframe, so it can be used in applyInPandas.
from pyspark.sql.types import *

schema = StructType([
    StructField("units", IntegerType(), True),
    StructField("date", DateType(), True),
    StructField("upc", StringType(), True),
    StructField("customer_name", StringType(), True),
])
import statistics as math
from datetime import date, timedelta

def avg_df(df: pd.DataFrame, weekss) -> pd.DataFrame:
    upc = str(df["upc"].iloc[0])
    customer_name = str(df["customer_name"].iloc[0])
    future_date = []
    list1 = list(df.units)
    for i in range(1, 54):
        avg = math.mean(list1[-(53 + weekss) + i:])
        list1.append(avg)
    for i in range(53):
        future_date.append(date.today() + timedelta(days=7 - date.today().weekday()) + timedelta(weeks=i))
    df = pd.DataFrame(
        {'date': list(df.date.dt.date) + future_date, 'units': list1, 'customer_name': customer_name, 'upc': upc})
    return df
from functools import partial
df.groupBy('customer_name','upc').applyInPandas(partial(avg_df, weekss = 12), schema = schema).show()
"""
+-----+----------+---+-------------+
|units| date|upc|customer_name|
+-----+----------+---+-------------+
| 8|2021-01-06| b1| a1|
| 0|2021-03-13| b1| a1|
| 4|2022-01-03| b1| a1|
| 4|2022-01-10| b1| a1|
| 4|2022-01-17| b1| a1|
| 4|2022-01-24| b1| a1|
| 4|2022-01-31| b1| a1|
| 4|2022-02-07| b1| a1|
| 4|2022-02-14| b1| a1|
| 4|2022-02-21| b1| a1|
| 4|2022-02-28| b1| a1|
| 4|2022-03-07| b1| a1|
| 4|2022-03-14| b1| a1|
| 4|2022-03-21| b1| a1|
| 4|2022-03-28| b1| a1|
| 4|2022-04-04| b1| a1|
| 4|2022-04-11| b1| a1|
| 4|2022-04-18| b1| a1|
| 4|2022-04-25| b1| a1|
| 4|2022-05-02| b1| a1|
+-----+----------+---+-------------+
only showing top 20 rows
"""
Answer 2: First, we need to define the output schema for the custom function:
schema = StructType([
    StructField("units", IntegerType(), True),
    StructField("date", DateType(), True),
    StructField("upc", StringType(), True),
    StructField("customer_name", StringType(), True),
])
Then update the custom function:
from statistics import mean

def avg_df_12_weeks(df: pd.DataFrame) -> pd.DataFrame:
    weekss = 12
    upc = str(df["upc"].iloc[0])
    customer_name = str(df["customer_name"].iloc[0])
    future_date = []
    list1 = list(df.units)
    for i in range(1, 54):
        avg = mean(list1[-(53 + weekss) + i:])
        list1.append(avg)
    for i in range(53):
        future_date.append(date.today() + timedelta(days=7 - date.today().weekday()) + timedelta(weeks=i))
    df = pd.DataFrame(
        {'date': list(df.date.dt.date) + future_date, 'units': list1, 'customer_name': customer_name, 'upc': upc})
    return df
Finally, groupBy, call applyInPandas, and pass the schema as an argument:
df_sales_grouped.groupBy('customer_name','upc').applyInPandas(avg_df_12_weeks, schema = schema)
Drawback: this does not allow passing parameters into the custom function.
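One way around that drawback (a sketch reusing the two-argument avg_df from Answer 1; make_avg_df is a hypothetical name): wrap the function in a small factory so the parameter is baked in before it reaches applyInPandas.

def make_avg_df(weekss):
    # return a one-argument function, as applyInPandas requires
    def avg_df_fixed(pdf: pd.DataFrame) -> pd.DataFrame:
        return avg_df(pdf, weekss)
    return avg_df_fixed

df_sales_grouped.groupBy('customer_name', 'upc').applyInPandas(make_avg_df(12), schema=schema)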