在 pyspark 数据框中查找不重叠的窗口

Posted

技术标签:

【中文标题】在 pyspark 数据框中查找不重叠的窗口【英文标题】:finding non-overlapping windows in a pyspark dataframe 【发布时间】:2019-07-18 20:30:26 【问题描述】:

假设我有一个 pyspark 数据框,其中包含一个 id 列和一个时间列 (t),以秒为单位。对于每个id,我想对行进行分组,以便每个组都包含该组开始时间后 5 秒内的所有条目。例如,如果表是:

+---+--+
|id |t |
+---+--+
|1  |0 |
|1  |1 |
|1  |3 |
|1  |8 |
|1  |14|
|1  |18|
|2  |0 |
|2  |20|
|2  |21|
|2  |50|
+---+--+

那么结果应该是:

+---+--+---------+-------------+-------+
|id |t |subgroup |window_start |offset |
+---+--+---------+-------------+-------+
|1  |0 |1        |0            |0      |
|1  |1 |1        |0            |1      |
|1  |3 |1        |0            |3      |
|1  |8 |2        |8            |0      |
|1  |14|3        |14           |0      |
|1  |18|3        |14           |4      |
|2  |0 |1        |0            |0      |
|2  |20|2        |20           |0      |
|2  |21|2        |20           |1      |
|2  |50|3        |50           |0      |
+---+--+---------+-------------+-------+

我不需要子组编号是连续的。只要高效,我就可以在 Scala 中使用自定义 UDAF 的解决方案。

计算每个组内的(cumsum(t)-(cumsum(t)%5))/5 可用于识别第一个窗口,但不能用于识别其他窗口。本质上的问题是,在找到第一个窗口后,累积和需要重置为 0。我可以使用这种累积和方法进行递归操作,但这在大型数据集上效率太低。

以下方法有效并且比递归调用 cumsum 更有效,但它仍然太慢以至于在大型数据帧上毫无用处。

d = [[int(x[0]),float(x[1])] for x in [[1,0],[1,1],[1,4],[1,7],[1,14],[1,18],[2,5],[2,20],[2,21],[3,0],[3,1],[3,1.5],[3,2],[3,3.5],[3,4],[3,6],[3,6.5],[3,7],[3,11],[3,14],[3,18],[3,20],[3,24],[4,0],[4,1],[4,2],[4,6],[4,7]]]

schema = pyspark.sql.types.StructType(
  [
    pyspark.sql.types.StructField('id',pyspark.sql.types.LongType(),False),
    pyspark.sql.types.StructField('t',pyspark.sql.types.DoubleType(),False)
  ]
)
df = spark.createDataFrame(
  [pyspark.sql.Row(*x) for x in d],
  schema
)

def getSubgroup(ts):
  result = []
  total = 0
  ts = sorted(ts)
  tdiffs = numpy.array(ts)
  tdiffs = tdiffs[1:]-tdiffs[:-1]
  tdiffs = numpy.concatenate([[0],tdiffs])
  subgroup = 0
  for k in range(len(tdiffs)):
    t = ts[k]
    tdiff = tdiffs[k]
    total = total+tdiff
    if total >= 5:
      total = 0
      subgroup += 1
    result.append([t,float(subgroup)])
  return result

getSubgroupUDF = pyspark.sql.functions.udf(getSubgroup,pyspark.sql.types.ArrayType(pyspark.sql.types.ArrayType(pyspark.sql.types.DoubleType())))

subgroups = df.select('id','t').distinct().groupBy(
  'id'
).agg(
  pyspark.sql.functions.collect_list('t').alias('ts')
).withColumn(
  't_and_subgroup',
  pyspark.sql.functions.explode(getSubgroupUDF('ts'))
).withColumn(
  't',
  pyspark.sql.functions.col('t_and_subgroup').getItem(0)
).withColumn(
  'subgroup',
  pyspark.sql.functions.col('t_and_subgroup').getItem(1).cast(pyspark.sql.types.IntegerType())
).drop(
  't_and_subgroup','ts'
)

df = df.join(
  subgroups,
  on=['id','t'],
  how='inner'
)

df.orderBy(
  pyspark.sql.functions.asc('id'),pyspark.sql.functions.asc('t')
).show()

【问题讨论】:

【参考方案1】:

subgroup 列相当于按id, window_start 进行分区,所以也许您不需要创建它。

要创建 window_start ,我认为这样做可以: .withColumn("window_start", min("t").over(Window.partitionBy("id").orderBy(asc("t")).rangeBetween(0, 5)))

我不能 100% 确定 rangeBetween 的行为。

要创建offset,只需.withColumn("offset", col("t") - col("window_start"))

告诉我进展如何

【讨论】:

谢谢,但问题是提议的window_start 将拆分子组。使用时间 [0,1,2,6,7]rangeBetween(-5,0) 然后 6 和 7 位于不同的子组(分别为 1 和 2)。

以上是关于在 pyspark 数据框中查找不重叠的窗口的主要内容,如果未能解决你的问题,请参考以下文章

使用 pyspark 跟踪和查找数据框中的最新值

在数据框中查找重叠范围并为其分配值

如何在 PySpark 中使用窗口函数?

在 PySpark 中查找两个数据帧之间的变化

在 pyspark 数据框中检索最大值时遇到问题

在 Pyspark 中查找给定时间窗口中的行数