计算 pyspark 中日期范围的 ID

Posted

技术标签:

【中文标题】计算 pyspark 中日期范围的 ID【英文标题】:count id for date range in pyspark 【发布时间】:2021-03-02 20:10:53 【问题描述】:

我有一个 pyspark 数据框,其中包含 parsed_date (dtype: date) 和 id (dtype: bigint) 列,如下所示:

+-------+-----------+
|     id|parsed_date|
+-------+-----------+
|1471783| 2017-12-18|
|1471885| 2017-12-18|
|1472928| 2017-12-19|
|1476917| 2017-12-19|
|1477469| 2017-12-21|
|1478190| 2017-12-21|
|1478570| 2017-12-19|
|1481415| 2017-12-21|
|1472592| 2017-12-20|
|1474023| 2017-12-22|
|1474029| 2017-12-22|
|1474067| 2017-12-24|
+-------+-----------+

我有一个如下所示的函数。目的是传递日期(天)和 t(天数)。在 df1 中,id 计入范围(day-t,day)中,在 df2 中,id 计入范围(day,day+t)中。

def hypo_1(df, day, t):
    df1 = (df.filter(f"parsed_date between 'day' - interval t days and 'day' - interval 1 day")
             .withColumn('count_before', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
          )
    df2 = (df.filter(f"parsed_date between 'day' + interval 1 day and 'day' + interval t days")
             .withColumn('count_after', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
          )
    return [df1, df2]

df1, df2 = hypo_1(df, '2017-12-20', 2)
df1.show()
+-------+-----------+------------+
|     id|parsed_date|count_before|
+-------+-----------+------------+
|1471783| 2017-12-18|           2|
|1471885| 2017-12-18|           2|
|1472928| 2017-12-19|           3|
|1476917| 2017-12-19|           3|
|1478570| 2017-12-19|           3|
+-------+-----------+------------+

df2.show()
+-------+-----------+-----------+
|     id|parsed_date|count_after|
+-------+-----------+-----------+
|1481415| 2017-12-21|          3|
|1478190| 2017-12-21|          3|
|1477469| 2017-12-21|          3|
|1474023| 2017-12-22|          2|
|1474029| 2017-12-22|          2|
+-------+-----------+-----------+

我想知道如果范围内缺少日期,如何修复此代码?假设2017-12-22 没有记录?是否有可能立即记录在案的日期?我的意思是如果2017-12-22 不存在并且2017-12-21 之后的下一个日期是2017-12-24 那么有可能以某种方式接受吗?

感谢 mck 帮助创建函数 hypo_1(df, day, t)

【问题讨论】:

【参考方案1】:

我删除了 2017-12-22 行来说明。这个想法是得到一个按日期排序的dense_rank(之前降序,之后升序),并过滤排名

from pyspark.sql import functions as F, Window

def hypo_1(df, day, t):
    df1 = (df.filter(f"parsed_date < 'day'")
             .withColumn('rn', F.dense_rank().over(Window.orderBy(F.desc('parsed_date'))))
             .filter('rn <= 2')
             .drop('rn')
             .withColumn('count_before', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
          )
    df2 = (df.filter(f"parsed_date > 'day'")
             .withColumn('rn', F.dense_rank().over(Window.orderBy('parsed_date')))
             .filter('rn <= 2')
             .drop('rn')
             .withColumn('count_after', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
          )
    return [df1, df2]

df1, df2 = hypo_1(df, '2017-12-20', 2)
df1.show()
+-------+-----------+------------+
|     id|parsed_date|count_before|
+-------+-----------+------------+
|1471783| 2017-12-18|           2|
|1471885| 2017-12-18|           2|
|1472928| 2017-12-19|           3|
|1476917| 2017-12-19|           3|
|1478570| 2017-12-19|           3|
+-------+-----------+------------+

df2.show()
+-------+-----------+-----------+
|     id|parsed_date|count_after|
+-------+-----------+-----------+
|1477469| 2017-12-21|          3|
|1481415| 2017-12-21|          3|
|1478190| 2017-12-21|          3|
|1474067| 2017-12-24|          1|
+-------+-----------+-----------+

【讨论】:

代码运行良好。谢谢一声。因为我拥有的数据集有数十亿行,所以这需要很多时间。有没有办法优化速度? 我认为一种可能的方法是在您之前的问题中使用过滤器,但将其设置为 10 天左右,这样您就不需要计算每一行的dense_rank。跨度> 我想问一个后续问题。很抱歉这样的延迟,因为之前我使用的是小型数据集的解决方案,现在我想使用重型数据集。当我尝试 df.show() 时,我的代码卡住了,我看到了这个 org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. 。有什么建议吗? 您是否尝试按照我在上面评论中所说的添加过滤器? 我不确定在哪里放置过滤器?你能帮忙吗?

以上是关于计算 pyspark 中日期范围的 ID的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中为镶木地板文件过滤日期时间范围和时区

如果日期在季度范围内,PySpark 添加列

如何将免费日期范围添加到 pyspark df

pyspark 使用动态日期范围读取镶木地板文件分区数据

计算pyspark中两个日期之间的时间

在 pyspark.pandas 中添加/减去日期时间