在pyspark中以一秒为粒度的时间范围内计算与一种特定类型的时间的每个时间差的类型

Posted

技术标签:

【中文标题】在pyspark中以一秒为粒度的时间范围内计算与一种特定类型的时间的每个时间差的类型【英文标题】:Count types for every time difference from the time of one specific type within a time range with a granularity of one second in pyspark 【发布时间】:2019-11-23 09:59:09 【问题描述】:

我在DataFrame 的pyspark 中有以下时间序列数据:

(id, timestamp, type)

id 列可以是任意整数值和相同 id 的多行 可以存在于表中

timestamp 列是一个用整数表示的时间戳(为了简化)

type 列是一个字符串类型变量,其中每个不同的 列上的字符串代表一个类别。一类特殊 最重要的是'A'

我的问题如下:

有什么方法可以计算(使用 SQL 或 pyspark DataFrame 操作):

每种类型的计数 对于与所有行对应的时间戳的所有时间差 type='A' 在时间范围内(例如 [-5,+5]),粒度为 1 秒

例如,对于以下DataFrame

ts_df = sc.parallelize([
    (1,'A',100),(2,'A',1000),(3,'A',10000),

    (1,'b',99),(1,'b',99),(1,'b',99),
    (2,'b',999),(2,'b',999),(2,'c',999),(2,'c',999),(1,'d',999),
    (3,'c',9999),(3,'c',9999),(3,'d',9999),

    (1,'b',98),(1,'b',98),
    (2,'b',998),(2,'c',998),
    (3,'c',9998)
]).toDF(["id","type","ts"])
ts_df.show()
+---+----+-----+
| id|type|   ts|
+---+----+-----+
|  1|   A|  100|
|  2|   A| 1000|
|  3|   A|10000|
|  1|   b|   99|
|  1|   b|   99|
|  1|   b|   99|
|  2|   b|  999|
|  2|   b|  999|
|  2|   c|  999|
|  2|   c|  999|
|  1|   d|  999|
|  3|   c| 9999|
|  3|   c| 9999|
|  3|   d| 9999|
|  1|   b|   98|
|  1|   b|   98|
|  2|   b|  998|
|  2|   c|  998|
|  3|   c| 9998|
+---+----+-----+

对于 -1 秒的时间差,结果应该是:

# result for time difference = -1 sec
# b: 5
# c: 4
# d: 2

而对于 -2 秒的时间差,结果应该是:

# result for time difference = -2 sec
# b: 3
# c: 2
# d: 0

以此类推,时间范围内的任何时间差,粒度为 1 秒。

我尝试了许多不同的方法,主要使用groupBy,但似乎没有任何效果。

我在如何表达 type=A 每一行的时间差时遇到困难,即使我必须针对一个特定的时间差来表达。

任何建议将不胜感激!

编辑:

如果我只需要在一个特定的时差time_difference 上执行此操作,那么我可以通过以下方式执行此操作:

time_difference = -1
df_type_A = ts_df.where(F.col("type")=='A').selectExpr("ts as fts")
res = df_type_A.join(ts_df, on=df_type_A.fts+time_difference==ts_df.ts)\
.drop("ts","fts").groupBy(F.col("type")).count()

返回的res DataFrame 将为我提供一个特定时差的确切信息。我创建了一个循环并通过一遍又一遍地重复相同的查询来解决问题。

但是,还有比这更有效的方法吗?

EDIT2(解决方案) 所以我最后就是这样做的:

df1 = sc.parallelize([
    (1,'b',99),(1,'b',99),(1,'b',99),
    (2,'b',999),(2,'b',999),(2,'c',999),(2,'c',999),(2,'d',999),
    (3,'c',9999),(3,'c',9999),(3,'d',9999),

    (1,'b',98),(1,'b',98),
    (2,'b',998),(2,'c',998),
    (3,'c',9998)
]).toDF(["id","type","ts"])
df1.show()

df2 = sc.parallelize([
    (1,'A',100),(2,'A',1000),(3,'A',10000),
]).toDF(["id","type","ts"]).selectExpr("id as fid","ts as fts","type as ftype")
df2.show()

df3 = df2.join(df1, on=df1.id==df2.fid).withColumn("td", F.col("ts")-F.col("fts"))
df3.show()

df4 = df3.groupBy([F.col("type"),F.col("td")]).count()
df4.show()

我会尽快更新性能详情。

谢谢!

【问题讨论】:

【参考方案1】:

解决这个问题的另一种方法是:

将现有数据帧分成两个数据帧 - 有 A 和没有 A 在没有 A df 的情况下添加一个新列,即 "ts" 和 time_difference 之和 加入数据框、分组依据和计数。

这是一个代码:

from pyspark.sql.functions import lit
time_difference = 1
ts_df_A = (
    ts_df
    .filter(ts_df["type"] == "A")
    .drop("id")
    .drop("type")
)

ts_df_td = (
    ts_df
    .withColumn("ts_plus_td", lit(ts_df['ts'] + time_difference))
    .filter(ts_df["type"] != "A")
    .drop("ts")
)

joined_df = ts_df_A.join(ts_df_td, ts_df_A["ts"] == ts_df_td["ts_plus_td"])
agg_df = joined_df.groupBy("type").count()

>>> agg_df.show()
+----+-----+
|type|count|
+----+-----+
|   d|    2|
|   c|    4|
|   b|    5|
+----+-----+

>>>

如果这是您要找的,请告诉我?

谢谢, 侯赛因·博赫拉

【讨论】:

谢谢侯赛因,太好了!它实际上似乎等同于我在上面的 EDIT 中的内容。但是,(受您的解决方案的启发)我想知道是否可以构建一个列,我们可以将任何类型的每一行的时间差存储到具有相同 ID 的 type='A' 的相应行。如果我们可以有这样一个列,那么就可以将它添加到最后的 group by 并有效地获得任何时间差的结果。这有意义吗? 我在想更像这样的东西:``` df1 = # df without type=='A' # df2 是只有 type=='A' 的数据框 df2 = sc.parallelize([ (1,'A',100),(2,'A',1000),(3,'A',10000), ]).toDF(["id","type","ts"])。 selectExpr("id as fid","ts as fts","type as ftype") df2.show() 和具有时间差的新列的数据帧 df2.join(df1, on=df1.id==df2.fid ).withColumn("td", F.col("ts")-F.col("fts")).show() ``` 然后我们就可以按时间差加在group上。 我也为此编写了类似类型的代码:ts_df_A = ( ts_df .filter(ts_df["type"] == "A") .select( col("id").alias("Aid"), col("type").alias("Atype"), col("ts").alias("Ats")) ) ts_df_td = ( ts_df .withColumn("ts_plus_td", lit(ts_df['ts'] + time_difference)) .filter(ts_df["type"] != "A") ) joined_df = ( ts_df_A .join(ts_df_td, ts_df_A["Aid"] == ts_df_td["id"]) .select("id", "Ats", "ts", "type", lit(ts_df_A["Ats"] - ts_df_td["ts"])) ) 为了证明此代码比上一个代码更高效,我们需要在大量数据上进行测试。 我同意,非常感谢您的帮助!我将用一个具体的解决方案更新帖子,并在我对大量数据进行测试时提及性能!干杯!

以上是关于在pyspark中以一秒为粒度的时间范围内计算与一种特定类型的时间的每个时间差的类型的主要内容,如果未能解决你的问题,请参考以下文章

如何计算最后一秒、一分钟和一小时内的请求数?

如何在一个sql中以每周和每月的粒度计算数据

使用 JOOQ 在 PostgreSQL 中以秒为单位查找和求和时间戳之间的差异

如何在jquery中以秒为单位获取两个日期之间的差异?

如何在 Swift3 中以秒为单位将字符串更改为分钟?

如果日期在季度范围内,PySpark 添加列