具有组间聚合结果的 Pyspark 窗口

Posted

技术标签:

【中文标题】具有组间聚合结果的 Pyspark 窗口【英文标题】:Pyspark window with aggregating results between groups 【发布时间】:2020-10-21 11:07:12 【问题描述】:

假设我有一个 DataFrame,其中包含来自不同用户通过不同协议和记录的指标值的请求:

+---+-----+--------+------------+
| ts| user|protocol|metric_value|
+---+-----+--------+------------+
|  0|user1|     tcp|         197|
|  1|user1|     udp|         155|
|  2|user1|     tcp|         347|
|  3|user1|     tcp|         117|
|  4|user1|     tcp|         230|
|  5|user1|     udp|         225|
|  6|user1|     udp|         297|
|  7|user1|     tcp|         790|
|  8|user1|     udp|         216|
|  9|user1|     udp|         200|
+---+-----+--------+------------+

我需要添加另一列,其中将为当前用户的每个协议添加最后记录的平均 metric_value(在当前时间戳之前且不早于 current_ts - 4)。 所以,算法是这样的:

对于每一行 X: 查找 row.user == X.user 和 row.ts 从这些行中提取每个协议的最新 metric_value(如果相应的记录早于 X.ts - 4,则将其丢弃) 计算这些 metric_values 的平均值 将计算的平均值附加到新列中的行

想要的结果应该是这样的:

+---+-----+--------+------------+-------+
| ts| user|protocol|metric_value|avg_val|
+---+-----+--------+------------+-------+
|  0|user1|     tcp|         197|   null| // no data for user1
|  1|user1|     udp|         155|    197| // only tcp value available
|  2|user1|     tcp|         347|    176| // (197 + 155) / 2
|  3|user1|     tcp|         117|    251| // (347 + 155) / 2
|  4|user1|     tcp|         230|    136| // (117 + 155) / 2
|  5|user2|     udp|         225|   null| // because no data for user2
|  6|user1|     udp|         297|    230| // because record with ts==1 is too old now
|  7|user1|     tcp|         790|  263.5| // (297 + 230) / 2
|  8|user1|     udp|         216|  543.5| // (297 + 790) / 2
|  9|user1|     udp|         200|    503| // (216 + 790) / 2
+---+-----+--------+------------+-------+

请注意,表中可能有任意数量的协议和用户。

如何实现?

我尝试过使用窗口函数、lag(1) 和按协议分区,但聚合函数只计算单个分区的平均值,而不是不同的分区结果。 最接近的结果是使用 row_number 按协议分区的 sql 请求,但我无法在那里传播 row.ts

【问题讨论】:

【参考方案1】:

这是基于 Scala 的解决方案,您可以将逻辑转换为 Python/Pyspark

样本数据:

val df = Seq((0,"user1","tcp",197),(1,"user1","udp",155),(2,"user1","tcp",347),(3,"user1","tcp",117),(4,"user1","tcp",230),(5,"user2","udp",225),(6,"user1","udp",297),(7,"user1","tcp",790),(8,"user1","udp",216),(9,"user1","udp",200))
.toDF("ts","user","protocol","metric_value")

对于每一行,获取列表中current_row.ts -4 的所有行(protocol,metric_value)

val winspec = Window.partitionBy("user").orderBy("ts").rangeBetween(Window.currentRow - 4, Window.currentRow-1)
val df2 = df.withColumn("recent_list", collect_list(struct($"protocol", $"metric_value")).over(winspec))

df2.orderBy("ts").show(false)
/*

+---+-----+--------+------------+------------------------------------------------+
|ts |user |protocol|metric_value|recent_list                                       |
+---+-----+--------+------------+------------------------------------------------+
|0  |user1|tcp     |197         |[]                                              |
|1  |user1|udp     |155         |[[tcp, 197]]                                    |
|2  |user1|tcp     |347         |[[tcp, 197], [udp, 155]]                        |
|3  |user1|tcp     |117         |[[tcp, 197], [udp, 155], [tcp, 347]]            |
|4  |user1|tcp     |230         |[[tcp, 197], [udp, 155], [tcp, 347], [tcp, 117]]|
|5  |user2|udp     |225         |[]                                              |
|6  |user1|udp     |297         |[[tcp, 347], [tcp, 117], [tcp, 230]]            |
|7  |user1|tcp     |790         |[[tcp, 117], [tcp, 230], [udp, 297]]            |
|8  |user1|udp     |216         |[[tcp, 230], [udp, 297], [tcp, 790]]            |
|9  |user1|udp     |200         |[[udp, 297], [tcp, 790], [udp, 216]]            |
+---+-----+--------+------------+------------------------------------------------+

现在您在一行中获得了所有必需的信息。您可以编写一个 UDF 来应用您获取最新协议类型和平均值的逻辑。


def getAverageValueForUniqRecents(list : Array[StructType]): Double = 
  // you logic goes here. 
  // Loop through your array in REVERSE ORDER
  // maintain a set to check if protocol already visited then skip, otherwise SUM
  //Finally average

【讨论】:

以上是关于具有组间聚合结果的 Pyspark 窗口的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark - 如何拆分具有 Datetime 类型的结构值的列?

如何在 pyspark 中对需要在聚合中聚合的分组数据应用窗口函数?

在 PySpark 中随时间窗口聚合

如何根据 PySpark 中窗口聚合的条件计算不同值?

pyspark 时间序列数据的高性能滚动/窗口聚合

在 pyspark 中聚合 5 分钟窗口