使用 pySpark 计算用户事件之间的平均时间

Posted

技术标签:

【中文标题】使用 pySpark 计算用户事件之间的平均时间【英文标题】:Calculating the average time between events by users with pySpark 【发布时间】:2016-07-14 15:55:13 【问题描述】:

我有一个由“Events”、“Time”、“UserId”组成的日志文件。

+------------+----------------+---------+
|   Events   |      Time      | UserId  |
+------------+----------------+---------+
| ClickA     | 7/6/16 10:00am | userA   |
+------------+----------------+---------+
| ClickB     | 7/6/16 12:00am | userA   |
+------------+----------------+---------+

我想为每个用户计算事件之间的平均时间。你们如何解决这个问题? 在传统的编程环境中,我会检查用户的每个事件并计算事件 nn-1 之间的时间增量,并将此值添加到数组 A。然后将计算 A 中每个值的平均值。 我怎样才能用 Spark 做到这一点?

【问题讨论】:

【参考方案1】:

忽略日期解析,它看起来像是一个窗口函数的工作,然后是一个简单的聚合,所以大致你需要这样的东西:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag, avg

val df = Seq(
  ("ClickA", "2016-06-07 10:00:00", "UserA"),
  ("ClickB", "2016-06-07 12:00:00", "UserA")
).toDF("events", "time", "userid").withColumn("time", $"time".cast("timestamp"))

val w = Window.partitionBy("userid").orderBy("time")

// Difference between consecutive events in seconds
val diff = $"time".cast("long") - lag($"time", 1).over(w).cast("long")

df.withColumn("diff", diff).groupBy("userid").agg(avg($"diff"))

【讨论】:

谢谢 zero323!你知道我如何将这个字符串 (5/1/2016 4:03:34 PM) 转换为时间戳吗?我找不到 pyspark 的正确方法。 与此处显示的差不多:***.com/a/36095322/1560062 但您必须调整格式 (docs.oracle.com/javase/7/docs/api/java/text/…)

以上是关于使用 pySpark 计算用户事件之间的平均时间的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:计算指数移动平均线

PySpark:一步计算平均值、标准差和平均值附近的值

Big Query Compute 两个自定义事件之间的平均时间

RDD的Pyspark平均间隔

如何有效地计算pyspark中的平均值和标准差

pyspark数据框中两列之间的时间差