为啥加入使用 20 秒水印后发送的行?

Posted

技术标签:

【中文标题】为啥加入使用 20 秒水印后发送的行?【英文标题】:Why does join use rows that were sent after watermark of 20 seconds?为什么加入使用 20 秒水印后发送的行? 【发布时间】:2018-12-06 09:37:22 【问题描述】:

我正在使用水印加入两个流,如下所示:

val order_wm = order_details.withWatermark("tstamp_trans", "20 seconds")
val invoice_wm = invoice_details.withWatermark("tstamp_trans", "20 seconds")
val join_df = order_wm
  .join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))

我对上述代码的理解,它将使每个流保持 20 秒。之后,但是,当我现在提供一个流,20 秒后提供另一个流时,两者也都加入了。似乎即使在水印完成后 Spark 仍将数据保存在内存中。我什至在 45 秒后尝试过,结果也加入了。

这让我对水印感到困惑。

【问题讨论】:

【参考方案1】:

它来了,但是,当我现在给一个流,20秒后另一个,那么两者都加入了。

这是可能的,因为测量的时间不是事件到达的时间,而是水印字段内的时间,即tstamp_trans。您必须确保tstamp_trans 中的最后一次时间是在将参与连接的行之后 20 秒。

【讨论】:

我使用 .withcolumn 将 tstamp_trans 添加为一列,在其中提供当前时间戳 您能否介绍一下您是如何测试应用程序的以及您得到了什么输出? 我从 kafka 获得两个流。我打开了两个 kafka 生产者,然后我发送了这两个流。我在一个 kafka 主题中发送数据并等待水印时间完成,然后推送另一个【参考方案2】:

引用文档来自:http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#inner-joins-with-optional-watermarking

换句话说,您必须在连接中执行以下附加步骤。

在两个输入上定义水印延迟,以便引擎知道输入可以延迟多长时间(类似于流式聚合)

定义两个输入的事件时间约束,以便引擎可以确定何时不需要一个输入的旧行(即不满足时间约束)与另一个输入匹配。可以通过以下两种方式之一来定义此约束。

时间范围加入条件(例如...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR),

在事件时间窗口上加入(例如 ...JOIN ON leftTimeWindow = rightTimeWindow)。

【讨论】:

以上是关于为啥加入使用 20 秒水印后发送的行?的主要内容,如果未能解决你的问题,请参考以下文章

发送后 10 秒使用 JDA 删除消息

为啥即使在客户离开房间并加入另一个房间后,消息也会发送到所有房间?烧瓶插座

为啥ps给图片加水印后像素会变大

为啥 mkfifo'ed 管道仅在 ~25 秒后更新?

为啥我的信号量程序打印 20 秒而不是 10 秒?

我的子查询将执行时间增加了 20 秒。我怎样才能加快速度?