计算两个连续日期之间的唯一 ID,它们是 PySpark 中列的值

Posted

技术标签:

【中文标题】计算两个连续日期之间的唯一 ID,它们是 PySpark 中列的值【英文标题】:Count unique ids between two consecutive dates that are values of a column in PySpark 【发布时间】:2021-12-09 10:03:12 【问题描述】:

我有一个 PySpark DF,带有 ID 和日期列,如下所示。

ID Date
1 2021-10-01
2 2021-10-01
1 2021-10-02
3 2021-10-02

我想计算前一天不存在的唯一 ID 的数量。因此,这里的结果将是 1,因为 2021-10-02 只有一个新的唯一 ID。

ID Date Count
1 2021-10-01 -
2 2021-10-01 -
1 2021-10-02 1
3 2021-10-02 1

我尝试遵循this 解决方案,但它不适用于日期类型值。任何帮助将不胜感激。 谢谢!

【问题讨论】:

您的示例在某种程度上不一致,因为在 2021 年 10 月 1 日,ID 1 和 2 都是“新的”。 【参考方案1】:

如果您想避免自联接(例如出于性能原因),您可以使用窗口函数:

from pyspark.sql import Row, Window
import datetime

df = spark.createDataFrame([
    Row(ID=1, date=datetime.date(2021,10,1)),
    Row(ID=2, date=datetime.date(2021,10,1)),
    Row(ID=1, date=datetime.date(2021,10,2)),
    Row(ID=2, date=datetime.date(2021,10,2)),
    Row(ID=1, date=datetime.date(2021,10,3)),
    Row(ID=3, date=datetime.date(2021,10,3)),
])

首先添加自上次看到 ID 以来的天数(如果之前从未出现过,则为 None

df = df.withColumn('days_since_last_occurrence', F.datediff('date', F.lag('date').over(Window.partitionBy('ID').orderBy('date'))))

其次,我们添加一列标记此天数不是 1 的行。我们在此列中添加一个 1,以便我们稍后可以对该列求和以计算行数

df = df.withColumn('is_new', F.when(F.col('days_since_last_occurrence') == 1, None).otherwise(1))

现在我们对具有相同日期的所有行求和,然后删除我们不再需要的列:

(
    df
    .withColumn('count', F.sum('is_new').over(Window.partitionBy('date'))) # sum over all rows with the same date
    .drop('is_new', 'days_since_last_occurrence')
    .sort('date', 'ID')
    .show()
)
# Output:
+---+----------+-----+
| ID|      date|count|
+---+----------+-----+
|  1|2021-10-01|    2|
|  2|2021-10-01|    2|
|  1|2021-10-02| null|
|  2|2021-10-02| null|
|  1|2021-10-03|    1|
|  3|2021-10-03|    1|
+---+----------+-----+

【讨论】:

我更改了答案以遵循 OP 链接的帖子的内容 您好,非常感谢。有用!您能否将 cmets 添加到每行正在执行的操作中。 :) @RishabhSahrawat:我扩展了我的答案,希望代码现在更清晰:)【参考方案2】:

取出当天和前一天的id列表,然后获取两者的差值大小,得到最终结果。

更新解决方案以消除join

df = df.select('date', F.expr('collect_set(id) over (partition by date) as id_arr')).dropDuplicates() \
    .select('*', F.expr('size(array_except(id_arr, lag(id_arr,1,id_arr) over (order by date))) as count')) \
    .select(F.explode('id_arr').alias('id'), 'date', 'count')
df.show(truncate=False)

【讨论】:

我想,这可行,但自联接在 Spark 中通常会出现问题,最好避免。 感谢您的回答。 :)

以上是关于计算两个连续日期之间的唯一 ID,它们是 PySpark 中列的值的主要内容,如果未能解决你的问题,请参考以下文章

计算 Hive 数组中连续日期之间的差异

Java,计算两个日期之间的天数[重复]

如何计算两个日期之间的秒数?

js计算两个日期间的所有日期

如何计算两个给定日期之间的天数

如何用EXCEL计算两个日期之间相差的年数和月数?