Pyspark - 获取具有条件的列的累积总和

Posted

技术标签:

【中文标题】Pyspark - 获取具有条件的列的累积总和【英文标题】:Pyspark - Get cumulative sum of of a column with condition 【发布时间】:2019-01-10 17:32:38 【问题描述】:

我有一个包含卡片、时间和金额的数据框,我需要在一个月的窗口内汇总卡片的金额(总和和计数)。

以下是数据的样子:

+--------------------+-------------------+------------+
|            card_uid|               date|amount_local|
+--------------------+-------------------+------------+
|card_001H4Mw1Ha0M...|2016-05-04 17:54:30|        8.99|
|card_0026uGZQwZQd...|2016-05-06 12:16:18|       16.19|
|card_0026uGZQwZQd...|2016-07-06 12:17:57|       16.19|
|card_003STfrgB8SZ...|2016-12-04 10:05:21|        58.8|
|card_005gBxyiDc6b...|2016-09-10 18:58:25|       27.95|
|card_005gBxyiDc6b...|2016-11-12 11:18:29|       12.99|

这是我到目前为止所做的。

+--------------------+-------------------+------------+----------------+
|            card_uid|               date|amount_local|duration_cum_sum|
+--------------------+-------------------+------------+----------------+
|card_001H4Mw1Ha0M...|2016-05-04 17:54:30|        8.99|            8.99|
|card_0026uGZQwZQd...|2016-05-06 12:16:18|       16.19|           16.19|
|card_0026uGZQwZQd...|2016-07-06 12:17:57|       16.19|           32.38|
|card_003STfrgB8SZ...|2016-12-04 10:05:21|        58.8|            58.8|
|card_005gBxyiDc6b...|2016-09-10 18:58:25|       27.95|           27.95|
|card_005gBxyiDc6b...|2016-11-12 11:18:29|       12.99|           40.94|

下面有窗口函数:

partition = Window.partitionBy("card_uid").orderBy("date")

previousTransactionDate = data.withColumn("previous_tr_time", lag(data.date).over(partition)).select("transaction_id", "card_uid", "date", "previous_tr_time") 

df_cum_sum = data.withColumn("duration_cum_sum", sum('amount_local').over(partition))

df_cum_sum.orderBy("card_uid","date").select("card_uid", "date", "amount_local", "duration_cum_sum").show()

但我唯一要补充的是两件事:

仅当日期小于一个月 时,以相同方式聚合 用 zero 代替 cum_sum 的相同数量

所以需要的输出看起来像这样:

+--------------------+-------------------+------------+----------------+
|            card_uid|               date|amount_local|duration_cum_sum|
+--------------------+-------------------+------------+----------------+
|card_001H4Mw1Ha0M...|2016-05-04 17:54:30|        8.99|               0|
|card_0026uGZQwZQd...|2016-05-06 12:16:18|       16.19|               0|
|card_0026uGZQwZQd...|2016-05-12 12:17:57|        4.00|           16.19|
|card_0026uGZQwZQd...|2016-06-06 12:23:51|       16.19|            4.00| => Only 4 because de 16.19 was more than one month ago
|card_003STfrgB8SZ...|2016-12-04 10:05:21|        58.8|               0|
|card_005gBxyiDc6b...|2016-09-10 18:58:25|       27.95|               0|
|card_005gBxyiDc6b...|2016-09-12 11:18:29|       12.99|           27.95| => Previous amount 
|card_005gBxyiDc6b...|2016-09-22 14:25:44|       23.99|           40.94| => 27.95 + 12.99

我无法 groupBy card_uid,因为我需要与原始表相同的行数才能链接到另一个表

【问题讨论】:

How to aggregate over rolling time window with groups in Spark的可能重复 好吧,只要我需要一个累积总和而不通过 groupby,这就是行不通的。这对我没有帮助,因为我无法获得交易前一个 id 的链接 请取消复制 【参考方案1】:

您需要一个日期滚动窗口,窗口范围从过去 30 天到前一天。由于 window 没有区间函数,所以可以将日期转换为 long 值,并使用 days long 值来创建窗口范围。

from pyspark.sql.functions import *
days = lambda i: i * 86400 

partition = Window.partitionBy("card_uid").orderBy(col("date").cast("timestamp").cast("long")).rangeBetween(days(-30), days(-1))

df_cum_sum = data.withColumn("duration_cum_sum",sum(col('amount_local')).over(partition))\
                 .fillna(0,subset=['duration_cum_sum'])
df_cum_sum.show()

【讨论】:

谢谢!!它像我想要的那样工作! :) 我刚刚将 days(-30) 更改为 (-31),因为这错过了最后一天,但这是完美的

以上是关于Pyspark - 获取具有条件的列的累积总和的主要内容,如果未能解决你的问题,请参考以下文章

使用pyspark中的条件创建具有运行总量的列

Pyspark - 具有重置条件的累积和

自动计算数据表页脚中具有数字数据的列的总和

如何在pyspark中连接具有相同名称的列的值

查询以选择某些条件为真的两个日期之间的列的总和

PySpark:如何将具有 SparseVector 类型的列的 Spark 数据帧写入 CSV 文件?