为啥加入使用 20 秒水印后发送的行?
Posted
技术标签:
【中文标题】为啥加入使用 20 秒水印后发送的行?【英文标题】:Why does join use rows that were sent after watermark of 20 seconds?为什么加入使用 20 秒水印后发送的行? 【发布时间】:2018-12-06 09:37:22 【问题描述】:我正在使用水印加入两个流,如下所示:
val order_wm = order_details.withWatermark("tstamp_trans", "20 seconds")
val invoice_wm = invoice_details.withWatermark("tstamp_trans", "20 seconds")
val join_df = order_wm
.join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))
我对上述代码的理解,它将使每个流保持 20 秒。之后,但是,当我现在提供一个流,20 秒后提供另一个流时,两者也都加入了。似乎即使在水印完成后 Spark 仍将数据保存在内存中。我什至在 45 秒后尝试过,结果也加入了。
这让我对水印感到困惑。
【问题讨论】:
【参考方案1】:它来了,但是,当我现在给一个流,20秒后另一个,那么两者都加入了。
这是可能的,因为测量的时间不是事件到达的时间,而是水印字段内的时间,即tstamp_trans
。您必须确保tstamp_trans
中的最后一次时间是在将参与连接的行之后 20 秒。
【讨论】:
我使用 .withcolumn 将 tstamp_trans 添加为一列,在其中提供当前时间戳 您能否介绍一下您是如何测试应用程序的以及您得到了什么输出? 我从 kafka 获得两个流。我打开了两个 kafka 生产者,然后我发送了这两个流。我在一个 kafka 主题中发送数据并等待水印时间完成,然后推送另一个【参考方案2】:引用文档来自:http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#inner-joins-with-optional-watermarking
换句话说,您必须在连接中执行以下附加步骤。
在两个输入上定义水印延迟,以便引擎知道输入可以延迟多长时间(类似于流式聚合)
定义两个输入的事件时间约束,以便引擎可以确定何时不需要一个输入的旧行(即不满足时间约束)与另一个输入匹配。可以通过以下两种方式之一来定义此约束。
时间范围加入条件(例如...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR),
在事件时间窗口上加入(例如 ...JOIN ON leftTimeWindow = rightTimeWindow)。
【讨论】:
以上是关于为啥加入使用 20 秒水印后发送的行?的主要内容,如果未能解决你的问题,请参考以下文章