加入 PySpark 数据集中每个月的上个月数据

Posted

技术标签:

【中文标题】加入 PySpark 数据集中每个月的上个月数据【英文标题】:Joining on the previous month data for each month in the PySpark dataset 【发布时间】:2020-01-28 08:46:34 【问题描述】:

我有一个按月计算的数据集,每个月有 N 个帐户。有些月份会有新账户,有些账户会在某个月后消失(这是随机完成的)。

我需要获取一个帐户的当月余额并从上个月的余额中扣除(如果该帐户在上个月存在),否则将其作为当月的余额。

有人建议我每个月加入一次。即加入第 1 个月到第 2 个月,第 2 个月到第 3 个月,等等。但我不完全确定会怎样......

这是一个示例数据集:

|date      |account   |balance   |
----------------------------------
|01.01.2019|1         |40        |
|01.01.2019|2         |33        |
|01.01.2019|3         |31        |
|01.02.2019|1         |32        |
|01.02.2019|2         |56        |
|01.02.2019|4         |89        |
|01.03.2019|2         |12        |
|01.03.2019|4         |35        |
|01.03.2019|5         |76        |
|01.03.2019|6         |47        |
----------------------------------

每个已离开、当前和新加入的帐户的帐户 ID 都是唯一的。

我最初使用f.lag,但是现在有帐户消失并新加入,每个月的帐户数量不是恒定的,所以我不能滞后。正如我所说,我被建议使用加入。 IE。加入一月到二月,二月到三月等。

但我不确定那会怎样。有人有什么想法吗?

附:我创建了此表,其中包含一个保留帐户、一个新帐户和一个从以后几个月中删除的帐户的示例。

最终目标是:

|date      |account   |balance   | balance_diff_with_previous_month  |
--------------------------------------------------------------------|
|01.01.2019|1         |40        |na                                |
|01.01.2019|2         |33        |na                                |
|01.01.2019|3         |31        |na                                |
|01.02.2019|1         |32        |-8                                |
|01.02.2019|2         |56        |23                                |
|01.02.2019|4         |89        |89                                |
|01.03.2019|2         |12        |-44                               |
|01.03.2019|4         |35        |-54                               |
|01.03.2019|5         |76        |76                                |
|01.03.2019|6         |47        |47                                |
--------------------------------------------------------------------|

正如我所说,f.lag 不能使用,因为每月的帐户数不是恒定的,并且我不控制帐户数,因此不能 f.lag 恒定的行数。

有人对如何使用 date-1(上个月)加入帐户和/或日期(当前月份)有任何想法吗?

感谢阅读和帮助:)

【问题讨论】:

你能解释一下你的输出值吗,我不明白你是如何计算账户 2 的日期 01.03.2019 的余额差异,在你的情况下是 -20,你能解释一下公式吗?对于帐户 4,应该 -54 而不是 -44 (35 - 89) 我更正了数字。道歉。我的快速数学很糟糕。 【参考方案1】:

使用连接的替代解决方案 ....

df = spark.createDataFrame([
            ("01.01.2019", 1, 40),("01.01.2019", 2, 33),("01.01.2019", 3, 31),
            ("01.02.2019", 1, 32), ("01.02.2019", 2, 56),("01.02.2019", 4, 89),
            ("01.03.2019", 2, 12),("01.03.2019", 4, 35),("01.03.2019", 5, 76),("01.03.2019", 6, 47)],
            ["date","account","balance"])

df.alias("current").join(
    df.alias("previous"),
    [F.to_date(F.col("previous.date"), "dd.MM.yyyy") == F.to_date(F.add_months(F.to_date(F.col("current.date"), "dd.MM.yyyy"),-1),"dd.MM.yyyy"), F.col("previous.account") == F.col("current.account")],
    "left"
).select(
    F.col("current.date").alias("date"),
    F.coalesce("current.account", "previous.account").alias("account"),
    F.col("current.balance").alias("balance"),
    (F.col("current.balance") - F.coalesce(F.col("previous.balance"), F.lit(0))).alias("balance_diff_with_previous_month")
).orderBy("date","account").show()

结果

+----------+-------+-------+--------------------------------+
|      date|account|balance|balance_diff_with_previous_month|
+----------+-------+-------+--------------------------------+
|01.01.2019|      1|     40|                              40|
|01.01.2019|      2|     33|                              33|
|01.01.2019|      3|     31|                              31|
|01.02.2019|      1|     32|                              -8|
|01.02.2019|      2|     56|                              23|
|01.02.2019|      4|     89|                              89|
|01.03.2019|      2|     12|                             -44|
|01.03.2019|      4|     35|                             -54|
|01.03.2019|      5|     76|                              76|
|01.03.2019|      6|     47|                              47|
+----------+-------+-------+--------------------------------+

【讨论】:

我会记住的。但与底部@LaSul 的答案相比,它看起来像很多额外的代码行。非常感谢您的回复! 您可以检查这两个选项的性能,而不是检查代码行数。一般来说,窗口函数很昂贵,但您可以检查对数据集使用窗口 + 滞后与连接的性能并使用最好的。【参考方案2】:

F.lag 如果您按account

分区,则可以完美满足您的需求
partition = Window.partitionBy("account") \
                  .orderBy(F.col("date").cast("timestamp").cast("long"))

previousAmount = data.withColumn("balance_diff_with_previous_month", F.lag("balance").over(partition))
                     .show(10, False)

【讨论】:

我相信这行得通。我将使用真实数据对其进行更多测试,然后如果它仍然有效(我希望它会这样做),请接受您的答案。非常感谢! 我没有用过很多Window函数,也不是很熟悉,但是为什么我们需要在第一行末尾的.cast(“long”)?还假设日期列将作为日期出现,我们可以省略 cast("timestamp") 吗? 没有必要强制转换,但这只是时间戳的一种形式【参考方案3】:
>>> from pyspark.sql.functions import *
>>> from pyspark.sql import Window
>>> df.show()
+----------+-------+-------+
|      date|account|balance|
+----------+-------+-------+
|01.01.2019|      1|     40|
|01.01.2019|      2|     33|
|01.01.2019|      3|     31|
|01.02.2019|      1|     32|
|01.02.2019|      2|     56|
|01.02.2019|      4|     89|
|01.03.2019|      2|     12|
|01.03.2019|      4|     35|
|01.03.2019|      5|     76|
|01.03.2019|      6|     47|
+----------+-------+-------+

>>> df1 = df.withColumn("date", expr("to_date(date, 'dd.MM.yyyy')"))
>>> W = Window.partitionBy("account").orderBy("date")
>>> df1.withColumn("balance_diff_with_previous_month", col("balance") - lag(col("balance"),1,0).over(W)).show()
+----------+-------+-------+--------------------------------+
|      date|account|balance|balance_diff_with_previous_month|
+----------+-------+-------+--------------------------------+
|2019-01-01|      1|     40|                            40.0|
|2019-01-01|      2|     33|                            33.0|
|2019-01-01|      3|     31|                            31.0|
|2019-02-01|      1|     32|                            -8.0|
|2019-02-01|      2|     56|                            23.0|
|2019-02-01|      4|     89|                            89.0|
|2019-03-01|      2|     12|                           -44.0|
|2019-03-01|      4|     35|                           -54.0|
|2019-03-01|      5|     76|                            76.0|
|2019-03-01|      6|     47|                            47.0|
+----------+-------+-------+--------------------------------+

【讨论】:

以上是关于加入 PySpark 数据集中每个月的上个月数据的主要内容,如果未能解决你的问题,请参考以下文章

从估算日期开始每个唯一用户的前六个月数据 - PostgreSQL

如何在我的数据集中获得每个月的 MAX/MIN 函数?

php+mysql中怎么查询最近12个月每个月的数据

SQL 查询每个月统计的数据。

查询近7天,近1个月,近3个月每天的数据量,查询近一年每个月的数据量

一次性集中处理大量数据的定时任务,如何缩短执行时间?