计算 Pyspark 中发生条件时两个事件之间的月数

Posted

技术标签:

【中文标题】计算 Pyspark 中发生条件时两个事件之间的月数【英文标题】:Count the number of months between two events when a condition occurs in Pyspark 【发布时间】:2021-03-23 13:32:11 【问题描述】:

我在 Pyspark 工作,我需要计算满足条件时两个事件之间的月数。

接下来我展示一下我的桌子是怎样的,以便您更好地了解我。这是我的初始数据框。

from pyspark.sql import Row, Window
from pyspark.sql.functions import *
from datetime import datetime, date

row = Row("id", "start", "condition")
df = sc.parallelize([
    row(1, "2015-01-31", 0),
    row(1, "2015-02-28", 0),
    row(1, "2015-03-31", 0),
    row(1, "2015-04-30", 0),
    row(1, "2015-05-31", 1),
    row(1, "2015-06-30", 1)
]).toDF().withColumn("start", col("start").cast("date"))

## +---+----------+----------+
## | id|     start| condition| 
## +---+----------+----------+
## |  1|2015-01-31|         0|
## |  1|2015-02-28|         0|
## |  1|2015-03-31|         0|
## |  1|2015-04-30|         0|
## |  1|2015-05-31|         1|
## |  1|2015-06-30|         1|
## +---+----------+----------+

我想要这个结果:

## +---+----------+----------+------------------+       
## | id|     start| condition| Months_between|
## +---+----------+----------+---------------+
## |  1|2015-01-31|         0|              4|
## |  1|2015-02-28|         0|              3|
## |  1|2015-03-31|         0|              2|
## |  1|2015-04-30|         0|              1|
## |  1|2015-05-31|         1|              0|
## |  1|2015-06-30|         1|              0|
## +---+----------+----------+---------------+

我想知道条件从 0 变为 1 的一行和另一行之间经过了多少个月。如果条件从未变为 1,则它应该为 0。 样本有一个 id,但每个日期都有很多 id。

我想过做一个窗口,但我不知道如何获得月数。我曾想过这样的事情:

max_days = (df.select(max("start")).collect()[0][0] - df.select(min("start")).collect()[0][0]).days
days = lambda i: i * 86400
window = Window.partitionBy("id").orderBy(col("start").cast("long")).rangeBetween(days(max_days), 0)

谢谢!它有效:

df2 = df.withColumn(
    'Months_between', 
    F.when(
        F.col('condition') == 0, 
        F.months_between(
            F.min(
                F.when(F.col('condition') == 1, F.col('start'))
            ).over(Window.partitionBy('id')), 
            F.col('start')
        ).cast('int')
    ).otherwise(0)
)

但是当我有这个例子时我发现了一个问题。当值在 0 和 1 之间多次变化时。

| id|     start|condition|Months_between|
+---+----------+---------+------------------+
|  1|2015-01-31|        0|              2|
|  1|2015-02-28|        0|              1|
|  1|2015-03-31|        1|              0|
|  1|2015-04-30|        1|              0|
|  1|2015-05-31|        0|             -1|
|  1|2015-06-30|        1|              0|
+---+----------+---------+-----------------+

在 2015-05-31 日期,它应该取值 1,但在搜索最小值时,它会取值 -1。有什么建议吗?谢谢!

感谢您的帮助!

【问题讨论】:

【参考方案1】:

您可以找到每个 id 的 condition = 1 的最早日期,并使用 months_between 获取该日期与当前行之间的月份:

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'next_start',
    F.first(
        F.when(F.col('condition') == 1, F.col('start')),
        ignorenulls=True
    ).over(
        Window.partitionBy('id')
              .orderBy('start')
              .rowsBetween(0, Window.unboundedFollowing)
    )
).withColumn(
    'Months_between', 
    F.when(
        F.col('condition') == 0, 
        F.months_between(
            F.col('next_start'), 
            F.col('start')
        ).cast('int')
    ).otherwise(0)
).drop('next_start')

df2.show() 
+---+----------+---------+--------------+
| id|     start|condition|Months_between|
+---+----------+---------+--------------+
|  1|2015-01-31|        0|             2|
|  1|2015-02-28|        0|             1|
|  1|2015-03-31|        1|             0|
|  1|2015-04-30|        1|             0|
|  1|2015-05-31|        0|             1|
|  1|2015-06-30|        1|             0|
+---+----------+---------+--------------+

【讨论】:

以上是关于计算 Pyspark 中发生条件时两个事件之间的月数的主要内容,如果未能解决你的问题,请参考以下文章

计算SQL中两个日期之间的月数[重复]

计算两个日期之间的月数

将两个给定日期之间的月、年和日计算为时间戳[重复]

使用复杂的条件逻辑加入 Pyspark 数据帧(可能使用地图代替)

如何计算定义自定义财务日历的两个日期之间的月数?

MySQL中日期之间的月差