加入 PySpark 数据集中每个月的上个月数据
Posted
技术标签:
【中文标题】加入 PySpark 数据集中每个月的上个月数据【英文标题】:Joining on the previous month data for each month in the PySpark dataset 【发布时间】:2020-01-28 08:46:34 【问题描述】:我有一个按月计算的数据集,每个月有 N 个帐户。有些月份会有新账户,有些账户会在某个月后消失(这是随机完成的)。
我需要获取一个帐户的当月余额并从上个月的余额中扣除(如果该帐户在上个月存在),否则将其作为当月的余额。
有人建议我每个月加入一次。即加入第 1 个月到第 2 个月,第 2 个月到第 3 个月,等等。但我不完全确定会怎样......
这是一个示例数据集:
|date |account |balance |
----------------------------------
|01.01.2019|1 |40 |
|01.01.2019|2 |33 |
|01.01.2019|3 |31 |
|01.02.2019|1 |32 |
|01.02.2019|2 |56 |
|01.02.2019|4 |89 |
|01.03.2019|2 |12 |
|01.03.2019|4 |35 |
|01.03.2019|5 |76 |
|01.03.2019|6 |47 |
----------------------------------
每个已离开、当前和新加入的帐户的帐户 ID 都是唯一的。
我最初使用f.lag,但是现在有帐户消失并新加入,每个月的帐户数量不是恒定的,所以我不能滞后。正如我所说,我被建议使用加入。 IE。加入一月到二月,二月到三月等。
但我不确定那会怎样。有人有什么想法吗?
附:我创建了此表,其中包含一个保留帐户、一个新帐户和一个从以后几个月中删除的帐户的示例。
最终目标是:
|date |account |balance | balance_diff_with_previous_month |
--------------------------------------------------------------------|
|01.01.2019|1 |40 |na |
|01.01.2019|2 |33 |na |
|01.01.2019|3 |31 |na |
|01.02.2019|1 |32 |-8 |
|01.02.2019|2 |56 |23 |
|01.02.2019|4 |89 |89 |
|01.03.2019|2 |12 |-44 |
|01.03.2019|4 |35 |-54 |
|01.03.2019|5 |76 |76 |
|01.03.2019|6 |47 |47 |
--------------------------------------------------------------------|
正如我所说,f.lag 不能使用,因为每月的帐户数不是恒定的,并且我不控制帐户数,因此不能 f.lag 恒定的行数。
有人对如何使用 date-1(上个月)加入帐户和/或日期(当前月份)有任何想法吗?
感谢阅读和帮助:)
【问题讨论】:
你能解释一下你的输出值吗,我不明白你是如何计算账户 2 的日期 01.03.2019 的余额差异,在你的情况下是 -20,你能解释一下公式吗?对于帐户 4,应该 -54 而不是 -44 (35 - 89) 我更正了数字。道歉。我的快速数学很糟糕。 【参考方案1】:使用连接的替代解决方案 ....
df = spark.createDataFrame([
("01.01.2019", 1, 40),("01.01.2019", 2, 33),("01.01.2019", 3, 31),
("01.02.2019", 1, 32), ("01.02.2019", 2, 56),("01.02.2019", 4, 89),
("01.03.2019", 2, 12),("01.03.2019", 4, 35),("01.03.2019", 5, 76),("01.03.2019", 6, 47)],
["date","account","balance"])
df.alias("current").join(
df.alias("previous"),
[F.to_date(F.col("previous.date"), "dd.MM.yyyy") == F.to_date(F.add_months(F.to_date(F.col("current.date"), "dd.MM.yyyy"),-1),"dd.MM.yyyy"), F.col("previous.account") == F.col("current.account")],
"left"
).select(
F.col("current.date").alias("date"),
F.coalesce("current.account", "previous.account").alias("account"),
F.col("current.balance").alias("balance"),
(F.col("current.balance") - F.coalesce(F.col("previous.balance"), F.lit(0))).alias("balance_diff_with_previous_month")
).orderBy("date","account").show()
结果
+----------+-------+-------+--------------------------------+
| date|account|balance|balance_diff_with_previous_month|
+----------+-------+-------+--------------------------------+
|01.01.2019| 1| 40| 40|
|01.01.2019| 2| 33| 33|
|01.01.2019| 3| 31| 31|
|01.02.2019| 1| 32| -8|
|01.02.2019| 2| 56| 23|
|01.02.2019| 4| 89| 89|
|01.03.2019| 2| 12| -44|
|01.03.2019| 4| 35| -54|
|01.03.2019| 5| 76| 76|
|01.03.2019| 6| 47| 47|
+----------+-------+-------+--------------------------------+
【讨论】:
我会记住的。但与底部@LaSul 的答案相比,它看起来像很多额外的代码行。非常感谢您的回复! 您可以检查这两个选项的性能,而不是检查代码行数。一般来说,窗口函数很昂贵,但您可以检查对数据集使用窗口 + 滞后与连接的性能并使用最好的。【参考方案2】:F.lag
如果您按account
和
partition = Window.partitionBy("account") \
.orderBy(F.col("date").cast("timestamp").cast("long"))
previousAmount = data.withColumn("balance_diff_with_previous_month", F.lag("balance").over(partition))
.show(10, False)
【讨论】:
我相信这行得通。我将使用真实数据对其进行更多测试,然后如果它仍然有效(我希望它会这样做),请接受您的答案。非常感谢! 我没有用过很多Window函数,也不是很熟悉,但是为什么我们需要在第一行末尾的.cast(“long”)?还假设日期列将作为日期出现,我们可以省略 cast("timestamp") 吗? 没有必要强制转换,但这只是时间戳的一种形式【参考方案3】:>>> from pyspark.sql.functions import *
>>> from pyspark.sql import Window
>>> df.show()
+----------+-------+-------+
| date|account|balance|
+----------+-------+-------+
|01.01.2019| 1| 40|
|01.01.2019| 2| 33|
|01.01.2019| 3| 31|
|01.02.2019| 1| 32|
|01.02.2019| 2| 56|
|01.02.2019| 4| 89|
|01.03.2019| 2| 12|
|01.03.2019| 4| 35|
|01.03.2019| 5| 76|
|01.03.2019| 6| 47|
+----------+-------+-------+
>>> df1 = df.withColumn("date", expr("to_date(date, 'dd.MM.yyyy')"))
>>> W = Window.partitionBy("account").orderBy("date")
>>> df1.withColumn("balance_diff_with_previous_month", col("balance") - lag(col("balance"),1,0).over(W)).show()
+----------+-------+-------+--------------------------------+
| date|account|balance|balance_diff_with_previous_month|
+----------+-------+-------+--------------------------------+
|2019-01-01| 1| 40| 40.0|
|2019-01-01| 2| 33| 33.0|
|2019-01-01| 3| 31| 31.0|
|2019-02-01| 1| 32| -8.0|
|2019-02-01| 2| 56| 23.0|
|2019-02-01| 4| 89| 89.0|
|2019-03-01| 2| 12| -44.0|
|2019-03-01| 4| 35| -54.0|
|2019-03-01| 5| 76| 76.0|
|2019-03-01| 6| 47| 47.0|
+----------+-------+-------+--------------------------------+
【讨论】:
以上是关于加入 PySpark 数据集中每个月的上个月数据的主要内容,如果未能解决你的问题,请参考以下文章
从估算日期开始每个唯一用户的前六个月数据 - PostgreSQL