使用 Spark DataFrames 在特定时间范围内获取唯一计数
Posted
技术标签:
【中文标题】使用 Spark DataFrames 在特定时间范围内获取唯一计数【英文标题】:Getting a Unique Count over a Particular Time Frame with Spark DataFrames 【发布时间】:2017-07-25 17:51:23 【问题描述】:我正在尝试弄清楚我想要完成的工作是否可以在 Spark 中实现。假设我有一个 CSV,如果作为 DataFrame 读入,看起来像这样:
+---------------------+-----------+-------+-------------+
| TimeStamp | Customer | User | Application |
+---------------------+-----------+-------+-------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1 |
| 2017-01-01 12:00:05 | customer1 | user1 | app1 |
| 2017-01-01 14:00:03 | customer1 | user2 | app2 |
| 2017-01-01 23:50:50 | customer1 | user1 | app1 |
| 2017-01-02 00:00:02 | customer1 | user1 | app1 |
+---------------------+-----------+-------+-------------+
我正在尝试生成一个数据框,其中包含某个客户的唯一用户在过去 24 小时内访问应用程序的次数。所以结果看起来像这样:
+---------------------+-----------+-------+-------------+----------------------+
| TimeStamp | Customer | User | Application | UniqueUserVisitedApp |
+---------------------+-----------+-------+-------------+----------------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1 | 0 |
| 2017-01-01 12:00:05 | customer1 | user2 | app1 | 1 |
| 2017-01-01 13:00:05 | customer1 | user2 | app1 | 2 |
| 2017-01-01 14:00:03 | customer1 | user1 | app1 | 2 |
| 2017-01-01 23:50:50 | customer1 | user3 | app1 | 2 |
| 2017-01-01 23:50:51 | customer2 | user4 | app2 | 0 |
| 2017-01-02 00:00:02 | customer1 | user1 | app1 | 3 |
+---------------------+-----------+-------+-------------+----------------------+
所以我可以用下面的代码做一个翻转窗口,但这并不是我们想要的。
val data = spark.read.csv('path/to/csv')
val tumblingWindow = data
.groupBy(col("Customer"), col("Application"), window(data.col("TimeStamp"), "24 hours"))
.agg(countDistinct("user")).as("UniqueUsersVisitedApp")
结果是这样的:
+-----------+-------------+-------------------------+-----------------------+
| Customer | Application | Window | UniqueUsersVisitedApp |
+-----------+-------------+-------------------------+-----------------------+
| customer1 | app1 | [2017-01-01 00:00:00... | 2 |
| customer2 | app2 | [2017-01-01 00:00:00... | 1 |
| customer1 | app1 | [2017-01-02 00:00:00... | 1 |
+-----------+-------------+-------------------------+-----------------------+
任何帮助将不胜感激。
【问题讨论】:
你想要什么结果? 中间表/数据框是我的目标@Shankar Koirala 【参考方案1】:如果我正确理解您的问题,请在执行 groupBy
之前应用过滤器:
data = spark.read.csv('path/to/csv')
result = (data
.filter(data['TimeStamp'] > now_minus_24_hours)
.groupBy(["Customer", "Application", "User"])
.count())
请注意,过去 24 小时内未访问过的用户将从 DataFrame 中丢失,而不是计数为零。
编辑
如果您想获取过去 24 小时内相对于每个时间戳的访问次数,您可以执行类似于 my answer here 的操作。基本步骤是:
reduceByKey
获取每个用户/应用程序/客户组合的时间戳列表(与另一个示例相同)。现在每一行都将采用以下形式:
((user, app, customer), list_of_timestamps)
处理每个时间戳列表以生成每个时间戳的“过去 24 小时内的访问次数”列表。数据现在将采用以下形式:
((user, app, customer), [(ts_0, num_visits_24hr_before_ts_0), (ts_1, num_visits_24_hr_before ts_2), ...])
flatMap
每行回到多行,使用类似:
lambda row: [(*row[0], *ts_num_visits) for ts_num_visits in row[1]]
【讨论】:
更多的是在当前正在考虑的行的时间戳之前 24 小时访问应用程序的 unqiue 用户的运行计数。您的解决方案仅过滤发生在过去 24 小时。如果我的问题模棱两可,我深表歉意。 @the_Kid26 在这种情况下,您应该能够调整my answer to your very similar question 以获得您想要的结果。 编辑了我上面的答案,以了解我的意思。【参考方案2】:我已经尝试使用 pyspark 窗口函数,通过为每个日期创建子分区并对其应用计数。不确定它们的效率如何。这是我的代码sn-p,
>>> from pyspark.sql import functions as F
>>> from pyspark.sql.types import TimestampType
>>> l = [('2017-01-01 00:00:01','customer1','user1','app1'),('2017-01-01 12:00:05','customer1','user1','app1'),('2017-01-01 14:00:03','customer1','user2','app2'),('2017-01-01 23:50:50','customer1','user1','app1'),('2017-01-02 00:00:02','customer1','user1','app1'),('2017-01-02 12:00:02','customer1','user1','app1'),('2017-01-03 14:00:02','customer1','user1','app1'),('2017-01-02 00:00:02','customer1','user2','app2'),('2017-01-01 16:04:01','customer1','user1','app1'),('2017-01-01 23:59:01','customer1','user1','app1'),('2017-01-01 18:00:01','customer1','user2','app2')]
>>> df = spark.createDataFrame(l,['TimeStamp','Customer','User','Application'])
>>> df = df.withColumn('TimeStamp',df['TimeStamp'].cast('timestamp')).withColumn('Date',F.to_date(F.col('TimeStamp')))
>>> df.show()
+-------------------+---------+-----+-----------+----------+
| TimeStamp| Customer| User|Application| Date|
+-------------------+---------+-----+-----------+----------+
|2017-01-01 00:00:01|customer1|user1| app1|2017-01-01|
|2017-01-01 12:00:05|customer1|user1| app1|2017-01-01|
|2017-01-01 14:00:03|customer1|user2| app2|2017-01-01|
|2017-01-01 23:50:50|customer1|user1| app1|2017-01-01|
|2017-01-02 00:00:02|customer1|user1| app1|2017-01-02|
|2017-01-02 12:00:02|customer1|user1| app1|2017-01-02|
|2017-01-03 14:00:02|customer1|user1| app1|2017-01-03|
|2017-01-02 00:00:02|customer1|user2| app2|2017-01-02|
|2017-01-01 16:04:01|customer1|user1| app1|2017-01-01|
|2017-01-01 23:59:01|customer1|user1| app1|2017-01-01|
|2017-01-01 18:00:01|customer1|user2| app2|2017-01-01|
+-------------------+---------+-----+-----------+----------+
>>> df.printSchema()
root
|-- TimeStamp: timestamp (nullable = true)
|-- Customer: string (nullable = true)
|-- User: string (nullable = true)
|-- Application: string (nullable = true)
|-- Date: date (nullable = true)
>>> w = Window.partitionBy('Customer','User','Application','Date').orderBy('Timestamp')
>>> diff = F.coalesce(F.datediff("TimeStamp", F.lag("TimeStamp", 1).over(w)), F.lit(0))
>>> subpartition = F.count(diff<1).over(w)
>>> df.select("*",(subpartition-1).alias('Count')).drop('Date').orderBy('Customer','User','Application','TimeStamp').show()
+-------------------+---------+-----+-----------+-----+
| TimeStamp| Customer| User|Application|Count|
+-------------------+---------+-----+-----------+-----+
|2017-01-01 00:00:01|customer1|user1| app1| 0|
|2017-01-01 12:00:05|customer1|user1| app1| 1|
|2017-01-01 16:04:01|customer1|user1| app1| 2|
|2017-01-01 23:50:50|customer1|user1| app1| 3|
|2017-01-01 23:59:01|customer1|user1| app1| 4|
|2017-01-02 00:00:02|customer1|user1| app1| 0|
|2017-01-02 12:00:02|customer1|user1| app1| 1|
|2017-01-03 14:00:02|customer1|user1| app1| 0|
|2017-01-01 14:00:03|customer1|user2| app2| 0|
|2017-01-01 18:00:01|customer1|user2| app2| 1|
|2017-01-02 00:00:02|customer1|user2| app2| 0|
+-------------------+---------+-----+-----------+-----+
【讨论】:
您的解决方案是计算特定用户访问应用程序的次数,而不是唯一用户访问应用程序的次数。在第一个条目之后,用户 1 的所有计数都应该是 1,除了时间戳为2017-01-03 14:00:02
哦,对不起,我错过了。会检查的。以上是关于使用 Spark DataFrames 在特定时间范围内获取唯一计数的主要内容,如果未能解决你的问题,请参考以下文章
如何在 spark dataframes/spark sql 中使用模式读取 json
是否可以将 Spark 中的 data.table 与 Spark Dataframes 一起使用?
我们如何在 Spark 中使用 Dataframes(由 structtype 方法创建)合并具有不同列数的 2 个表?
使用 pyspark 在循环中附加 Spark DataFrames 的有效方法