与 deviceid 对应的 Spark 结构化流式水印
Posted
技术标签:
【中文标题】与 deviceid 对应的 Spark 结构化流式水印【英文标题】:Spark Structured streaming watermark corresponding to deviceid 【发布时间】:2019-08-22 04:45:00 【问题描述】:传入的数据是如下所示的流,由 3 列组成
[
system -> deviceId,
time -> eventTime
value -> some metric
]
+-------+-------------------+-----+
|system |time |value|
+-------+-------------------+-----+
|system1|2019-08-20 07:13:10|1.5 |
|system2|2019-08-20 07:11:10|1.9 |
|system3|2019-08-20 07:13:15|1.3 |
|system1|2019-08-20 07:13:20|1.8 |
|system2|2019-08-20 07:11:20|1.6 |
|system3|2019-08-20 07:13:25|1.4 |
|system1|2019-08-20 07:13:30|1.2 |
|system2|2019-08-20 07:11:30|1.1 |
|system3|2019-08-20 07:13:35|1.5 |
+-------+-------------------+-----+
每个设备以固定的时间间隔生成数据,比如 [10 秒],
我有 spark 结构化流媒体应用程序,它使用
计算最大值窗口持续时间 = 30 秒
滑动持续时间 = 30 秒
df.withWatermark("time", "30 seconds")
.groupBy(
window(col("time"), "30 seconds", "30 seconds"),
col("system")
)
.agg(max("value"))
问题 由于每个设备都是独立的,因此时钟也是独立的。由于各种原因,设备可能会阻塞和延迟数据发送,例如:[网络问题、设备使用率高等]
现在作为它的单一作业处理数据,它将开始根据水印丢弃阻塞设备的数据,我们正在丢失数据。
是否有任何方法或解决方法可以将水印与 deviceId 绑定。因此,该作业会根据 [deviceId EventTime] 维护水印,并且不会因为其他设备而丢弃它。
【问题讨论】:
可能是一个接受答案的想法。 SO 的最大谜团在于,当答案表明某事不可能时,答案通常会被忽略。今天我们可以弥补。 请问您能接受答案吗? - 因为它是正确的 【参考方案1】:来自https://towardsdatascience.com/watermarking-in-spark-structured-streaming-9e164f373e9,我自己也说不清楚:
从 Spark 2.1 开始,水印被引入到结构化流中 API。您可以通过简单地将 withWatermark-Operator 添加到 查询:
withWatermark(eventTime: String, delayThreshold: String):
Dataset[T] 需要两个参数,a)一个事件时间列(必须是 与聚合正在处理的相同)和b)指定的阈值 延迟数据应该处理多长时间(以事件时间为单位)。这 然后 Spark 将维护聚合的状态,直到达到最大值 eventTime — delayThreshold > T ,其中 max eventTime 是最新的 引擎看到的事件时间,T 是窗口的开始时间。 如果迟到的数据落在此阈值内,则更新查询 最终(下图中的右图)。否则它会得到 丢弃且不触发重新处理(下图中左图)。
如您所见,该概念不涉及添加元数据拆分,例如设备ID。
【讨论】:
以上是关于与 deviceid 对应的 Spark 结构化流式水印的主要内容,如果未能解决你的问题,请参考以下文章
Azure 流分析查询以检测特定 deviceId 的丢失活动事件
spark结构化流作业如何处理流-静态DataFrame连接?