计算两个连续日期之间的唯一 ID,它们是 PySpark 中列的值
Posted
技术标签:
【中文标题】计算两个连续日期之间的唯一 ID,它们是 PySpark 中列的值【英文标题】:Count unique ids between two consecutive dates that are values of a column in PySpark 【发布时间】:2021-12-09 10:03:12 【问题描述】:我有一个 PySpark DF,带有 ID 和日期列,如下所示。
ID | Date |
---|---|
1 | 2021-10-01 |
2 | 2021-10-01 |
1 | 2021-10-02 |
3 | 2021-10-02 |
我想计算前一天不存在的唯一 ID 的数量。因此,这里的结果将是 1,因为 2021-10-02 只有一个新的唯一 ID。
ID | Date | Count |
---|---|---|
1 | 2021-10-01 | - |
2 | 2021-10-01 | - |
1 | 2021-10-02 | 1 |
3 | 2021-10-02 | 1 |
我尝试遵循this 解决方案,但它不适用于日期类型值。任何帮助将不胜感激。 谢谢!
【问题讨论】:
您的示例在某种程度上不一致,因为在 2021 年 10 月 1 日,ID 1 和 2 都是“新的”。 【参考方案1】:如果您想避免自联接(例如出于性能原因),您可以使用窗口函数:
from pyspark.sql import Row, Window
import datetime
df = spark.createDataFrame([
Row(ID=1, date=datetime.date(2021,10,1)),
Row(ID=2, date=datetime.date(2021,10,1)),
Row(ID=1, date=datetime.date(2021,10,2)),
Row(ID=2, date=datetime.date(2021,10,2)),
Row(ID=1, date=datetime.date(2021,10,3)),
Row(ID=3, date=datetime.date(2021,10,3)),
])
首先添加自上次看到 ID 以来的天数(如果之前从未出现过,则为 None
)
df = df.withColumn('days_since_last_occurrence', F.datediff('date', F.lag('date').over(Window.partitionBy('ID').orderBy('date'))))
其次,我们添加一列标记此天数不是 1 的行。我们在此列中添加一个 1,以便我们稍后可以对该列求和以计算行数
df = df.withColumn('is_new', F.when(F.col('days_since_last_occurrence') == 1, None).otherwise(1))
现在我们对具有相同日期的所有行求和,然后删除我们不再需要的列:
(
df
.withColumn('count', F.sum('is_new').over(Window.partitionBy('date'))) # sum over all rows with the same date
.drop('is_new', 'days_since_last_occurrence')
.sort('date', 'ID')
.show()
)
# Output:
+---+----------+-----+
| ID| date|count|
+---+----------+-----+
| 1|2021-10-01| 2|
| 2|2021-10-01| 2|
| 1|2021-10-02| null|
| 2|2021-10-02| null|
| 1|2021-10-03| 1|
| 3|2021-10-03| 1|
+---+----------+-----+
【讨论】:
我更改了答案以遵循 OP 链接的帖子的内容 您好,非常感谢。有用!您能否将 cmets 添加到每行正在执行的操作中。 :) @RishabhSahrawat:我扩展了我的答案,希望代码现在更清晰:)【参考方案2】:取出当天和前一天的id列表,然后获取两者的差值大小,得到最终结果。
更新解决方案以消除join
。
df = df.select('date', F.expr('collect_set(id) over (partition by date) as id_arr')).dropDuplicates() \
.select('*', F.expr('size(array_except(id_arr, lag(id_arr,1,id_arr) over (order by date))) as count')) \
.select(F.explode('id_arr').alias('id'), 'date', 'count')
df.show(truncate=False)
【讨论】:
我想,这可行,但自联接在 Spark 中通常会出现问题,最好避免。 感谢您的回答。 :)以上是关于计算两个连续日期之间的唯一 ID,它们是 PySpark 中列的值的主要内容,如果未能解决你的问题,请参考以下文章