使用 Spark DataFrames 在特定时间范围内获取唯一计数

Posted

技术标签:

【中文标题】使用 Spark DataFrames 在特定时间范围内获取唯一计数【英文标题】:Getting a Unique Count over a Particular Time Frame with Spark DataFrames 【发布时间】:2017-07-25 17:51:23 【问题描述】:

我正在尝试弄清楚我想要完成的工作是否可以在 Spark 中实现。假设我有一个 CSV,如果作为 DataFrame 读入,看起来像这样:

+---------------------+-----------+-------+-------------+
|      TimeStamp      | Customer  | User  | Application |
+---------------------+-----------+-------+-------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1        |
| 2017-01-01 12:00:05 | customer1 | user1 | app1        |
| 2017-01-01 14:00:03 | customer1 | user2 | app2        |
| 2017-01-01 23:50:50 | customer1 | user1 | app1        |
| 2017-01-02 00:00:02 | customer1 | user1 | app1        |
+---------------------+-----------+-------+-------------+

我正在尝试生成一个数据框,其中包含某个客户的唯一用户在过去 24 小时内访问应用程序的次数。所以结果看起来像这样:

+---------------------+-----------+-------+-------------+----------------------+
|      TimeStamp      | Customer  | User  | Application | UniqueUserVisitedApp |
+---------------------+-----------+-------+-------------+----------------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1        |                    0 |
| 2017-01-01 12:00:05 | customer1 | user2 | app1        |                    1 |
| 2017-01-01 13:00:05 | customer1 | user2 | app1        |                    2 |
| 2017-01-01 14:00:03 | customer1 | user1 | app1        |                    2 |
| 2017-01-01 23:50:50 | customer1 | user3 | app1        |                    2 |
| 2017-01-01 23:50:51 | customer2 | user4 | app2        |                    0 |
| 2017-01-02 00:00:02 | customer1 | user1 | app1        |                    3 |
+---------------------+-----------+-------+-------------+----------------------+

所以我可以用下面的代码做一个翻转窗口,但这并不是我们想要的。

val data = spark.read.csv('path/to/csv')

val tumblingWindow = data
    .groupBy(col("Customer"), col("Application"), window(data.col("TimeStamp"), "24 hours"))
    .agg(countDistinct("user")).as("UniqueUsersVisitedApp")

结果是这样的:

+-----------+-------------+-------------------------+-----------------------+
| Customer  | Application |         Window          | UniqueUsersVisitedApp |
+-----------+-------------+-------------------------+-----------------------+
| customer1 | app1        | [2017-01-01 00:00:00... |                     2 |
| customer2 | app2        | [2017-01-01 00:00:00... |                     1 |
| customer1 | app1        | [2017-01-02 00:00:00... |                     1 |
+-----------+-------------+-------------------------+-----------------------+

任何帮助将不胜感激。

【问题讨论】:

你想要什么结果? 中间表/数据框是我的目标@Shankar Koirala 【参考方案1】:

如果我正确理解您的问题,请在执行 groupBy 之前应用过滤器:

data = spark.read.csv('path/to/csv')

result = (data
          .filter(data['TimeStamp'] > now_minus_24_hours)
          .groupBy(["Customer", "Application", "User"])
          .count())

请注意,过去 24 小时内未访问过的用户将从 DataFrame 中丢失,而不是计数为零。

编辑

如果您想获取过去 24 小时内相对于每个时间戳的访问次数,您可以执行类似于 my answer here 的操作。基本步骤是:

    reduceByKey 获取每个用户/应用程序/客户组合的时间戳列表(与另一个示例相同)。现在每一行都将采用以下形式:

    ((user, app, customer), list_of_timestamps)

    处理每个时间戳列表以生成每个时间戳的“过去 24 小时内的访问次数”列表。数据现在将采用以下形式:

    ((user, app, customer), [(ts_0, num_visits_24hr_before_ts_0), (ts_1, num_visits_24_hr_before ts_2), ...])

    flatMap 每行回到多行,使用类似:

    lambda row: [(*row[0], *ts_num_visits) for ts_num_visits in row[1]]

【讨论】:

更多的是在当前正在考虑的行的时间戳之前 24 小时访问应用程序的 unqiue 用户的运行计数。您的解决方案仅过滤发生在过去 24 小时。如果我的问题模棱两可,我深表歉意。 @the_Kid26 在这种情况下,您应该能够调整my answer to your very similar question 以获得您想要的结果。 编辑了我上面的答案,以了解我的意思。【参考方案2】:

我已经尝试使用 pyspark 窗口函数,通过为每个日期创建子分区并对其应用计数。不确定它们的效率如何。这是我的代码sn-p,

>>> from pyspark.sql import functions as F
>>> from pyspark.sql.types import TimestampType

>>> l = [('2017-01-01 00:00:01','customer1','user1','app1'),('2017-01-01 12:00:05','customer1','user1','app1'),('2017-01-01 14:00:03','customer1','user2','app2'),('2017-01-01 23:50:50','customer1','user1','app1'),('2017-01-02 00:00:02','customer1','user1','app1'),('2017-01-02 12:00:02','customer1','user1','app1'),('2017-01-03 14:00:02','customer1','user1','app1'),('2017-01-02 00:00:02','customer1','user2','app2'),('2017-01-01 16:04:01','customer1','user1','app1'),('2017-01-01 23:59:01','customer1','user1','app1'),('2017-01-01 18:00:01','customer1','user2','app2')]
>>> df = spark.createDataFrame(l,['TimeStamp','Customer','User','Application'])
>>> df = df.withColumn('TimeStamp',df['TimeStamp'].cast('timestamp')).withColumn('Date',F.to_date(F.col('TimeStamp')))
>>> df.show()
+-------------------+---------+-----+-----------+----------+
|          TimeStamp| Customer| User|Application|      Date|
+-------------------+---------+-----+-----------+----------+
|2017-01-01 00:00:01|customer1|user1|       app1|2017-01-01|
|2017-01-01 12:00:05|customer1|user1|       app1|2017-01-01|
|2017-01-01 14:00:03|customer1|user2|       app2|2017-01-01|
|2017-01-01 23:50:50|customer1|user1|       app1|2017-01-01|
|2017-01-02 00:00:02|customer1|user1|       app1|2017-01-02|
|2017-01-02 12:00:02|customer1|user1|       app1|2017-01-02|
|2017-01-03 14:00:02|customer1|user1|       app1|2017-01-03|
|2017-01-02 00:00:02|customer1|user2|       app2|2017-01-02|
|2017-01-01 16:04:01|customer1|user1|       app1|2017-01-01|
|2017-01-01 23:59:01|customer1|user1|       app1|2017-01-01|
|2017-01-01 18:00:01|customer1|user2|       app2|2017-01-01|
+-------------------+---------+-----+-----------+----------+

>>> df.printSchema()
root
 |-- TimeStamp: timestamp (nullable = true)
 |-- Customer: string (nullable = true)
 |-- User: string (nullable = true)
 |-- Application: string (nullable = true)
 |-- Date: date (nullable = true)

>>> w = Window.partitionBy('Customer','User','Application','Date').orderBy('Timestamp')
>>> diff = F.coalesce(F.datediff("TimeStamp", F.lag("TimeStamp", 1).over(w)), F.lit(0))
>>> subpartition = F.count(diff<1).over(w)
>>> df.select("*",(subpartition-1).alias('Count')).drop('Date').orderBy('Customer','User','Application','TimeStamp').show()
+-------------------+---------+-----+-----------+-----+
|          TimeStamp| Customer| User|Application|Count|
+-------------------+---------+-----+-----------+-----+
|2017-01-01 00:00:01|customer1|user1|       app1|    0|
|2017-01-01 12:00:05|customer1|user1|       app1|    1|
|2017-01-01 16:04:01|customer1|user1|       app1|    2|
|2017-01-01 23:50:50|customer1|user1|       app1|    3|
|2017-01-01 23:59:01|customer1|user1|       app1|    4|
|2017-01-02 00:00:02|customer1|user1|       app1|    0|
|2017-01-02 12:00:02|customer1|user1|       app1|    1|
|2017-01-03 14:00:02|customer1|user1|       app1|    0|
|2017-01-01 14:00:03|customer1|user2|       app2|    0|
|2017-01-01 18:00:01|customer1|user2|       app2|    1|
|2017-01-02 00:00:02|customer1|user2|       app2|    0|
+-------------------+---------+-----+-----------+-----+

【讨论】:

您的解决方案是计算特定用户访问应用程序的次数,而不是唯一用户访问应用程序的次数。在第一个条目之后,用户 1 的所有计数都应该是 1,除了时间戳为 2017-01-03 14:00:02 哦,对不起,我错过了。会检查的。

以上是关于使用 Spark DataFrames 在特定时间范围内获取唯一计数的主要内容,如果未能解决你的问题,请参考以下文章

如何在 spark dataframes/spark sql 中使用模式读取 json

是否可以将 Spark 中的 data.table 与 Spark Dataframes 一起使用?

我们如何在 Spark 中使用 Dataframes(由 structtype 方法创建)合并具有不同列数的 2 个表?

使用 pyspark 在循环中附加 Spark DataFrames 的有效方法

使用 Dataframes 的 Spark Overlap 算法

SPARK SQL - 使用 DataFrames 和 JDBC 更新 MySql 表