Spark 结构化流 - 带有水印替代解决方案的 dropDuplicates

Posted

技术标签:

【中文标题】Spark 结构化流 - 带有水印替代解决方案的 dropDuplicates【英文标题】:Spark Structured streaming - dropDuplicates with watermark alternate solution 【发布时间】:2018-12-04 22:50:45 【问题描述】:

我正在尝试使用带有水印的 dropDuplicate 函数对流数据进行重复数据删除。我目前面临的问题是我必须为给定记录设置两个时间戳

    一个是 eventtimestamp - 从源创建记录的时间戳。 另一个是传输时间戳 - 来自负责流式传输数据的中间进程的时间戳。

重复是在中间阶段引入的,因此对于给定的记录重复,事件时间戳相同但传输时间戳不同。

对于水印,我喜欢使用传输时间戳,因为我知道在传输中重复出现的间隔不会超过 3 分钟。但我不能在 dropDuplicate 中使用它,因为它不会捕获重复项,因为重复项具有不同的传输时间戳。

这是一个例子,

Event 1: "EventString":"example1", "Eventtimestamp": "2018-11-29T10:00:00.00", "TransferTimestamp": "2018-11-29T10:05:00.00" 
Event 2 (duplicate): "EventString":"example1", "Eventtimestamp": "2018-11-29T10:00:00.00", "TransferTimestamp": "2018-11-29T10:08:00.00"

在这种情况下,副本是在原始事件 3 分钟后的传输过程中创建的

我的代码如下,

streamDataset.
.withWatermark("transferTimestamp", "4 minutes")
.dropDuplicates("eventstring","transferTimestamp");

上述代码不会删除重复项,因为 transferTimestamp 对于事件及其重复项是唯一的。但目前,这是 Spark 强制我在设置 watermark 时将 watermark 列包含在 dropDuplicates 函数中的唯一方法。

我真的很想看到一个像下面这样的 dropDuplicate 实现,这对于任何至少一次语义流来说都是一个有效的情况,我不必在 dropDuplicates 中使用水印字段,并且仍然尊重基于水印的状态驱逐。但目前情况并非如此

streamDataset.
.withWatermark("transferTimestamp", "4 minutes")
.dropDuplicates("eventstring");

我无法使用 eventtimestamp,因为它没有顺序,而且时间范围变化很​​大(延迟事件和垃圾事件)。

如果有人对这种情况下的重复数据删除有其他解决方案或想法,请告诉我。

【问题讨论】:

【参考方案1】:

对于您的用例,您不能直接使用 dropDuplicates API。您必须使用诸如 flatmapgroupwithstate 之类的 spark API 来使用任意有状态操作

【讨论】:

以上是关于Spark 结构化流 - 带有水印替代解决方案的 dropDuplicates的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark 结构化流中保存通过水印丢弃的记录

Spark SQL DSL 中的窗口(固定、滑动等)和水印支持

带有自定义接收器的 Spark 结构化流中的输入行数

与 deviceid 对应的 Spark 结构化流式水印

带有水印和音频的 FFmpeg 连接“过滤器图描述中的流说明符不匹配任何流”。

带有替代方法的重载方法值选择