与 deviceid 对应的 Spark 结构化流式水印

Posted

技术标签:

【中文标题】与 deviceid 对应的 Spark 结构化流式水印【英文标题】:Spark Structured streaming watermark corresponding to deviceid 【发布时间】:2019-08-22 04:45:00 【问题描述】:

传入的数据是如下所示的流,由 3 列组成

[
 system -> deviceId,
 time -> eventTime
 value -> some metric
]
+-------+-------------------+-----+
|system |time               |value|
+-------+-------------------+-----+
|system1|2019-08-20 07:13:10|1.5  |
|system2|2019-08-20 07:11:10|1.9  |
|system3|2019-08-20 07:13:15|1.3  |
|system1|2019-08-20 07:13:20|1.8  |
|system2|2019-08-20 07:11:20|1.6  |
|system3|2019-08-20 07:13:25|1.4  |
|system1|2019-08-20 07:13:30|1.2  |
|system2|2019-08-20 07:11:30|1.1  |
|system3|2019-08-20 07:13:35|1.5  |
+-------+-------------------+-----+

每个设备以固定的时间间隔生成数据,比如 [10 秒],

我有 spark 结构化流媒体应用程序,它使用

计算最大值

窗口持续时间 = 30 秒

滑动持续时间 = 30 秒

      df.withWatermark("time", "30 seconds")
      .groupBy(
        window(col("time"), "30 seconds", "30 seconds"),
        col("system")
      )
      .agg(max("value"))

问题 由于每个设备都是独立的,因此时钟也是独立的。由于各种原因,设备可能会阻塞和延迟数据发送,例如:[网络问题、设备使用率高等]

现在作为它的单一作业处理数据,它将开始根据水印丢弃阻塞设备的数据,我们正在丢失数据。

是否有任何方法或解决方法可以将水印与 deviceId 绑定。因此,该作业会根据 [deviceId EventTime] 维护水印,并且不会因为其他设备而丢弃它。

【问题讨论】:

可能是一个接受答案的想法。 SO 的最大谜团在于,当答案表明某事不可能时,答案通常会被忽略。今天我们可以弥补。 请问您能接受答案吗? - 因为它是正确的 【参考方案1】:

来自https://towardsdatascience.com/watermarking-in-spark-structured-streaming-9e164f373e9,我自己也说不清楚:

从 Spark 2.1 开始,水印被引入到结构化流中 API。您可以通过简单地将 withWatermark-Operator 添加到 查询:

withWatermark(eventTime: String, delayThreshold: String):

Dataset[T] 需要两个参数,a)一个事件时间列(必须是 与聚合正在处理的相同)和b)指定的阈值 延迟数据应该处理多长时间(以事件时间为单位)。这 然后 Spark 将维护聚合的状态,直到达到最大值 eventTime — delayThreshold > T ,其中 max eventTime 是最新的 引擎看到的事件时间,T 是窗口的开始时间。 如果迟到的数据落在此阈值内,则更新查询 最终(下图中的右图)。否则它会得到 丢弃且不触发重新处理(下图中左图)。

如您所见,该概念不涉及添加元数据拆分,例如设备ID。

【讨论】:

以上是关于与 deviceid 对应的 Spark 结构化流式水印的主要内容,如果未能解决你的问题,请参考以下文章

Spark结构化流 - 使用模式从文件中读取时间戳

如何将 Spark 结构化流数据写入 REST API?

Azure 流分析查询以检测特定 deviceId 的丢失活动事件

spark结构化流作业如何处理流-静态DataFrame连接?

在 Spark 结构化流中将数据内部连接到左连接 DataFrame 时丢失条目

Spark 结构化流/Spark SQL 中的条件爆炸