如何使用pyspark查找时间范围内每分钟发生的事件

Posted

技术标签:

【中文标题】如何使用pyspark查找时间范围内每分钟发生的事件【英文标题】:How to find occurence of an event per minute within a time range using pyspark 【发布时间】:2018-04-30 00:17:59 【问题描述】:
| tweet id |  | tweet created minute |  | Game start minute |  | Game end minute |         
1001      145678                145600             145730   
1002      145678                145600             145730   
1005      145680                145600             145730   
12278     145687                145600             145730     
765558    145688                145600             145730     
724323    145689                145600             145730     
875857    145688                145600             145730     
79375     145685                145600             145730     
84666     145686                145600             145730     
335556    145687                145600             145730     
29990     145688                145600             145730     
56        145689                145600             145730 
968867    145690                145600             145730     
8452      145691                145600             145730   
1334      145679                145600             145730  

本场比赛有 130 分钟。如何计算每分钟的推文数量? “tweet id”代表一条独特的推文。

预期结果格式:

minutes count of tweets
1 2
2 1
3 2
4 3
5 1
6 0
7 0
8 2
9 1
10 0

【问题讨论】:

【参考方案1】:

假设 tweet id 是唯一的并使用 Pyspark 和 raw rdd:

rdd = sc.parallelize([(1001 ,145678, 145600, 145730),
(1002 ,145678, 145600, 145730),
(1005 ,145680, 145600, 145730), 
(12278 ,145687, 145600, 145730), 
(765558 ,145688, 145600, 145730), 
(724323 ,145689, 145600, 145730), 
(875857 ,145688, 145600, 145730), 
(79375 ,145685, 145600, 145730), 
(84666 ,145686, 145600, 145730), 
(335556 ,145687, 145600, 145730), 
(29990 ,145688, 145600, 145730), 
(56 ,145689, 145600, 145730), 
(968867 ,145690, 145600, 145730), 
(8452 ,145691, 145600, 145730), 
(1334 ,145679, 145600, 145730) ])

result_dict = rdd.filter(lambda x: x[2] <= x[1] <= x[3]).map(lambda x: (x[1] - x[2], 0))\
.countByKey()

print "minutes count of tweets"
for i in sorted(result_dict.iteritems()):
    print "0\t1".format(i[0], i[1])

结果:

minutes count of tweets
78  2
79  1
80  1
85  1
86  1
87  2
88  3
89  2
90  1
91  1

【讨论】:

以上是关于如何使用pyspark查找时间范围内每分钟发生的事件的主要内容,如果未能解决你的问题,请参考以下文章

使用oracle,如何做到某个时间段内每间隔1分钟查询出一条记录?

在信标区域内每 10 分钟更新一次应用程序

pyspark如何在窗口内聚合

如何根据时间删除或查找文件

如何在服务器上的特定小时内每分钟运行一次cron作业

根据当地时间 (HH:MM) 计算 24 小时周期内每分钟的平均销售额