基于 DataFrame 中另一列的列的滚动总和
Posted
技术标签:
【中文标题】基于 DataFrame 中另一列的列的滚动总和【英文标题】:Rolling Sum of a column based on another column in a DataFrame 【发布时间】:2019-12-01 15:03:14 【问题描述】:我有一个如下所示的 DataFrame
ID Date Amount
10001 2019-07-01 50
10001 2019-05-01 15
10001 2019-06-25 10
10001 2019-05-27 20
10002 2019-06-29 25
10002 2019-07-18 35
10002 2019-07-15 40
从金额列中,我试图根据日期列获得 4 周的滚动总和。我的意思是,基本上我还需要一列(比如 amount_4wk_rolling),它将包含 4 周前所有行的金额列的总和。因此,如果该行中的日期是 2019-07-01,那么 amount_4wk_rolling 列值应该是日期在 2019-07-01 和 2019-06-04 之间的所有行的数量之和(2019-07-01负 28 天)。 所以新的 DataFrame 看起来像这样。
ID Date Amount amount_4wk_rolling
10001 2019-07-01 50 60
10001 2019-05-01 15 15
10001 2019-06-25 10 30
10001 2019-05-27 20 35
10002 2019-06-29 25 25
10002 2019-07-18 35 100
10002 2019-07-15 40 65
我尝试过使用窗口函数,但它不允许我根据特定列的值选择窗口
Edit:
My data is huge...about a TB in size. Ideally, I would like to do this in spark rather that in pandas
【问题讨论】:
【参考方案1】:按照建议,您可以在 Date
上使用带有“28d”的 .rolling
。
似乎(根据您的示例值)您还希望滚动窗口按 ID 分组。
试试这个:
import pandas as pd
from io import StringIO
s = """
ID Date Amount
10001 2019-07-01 50
10001 2019-05-01 15
10001 2019-06-25 10
10001 2019-05-27 20
10002 2019-06-29 25
10002 2019-07-18 35
10002 2019-07-15 40
"""
df = pd.read_csv(StringIO(s), sep="\s+")
df['Date'] = pd.to_datetime(df['Date'])
amounts = df.groupby(["ID"]).apply(lambda g: g.sort_values('Date').rolling('28d', on='Date').sum())
df['amount_4wk_rolling'] = df["Date"].map(amounts.set_index('Date')['Amount'])
print(df)
输出:
ID Date Amount amount_4wk_rolling
0 10001 2019-07-01 50 60.0
1 10001 2019-05-01 15 15.0
2 10001 2019-06-25 10 10.0
3 10001 2019-05-27 20 35.0
4 10002 2019-06-29 25 25.0
5 10002 2019-07-18 35 100.0
6 10002 2019-07-15 40 65.0
【讨论】:
@RiyanMohammed 哦,我明白了。抱歉,我从来没有真正使用过 spark 数据帧,或者一次性使用 TB 数据:( 非常有用 - 你能解释一下df['amount_4wk_rolling'] = df["Date"].map(amounts.set_index('Date')['Amount'])
这行是如何工作的吗?【参考方案2】:
这可以用pandas_udf
来完成,看起来你想用'ID'分组,所以我用它作为组ID。
spark = SparkSession.builder.appName('test').getOrCreate()
df = spark.createDataFrame([Row(ID=10001, d='2019-07-01', Amount=50),
Row(ID=10001, d='2019-05-01', Amount=15),
Row(ID=10001, d='2019-06-25', Amount=10),
Row(ID=10001, d='2019-05-27', Amount=20),
Row(ID=10002, d='2019-06-29', Amount=25),
Row(ID=10002, d='2019-07-18', Amount=35),
Row(ID=10002, d='2019-07-15', Amount=40)
])
df = df.withColumn('date', F.to_date('d', 'yyyy-MM-dd'))
df = df.withColumn('prev_date', F.date_sub(df['date'], 28))
df.select(["ID", "prev_date", "date", "Amount"]).orderBy('date').show()
df = df.withColumn('amount_4wk_rolling', F.lit(0.0))
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def roll_udf(pdf):
for index, row in pdf.iterrows():
d, pd = row['date'], row['prev_date']
pdf.loc[pdf['date']==d, 'amount_4wk_rolling'] = np.sum(pdf.loc[(pdf['date']<=d)&(pdf['date']>=pd)]['Amount'])
return pdf
df = df.groupby('ID').apply(roll_udf)
df.select(['ID', 'date', 'prev_date', 'Amount', 'amount_4wk_rolling']).orderBy(['ID', 'date']).show()
输出:
+-----+----------+----------+------+
| ID| prev_date| date|Amount|
+-----+----------+----------+------+
|10001|2019-04-03|2019-05-01| 15|
|10001|2019-04-29|2019-05-27| 20|
|10001|2019-05-28|2019-06-25| 10|
|10002|2019-06-01|2019-06-29| 25|
|10001|2019-06-03|2019-07-01| 50|
|10002|2019-06-17|2019-07-15| 40|
|10002|2019-06-20|2019-07-18| 35|
+-----+----------+----------+------+
+-----+----------+----------+------+------------------+
| ID| date| prev_date|Amount|amount_4wk_rolling|
+-----+----------+----------+------+------------------+
|10001|2019-05-01|2019-04-03| 15| 15.0|
|10001|2019-05-27|2019-04-29| 20| 35.0|
|10001|2019-06-25|2019-05-28| 10| 10.0|
|10001|2019-07-01|2019-06-03| 50| 60.0|
|10002|2019-06-29|2019-06-01| 25| 25.0|
|10002|2019-07-15|2019-06-17| 40| 65.0|
|10002|2019-07-18|2019-06-20| 35| 100.0|
+-----+----------+----------+------+------------------+
【讨论】:
【参考方案3】:对于pyspark,你可以使用Window函数:sum + RangeBetween
from pyspark.sql import functions as F, Window
# skip code to initialize Spark session and dataframe
>>> df.show()
+-----+----------+------+
| ID| Date|Amount|
+-----+----------+------+
|10001|2019-07-01| 50|
|10001|2019-05-01| 15|
|10001|2019-06-25| 10|
|10001|2019-05-27| 20|
|10002|2019-06-29| 25|
|10002|2019-07-18| 35|
|10002|2019-07-15| 40|
+-----+----------+------+
>>> df.printSchema()
root
|-- ID: long (nullable = true)
|-- Date: string (nullable = true)
|-- Amount: long (nullable = true)
win = Window.partitionBy('ID').orderBy(F.to_timestamp('Date').astype('long')).rangeBetween(-28*86400,0)
df_new = df.withColumn('amount_4wk_rolling', F.sum('Amount').over(win))
>>> df_new.show()
+------+-----+----------+------------------+
|Amount| ID| Date|amount_4wk_rolling|
+------+-----+----------+------------------+
| 25|10002|2019-06-29| 25|
| 40|10002|2019-07-15| 65|
| 35|10002|2019-07-18| 100|
| 15|10001|2019-05-01| 15|
| 20|10001|2019-05-27| 35|
| 10|10001|2019-06-25| 10|
| 50|10001|2019-07-01| 60|
+------+-----+----------+------------------+
【讨论】:
以上是关于基于 DataFrame 中另一列的列的滚动总和的主要内容,如果未能解决你的问题,请参考以下文章
用 Pandas 将 DataFrame 中某些列和行的值替换为同一 DataFrame 中另一列的值