具有组间聚合结果的 Pyspark 窗口
Posted
技术标签:
【中文标题】具有组间聚合结果的 Pyspark 窗口【英文标题】:Pyspark window with aggregating results between groups 【发布时间】:2020-10-21 11:07:12 【问题描述】:假设我有一个 DataFrame,其中包含来自不同用户通过不同协议和记录的指标值的请求:
+---+-----+--------+------------+
| ts| user|protocol|metric_value|
+---+-----+--------+------------+
| 0|user1| tcp| 197|
| 1|user1| udp| 155|
| 2|user1| tcp| 347|
| 3|user1| tcp| 117|
| 4|user1| tcp| 230|
| 5|user1| udp| 225|
| 6|user1| udp| 297|
| 7|user1| tcp| 790|
| 8|user1| udp| 216|
| 9|user1| udp| 200|
+---+-----+--------+------------+
我需要添加另一列,其中将为当前用户的每个协议添加最后记录的平均 metric_value(在当前时间戳之前且不早于 current_ts - 4)。 所以,算法是这样的:
对于每一行 X: 查找 row.user == X.user 和 row.ts 从这些行中提取每个协议的最新 metric_value(如果相应的记录早于 X.ts - 4,则将其丢弃) 计算这些 metric_values 的平均值 将计算的平均值附加到新列中的行想要的结果应该是这样的:
+---+-----+--------+------------+-------+
| ts| user|protocol|metric_value|avg_val|
+---+-----+--------+------------+-------+
| 0|user1| tcp| 197| null| // no data for user1
| 1|user1| udp| 155| 197| // only tcp value available
| 2|user1| tcp| 347| 176| // (197 + 155) / 2
| 3|user1| tcp| 117| 251| // (347 + 155) / 2
| 4|user1| tcp| 230| 136| // (117 + 155) / 2
| 5|user2| udp| 225| null| // because no data for user2
| 6|user1| udp| 297| 230| // because record with ts==1 is too old now
| 7|user1| tcp| 790| 263.5| // (297 + 230) / 2
| 8|user1| udp| 216| 543.5| // (297 + 790) / 2
| 9|user1| udp| 200| 503| // (216 + 790) / 2
+---+-----+--------+------------+-------+
请注意,表中可能有任意数量的协议和用户。
如何实现?
我尝试过使用窗口函数、lag(1) 和按协议分区,但聚合函数只计算单个分区的平均值,而不是不同的分区结果。 最接近的结果是使用 row_number 按协议分区的 sql 请求,但我无法在那里传播 row.ts
【问题讨论】:
【参考方案1】:这是基于 Scala 的解决方案,您可以将逻辑转换为 Python/Pyspark
样本数据:
val df = Seq((0,"user1","tcp",197),(1,"user1","udp",155),(2,"user1","tcp",347),(3,"user1","tcp",117),(4,"user1","tcp",230),(5,"user2","udp",225),(6,"user1","udp",297),(7,"user1","tcp",790),(8,"user1","udp",216),(9,"user1","udp",200))
.toDF("ts","user","protocol","metric_value")
对于每一行,获取列表中current_row.ts -4
的所有行(protocol,metric_value)
。
val winspec = Window.partitionBy("user").orderBy("ts").rangeBetween(Window.currentRow - 4, Window.currentRow-1)
val df2 = df.withColumn("recent_list", collect_list(struct($"protocol", $"metric_value")).over(winspec))
df2.orderBy("ts").show(false)
/*
+---+-----+--------+------------+------------------------------------------------+
|ts |user |protocol|metric_value|recent_list |
+---+-----+--------+------------+------------------------------------------------+
|0 |user1|tcp |197 |[] |
|1 |user1|udp |155 |[[tcp, 197]] |
|2 |user1|tcp |347 |[[tcp, 197], [udp, 155]] |
|3 |user1|tcp |117 |[[tcp, 197], [udp, 155], [tcp, 347]] |
|4 |user1|tcp |230 |[[tcp, 197], [udp, 155], [tcp, 347], [tcp, 117]]|
|5 |user2|udp |225 |[] |
|6 |user1|udp |297 |[[tcp, 347], [tcp, 117], [tcp, 230]] |
|7 |user1|tcp |790 |[[tcp, 117], [tcp, 230], [udp, 297]] |
|8 |user1|udp |216 |[[tcp, 230], [udp, 297], [tcp, 790]] |
|9 |user1|udp |200 |[[udp, 297], [tcp, 790], [udp, 216]] |
+---+-----+--------+------------+------------------------------------------------+
现在您在一行中获得了所有必需的信息。您可以编写一个 UDF 来应用您获取最新协议类型和平均值的逻辑。
def getAverageValueForUniqRecents(list : Array[StructType]): Double =
// you logic goes here.
// Loop through your array in REVERSE ORDER
// maintain a set to check if protocol already visited then skip, otherwise SUM
//Finally average
【讨论】:
以上是关于具有组间聚合结果的 Pyspark 窗口的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark - 如何拆分具有 Datetime 类型的结构值的列?