SQL/PySpark:创建一个包含过去 n 天的行数的新列
Posted
技术标签:
【中文标题】SQL/PySpark:创建一个包含过去 n 天的行数的新列【英文标题】:SQL/PySpark: Create a new column consisting of a number of rows in the past n days 【发布时间】:2019-02-11 07:19:12 【问题描述】:目前,我有一个由encounter_id
和date
字段组成的表,如下所示:
+---------------------------+--------------------------+
|encounter_id |date |
+---------------------------+--------------------------+
|random_id34234 |2018-09-17 21:53:08.999999|
|this_can_be_anything2432432|2018-09-18 18:37:57.000000|
|423432 |2018-09-11 21:00:36.000000|
+---------------------------+--------------------------+
encounter_id
是一个随机字符串。
我的目标是创建一个包含过去 30 天内遭遇总数的列。
+---------------------------+--------------------------+---------------------------+
|encounter_id |date | encounters_in_past_30_days|
+---------------------------+--------------------------+---------------------------+
|random_id34234 |2018-09-17 21:53:08.999999| 2 |
|this_can_be_anything2432432|2018-09-18 18:37:57.000000| 3 |
|423432 |2018-09-11 21:00:36.000000| 1 |
+---------------------------+--------------------------+---------------------------+
目前,我正在考虑以某种方式使用窗口函数并指定一个聚合函数。
感谢您的时间。
【问题讨论】:
能否请您添加您的预期输出? 当然。预期的输出是列 total_encounters_in_the_past_30_days,其中第一个条目将是 2(最后一个条目和第一个条目的出现),第二个条目将是 3(所有条目的出现),第三个条目将仅为 1(本身)。 你的问题太不清楚了,很难做任何事情。聚合的基础应该是什么,换句话说groupby
是什么? encounters_in_past_30_days
是如何获得 2/3/1
的?第一个条目2如何?他们有不同的date
和encounter_id
。分组的依据是什么?这是一个开放式问题。
【参考方案1】:
这是一种可能的解决方案,我添加了一些示例数据。正如您自己建议的那样,它确实使用了窗口函数。希望这会有所帮助!
import pyspark.sql.functions as F
from pyspark.sql.window import Window
df = sqlContext.createDataFrame(
[
('A','2018-10-01 00:15:00'),
('B','2018-10-11 00:30:00'),
('C','2018-10-21 00:45:00'),
('D','2018-11-10 00:00:00'),
('E','2018-12-20 00:15:00'),
('F','2018-12-30 00:30:00')
],
("encounter_id","date")
)
df = df.withColumn('timestamp',F.col('date').astype('Timestamp').cast("long"))
w = Window.orderBy('timestamp').rangeBetween(-60*60*24*30,0)
df = df.withColumn('encounters_past_30_days',F.count('encounter_id').over(w))
df.show()
输出:
+------------+-------------------+----------+-----------------------+
|encounter_id| date| timestamp|encounters_past_30_days|
+------------+-------------------+----------+-----------------------+
| A|2018-10-01 00:15:00|1538345700| 1|
| B|2018-10-11 00:30:00|1539210600| 2|
| C|2018-10-21 00:45:00|1540075500| 3|
| D|2018-11-10 00:00:00|1541804400| 2|
| E|2018-12-20 00:15:00|1545261300| 1|
| F|2018-12-30 00:30:00|1546126200| 2|
+------------+-------------------+----------+-----------------------+
编辑:如果您想以天为粒度,您可以首先将您的日期列转换为
Date
类型。下面的示例,假设五天的窗口意味着今天和之前的四天。如果应该是今天和过去五天,只需删除-1
。
import pyspark.sql.functions as F
from pyspark.sql.window import Window
n_days = 5
df = sqlContext.createDataFrame(
[
('A','2018-10-01 23:15:00'),
('B','2018-10-02 00:30:00'),
('C','2018-10-05 05:45:00'),
('D','2018-10-06 00:15:00'),
('E','2018-10-07 00:15:00'),
('F','2018-10-10 21:30:00')
],
("encounter_id","date")
)
df = df.withColumn('timestamp',F.to_date(F.col('date')).astype('Timestamp').cast("long"))
w = Window.orderBy('timestamp').rangeBetween(-60*60*24*(n_days-1),0)
df = df.withColumn('encounters_past_n_days',F.count('encounter_id').over(w))
df.show()
输出:
+------------+-------------------+----------+----------------------+
|encounter_id| date| timestamp|encounters_past_n_days|
+------------+-------------------+----------+----------------------+
| A|2018-10-01 23:15:00|1538344800| 1|
| B|2018-10-02 00:30:00|1538431200| 2|
| C|2018-10-05 05:45:00|1538690400| 3|
| D|2018-10-06 00:15:00|1538776800| 3|
| E|2018-10-07 00:15:00|1538863200| 3|
| F|2018-10-10 21:30:00|1539122400| 3|
+------------+-------------------+----------+----------------------+
【讨论】:
谢谢!如果每天的粒度是按日期而不是按 24 小时的差异,我应该怎么做? @VVNoodle 更新了我的答案,希望这就是你要找的!以上是关于SQL/PySpark:创建一个包含过去 n 天的行数的新列的主要内容,如果未能解决你的问题,请参考以下文章