结构化流式多水印
Posted
技术标签:
【中文标题】结构化流式多水印【英文标题】:Structured streaming multiple watermarks 【发布时间】:2019-09-03 10:25:02 【问题描述】:如果重要的话,我会使用 Spark 2.3.0。
根据结构化流式处理文档,它处理late data using watermarks。它还提到流式重复数据删除也可以通过使用水印来限制enter link description here 中间状态的存储量来实现。
所以,我的问题是这些水印是否可以具有不同的值,或者水印是否只指定一次?我问这个是因为我将在聚合后对值进行重复数据删除,因此处理延迟数据的容差是不同的。
【问题讨论】:
【参考方案1】:来自Policy for handling multiple watermarks:
一个流式查询可以有多个联合或连接在一起的输入流。每个输入流都可以具有不同的延迟数据阈值,这些阈值对于有状态操作需要被容忍。您可以在每个输入流上使用 withWatermarks("eventTime", delay) 指定这些阈值。
在执行查询时,结构化流式处理会单独跟踪每个输入流中看到的最大事件时间,根据相应的延迟计算水印,并选择单个全局水印用于有状态操作。默认情况下,选择最小值作为全局水印,因为如果其中一个流落后于其他流(例如,其中一个流由于上游故障而停止接收数据),它可以确保不会意外丢弃任何数据。换句话说,全局水印将以最慢的流的速度安全移动,查询输出也会相应延迟。
从Spark 2.4开始,您可以通过将SQL配置spark.sql.streaming.multipleWatermarkPolicy设置为max(默认为min)来设置multiple watermark策略选择最大值作为全局watermark。
事实上,这也适用于任何对水印敏感的运算符。
【讨论】:
感谢 Jacek 的帮助。但是我想了解的是,当我们只有一个流,但是要应用几个对水印敏感的运算符时,例如删除重复项,在任意状态处理的情况下的事件时间超时,水印和延迟阈值仅指定一次意味着它不能在这些运营商之间变化?所以,如果我们删除超过 20 分钟的重复项,我们就不能有 30 分钟的偶数超时,对吧? 这很容易验证自己! :) 在流数据集上使用explain
,你就会知道答案,因为只有一个......好吧......不会透露它,让你告诉我们你自己发现了什么:)以上是关于结构化流式多水印的主要内容,如果未能解决你的问题,请参考以下文章
结构化流式传输:具有流式传输源的查询必须使用 writeStream.start() 执行
Spark 结构化流式 Elasticsearch 集成问题。数据源es不支持流式写入
为啥结构化流式处理因“java.lang.IncompatibleClassChangeError:Implementing class”而失败?