使用 PySpark 而不使用窗口对来自 Kafka 的流数据执行滚动平均

Posted

技术标签:

【中文标题】使用 PySpark 而不使用窗口对来自 Kafka 的流数据执行滚动平均【英文标题】:Performing rolling average on streaming data from Kafka using PySpark and without using window 【发布时间】:2020-09-20 19:30:35 【问题描述】:

我一直在尝试对流数据进行数据聚合,得到以下错误:

窗口方法存在这个问题 - '流数据帧/数据集不支持非基于时间的窗口'

我正在寻找一种替代窗口方法的方法来对流数据执行聚合。

    w = (Window
     .partitionBy("orig_time")
     .orderBy(F.col("epoch").cast('long'))
     .rangeBetween(-minutes(5), 0))
#windowedDeviceDF = deviceDF.withColumn('rolling_average', F.avg("tag_value").over(w))
windowSpec5 = Window.partitionBy("orig_time").orderBy(F.col("epoch").cast('long')).rangeBetween(-minutes(5),0)
windowSpec10 = Window.partitionBy("orig_time").orderBy(F.col("epoch").cast('long')).rangeBetween(-minutes(10), 0)

windowedDeviceDF = deviceDF.withColumn("avg5", F.avg("tag_value").over(windowSpec5)).withColumn("avg10",F.avg("tag_value").over(windowSpec10)).withColumn('occurrences_in_5_min', F.count('epoch').over(w)).withColumn('rolling_average', F.avg("tag_value").over(w)).select(
"tag_name", "epoch", "avg5", "avg10", "occurrences_in_5_min", "rolling_average")

windowedDeviceDF = deviceDF.groupBy(deviceDF.tag_name, deviceDF.tag_value, window(deviceDF.orig_time, windowDuration, slideDuration)).avg()

【问题讨论】:

【参考方案1】:

与滑动窗口不同,但它避免了以某种方式保留数据...

使用“指数移动平均线”:

avg += fact * (xn - avg)

在哪里

avg 是当前平均值;这是唯一需要从一行到下一行的变量。 (与最后 N 个值相反) fact 是控制平均平滑度的常数分数——0.01 对变化的响应非常慢; 0.5 响应非常快。 xn我们正在平均的值(在当前行中)。

【讨论】:

不使用窗口的方式可以实现吗?

以上是关于使用 PySpark 而不使用窗口对来自 Kafka 的流数据执行滚动平均的主要内容,如果未能解决你的问题,请参考以下文章

当窗口/分区使用前向填充时,向 pyspark sql 中的 last() 函数添加条件

Pyspark 窗口函数,具有对旅行者数量进行取整的条件

带有窗口函数的 PySpark 数据偏度

具有组间聚合结果的 Pyspark 窗口

如何在 pyspark 中对需要在聚合中聚合的分组数据应用窗口函数?

遍历 pyspark 数据框中的列,而不为单个列创建不同的数据框