Spark 结构化流 - 带有水印替代解决方案的 dropDuplicates
Posted
技术标签:
【中文标题】Spark 结构化流 - 带有水印替代解决方案的 dropDuplicates【英文标题】:Spark Structured streaming - dropDuplicates with watermark alternate solution 【发布时间】:2018-12-04 22:50:45 【问题描述】:我正在尝试使用带有水印的 dropDuplicate 函数对流数据进行重复数据删除。我目前面临的问题是我必须为给定记录设置两个时间戳
-
一个是 eventtimestamp - 从源创建记录的时间戳。
另一个是传输时间戳 - 来自负责流式传输数据的中间进程的时间戳。
重复是在中间阶段引入的,因此对于给定的记录重复,事件时间戳相同但传输时间戳不同。
对于水印,我喜欢使用传输时间戳,因为我知道在传输中重复出现的间隔不会超过 3 分钟。但我不能在 dropDuplicate 中使用它,因为它不会捕获重复项,因为重复项具有不同的传输时间戳。
这是一个例子,
Event 1: "EventString":"example1", "Eventtimestamp": "2018-11-29T10:00:00.00", "TransferTimestamp": "2018-11-29T10:05:00.00"
Event 2 (duplicate): "EventString":"example1", "Eventtimestamp": "2018-11-29T10:00:00.00", "TransferTimestamp": "2018-11-29T10:08:00.00"
在这种情况下,副本是在原始事件 3 分钟后的传输过程中创建的
我的代码如下,
streamDataset.
.withWatermark("transferTimestamp", "4 minutes")
.dropDuplicates("eventstring","transferTimestamp");
上述代码不会删除重复项,因为 transferTimestamp 对于事件及其重复项是唯一的。但目前,这是 Spark 强制我在设置 watermark 时将 watermark 列包含在 dropDuplicates 函数中的唯一方法。
我真的很想看到一个像下面这样的 dropDuplicate 实现,这对于任何至少一次语义流来说都是一个有效的情况,我不必在 dropDuplicates 中使用水印字段,并且仍然尊重基于水印的状态驱逐。但目前情况并非如此
streamDataset.
.withWatermark("transferTimestamp", "4 minutes")
.dropDuplicates("eventstring");
我无法使用 eventtimestamp,因为它没有顺序,而且时间范围变化很大(延迟事件和垃圾事件)。
如果有人对这种情况下的重复数据删除有其他解决方案或想法,请告诉我。
【问题讨论】:
【参考方案1】:对于您的用例,您不能直接使用 dropDuplicates API。您必须使用诸如 flatmapgroupwithstate 之类的 spark API 来使用任意有状态操作
【讨论】:
以上是关于Spark 结构化流 - 带有水印替代解决方案的 dropDuplicates的主要内容,如果未能解决你的问题,请参考以下文章
Spark SQL DSL 中的窗口(固定、滑动等)和水印支持