SQL/PySpark:创建一个包含过去 n 天的行数的新列

Posted

技术标签:

【中文标题】SQL/PySpark:创建一个包含过去 n 天的行数的新列【英文标题】:SQL/PySpark: Create a new column consisting of a number of rows in the past n days 【发布时间】:2019-02-11 07:19:12 【问题描述】:

目前,我有一个由encounter_iddate 字段组成的表,如下所示:

+---------------------------+--------------------------+
|encounter_id               |date                      |
+---------------------------+--------------------------+
|random_id34234             |2018-09-17 21:53:08.999999|
|this_can_be_anything2432432|2018-09-18 18:37:57.000000|
|423432                     |2018-09-11 21:00:36.000000|
+---------------------------+--------------------------+

encounter_id 是一个随机字符串。

我的目标是创建一个包含过去 30 天内遭遇总数的列。

+---------------------------+--------------------------+---------------------------+
|encounter_id               |date                      | encounters_in_past_30_days|
+---------------------------+--------------------------+---------------------------+
|random_id34234             |2018-09-17 21:53:08.999999| 2                         |
|this_can_be_anything2432432|2018-09-18 18:37:57.000000| 3                         |
|423432                     |2018-09-11 21:00:36.000000| 1                         |
+---------------------------+--------------------------+---------------------------+

目前,我正在考虑以某种方式使用窗口函数并指定一个聚合函数。

感谢您的时间。

【问题讨论】:

能否请您添加您的预期输出? 当然。预期的输出是列 total_encounters_in_the_past_30_days,其中第一个条目将是 2(最后一个条目和第一个条目的出现),第二个条目将是 3(所有条目的出现),第三个条目将仅为 1(本身)。 你的问题太不清楚了,很难做任何事情。聚合的基础应该是什么,换句话说groupby是什么? encounters_in_past_30_days 是如何获得 2/3/1 的?第一个条目2如何?他们有不同的dateencounter_id。分组的依据是什么?这是一个开放式问题。 【参考方案1】:

这是一种可能的解决方案,我添加了一些示例数据。正如您自己建议的那样,它确实使用了窗口函数。希望这会有所帮助!

import pyspark.sql.functions as F
from pyspark.sql.window import Window

df = sqlContext.createDataFrame(
    [
     ('A','2018-10-01 00:15:00'),
     ('B','2018-10-11 00:30:00'),
     ('C','2018-10-21 00:45:00'),
     ('D','2018-11-10 00:00:00'),
     ('E','2018-12-20 00:15:00'),
     ('F','2018-12-30 00:30:00')
    ],
    ("encounter_id","date")
)

df = df.withColumn('timestamp',F.col('date').astype('Timestamp').cast("long"))
w = Window.orderBy('timestamp').rangeBetween(-60*60*24*30,0)
df = df.withColumn('encounters_past_30_days',F.count('encounter_id').over(w))
df.show()

输出:

+------------+-------------------+----------+-----------------------+
|encounter_id|               date| timestamp|encounters_past_30_days|
+------------+-------------------+----------+-----------------------+
|           A|2018-10-01 00:15:00|1538345700|                      1|
|           B|2018-10-11 00:30:00|1539210600|                      2|
|           C|2018-10-21 00:45:00|1540075500|                      3|
|           D|2018-11-10 00:00:00|1541804400|                      2|
|           E|2018-12-20 00:15:00|1545261300|                      1|
|           F|2018-12-30 00:30:00|1546126200|                      2|
+------------+-------------------+----------+-----------------------+

编辑:如果您想以天为粒度,您可以首先将您的日期列转换为 Date 类型。下面的示例,假设五天的窗口意味着今天和之前的四天。如果应该是今天和过去五天,只需删除 -1

import pyspark.sql.functions as F
from pyspark.sql.window import Window

n_days = 5

df = sqlContext.createDataFrame(
    [
     ('A','2018-10-01 23:15:00'),
     ('B','2018-10-02 00:30:00'),
     ('C','2018-10-05 05:45:00'),
     ('D','2018-10-06 00:15:00'),
     ('E','2018-10-07 00:15:00'),
     ('F','2018-10-10 21:30:00')
    ],
    ("encounter_id","date")
)

df = df.withColumn('timestamp',F.to_date(F.col('date')).astype('Timestamp').cast("long"))
w = Window.orderBy('timestamp').rangeBetween(-60*60*24*(n_days-1),0)
df = df.withColumn('encounters_past_n_days',F.count('encounter_id').over(w))
df.show()

输出:

+------------+-------------------+----------+----------------------+
|encounter_id|               date| timestamp|encounters_past_n_days|
+------------+-------------------+----------+----------------------+
|           A|2018-10-01 23:15:00|1538344800|                     1|
|           B|2018-10-02 00:30:00|1538431200|                     2|
|           C|2018-10-05 05:45:00|1538690400|                     3|
|           D|2018-10-06 00:15:00|1538776800|                     3|
|           E|2018-10-07 00:15:00|1538863200|                     3|
|           F|2018-10-10 21:30:00|1539122400|                     3|
+------------+-------------------+----------+----------------------+

【讨论】:

谢谢!如果每天的粒度是按日期而不是按 24 小时的差异,我应该怎么做? @VVNoodle 更新了我的答案,希望这就是你要找的!

以上是关于SQL/PySpark:创建一个包含过去 n 天的行数的新列的主要内容,如果未能解决你的问题,请参考以下文章

在FEEDBLOCK内自动包含过去七天的帖子,用于MailChimp RSS广告系列

Pandas:过去 n 天的平均值

启用夏令时时如何在 T-SQL 中获取一天的开始和结束?

追踪 30 天的总结和案例逻辑

创建一个依赖于当前时间的 mongo 视图

每天更新过去 7 天的记录 [重复]