如何在 Pyspark 2.1 中使用窗口函数来计算星期几的出现次数
Posted
技术标签:
【中文标题】如何在 Pyspark 2.1 中使用窗口函数来计算星期几的出现次数【英文标题】:How to use a window function to count day of week occurrences in Pyspark 2.1 【发布时间】:2018-06-06 21:33:23 【问题描述】:使用以下 pyspark 数据集 (2.1),如何使用窗口函数计算当前记录的星期几在过去 28 天内出现的次数。
示例数据框:
from pyspark.sql import functions as F
df = sqlContext.createDataFrame([
("a", "1", "2018-01-01 12:01:01","Monday"),
("a", "13", "2018-01-01 14:01:01","Monday"),
("a", "22", "2018-01-02 22:01:01","Tuesday"),
("a", "43", "2018-01-08 01:01:01","Monday"),
("a", "43", "2018-01-09 01:01:01","Tuesday"),
("a", "74", "2018-01-10 12:01:01","Wednesday"),
("a", "95", "2018-01-15 06:01:01","Monday"),
], ["person_id", "other_id", "timestamp","dow"])
df.withColumn("dow_count",`some window function`)
可能的窗口
from pyspark.sql import Window
from pyspark.sql import functions as F
Days_28 = (86400 * 28)
window= Window.partitionBy("person_id").orderBy('timestamp').rangeBetween(-Days_30, -1)
## I know this next line is wrong
df.withColumn("dow_count",F.sum(F.when(Current_day=windowed_day,1).otherwise(0)).over(window))
示例输出
df.show()
+---------+--------+-------------------+---------+---------+
|person_id|other_id| timestamp| dow|dow_count|
+---------+--------+-------------------+---------+---------+
| a| 1|2018-01-01 12:01:01| Monday|0 |
| a| 13|2018-01-01 14:01:01| Monday|1 |
| a| 22|2018-01-02 22:01:01| Tuesday|0 |
| a| 43|2018-01-08 01:01:01| Monday|2 |
| a| 43|2018-01-09 01:01:01| Tuesday|1 |
| a| 74|2018-01-10 12:01:01|Wednesday|0 |
| a| 95|2018-01-15 06:01:01| Monday|3 |
+---------+--------+-------------------+---------+---------+
【问题讨论】:
【参考方案1】:使用 F.row_number(),按 (person_id, dow) 划分的窗口和你的rangeBetween()
的逻辑应该替换为where()
:
from datetime import timedelta, datetime
N_days = 28
end = datetime.combine(datetime.today(), datetime.min.time())
start = end - timedelta(days=N_days)
window = Window.partitionBy("person_id", "dow").orderBy('timestamp')
df.where((df.timestamp < end) & (df.timestamp >= start)) \
.withColumn('dow_count', F.row_number().over(window)-1) \
.show()
【讨论】:
这样做的问题是它将每一行中的日期与当前时间戳进行比较,而不是取每一行并倒数 28 天。 但每一行只能保存一个dow_count
,除非您在数据框中再添加 27 列或行。这是你想要的吗?
啊,我明白了,你在为一周中的每一天写一个专栏。是的,每一行只需要 dow_count,特别是它当前所在的星期几。因此,如果当天是星期一,那么我只需要过去 28 天内的星期一的 dow_count。
我现在明白了。 “道”的划分有很大帮助。我最终创建了一个 unix_ts,将其转换为 long 并在窗口上使用计数。【参考方案2】:
我想通了,想分享一下。
首先创建一个 unix 时间戳并将其转换为 long。 然后,按人和星期几划分。 最后,在窗口上使用count函数。
from pyspark.sql import functions as F
df = df.withColumn('unix_ts',df.timestamp.astype('Timestamp').cast("long"))
w = Window.partitionBy('person_id','dow').orderBy('unix_ts').rangeBetween(-86400*15,-1)
df = df.withColumn('occurrences_in_7_days',F.count('unix_ts').over(w))
df.sort(df.unix_ts).show()
奖励:如何根据时间戳创建实际的星期几。
df = df.withColumn("DayOfWeek",F.date_format(df.timestamp, 'EEEE'))
如果没有来自 jxc 和 this*** 文章的提示,我无法做到这一点。
【讨论】:
以上是关于如何在 Pyspark 2.1 中使用窗口函数来计算星期几的出现次数的主要内容,如果未能解决你的问题,请参考以下文章
如何在 pyspark 中对需要在聚合中聚合的分组数据应用窗口函数?
如何使用滚动窗口函数计算 Pyspark Dataframe 中等于某个值的相邻值的数量?