在 Flink 中使用相同的滑动窗口加入 2 个以上的流

Posted

技术标签:

【中文标题】在 Flink 中使用相同的滑动窗口加入 2 个以上的流【英文标题】:Joining more than 2 streams using the same sliding window in Flink 【发布时间】:2019-12-31 14:10:17 【问题描述】:

我有 3 个流 A、B 和 C,我应该将它们加入一个流,我们称之为 ABC 并对其进行一些操作。

重要的是我使用大小为 X 的滑动窗口和滑动 Y,其中 Y

所有流都包含一些我用于连接的通用 ID,X、Y 是以秒为单位定义的时间参数。

我当前的实现是使用大小为 X 的翻转窗口将流 A 和 B 加入 AB,然后使用大小为 X 和滑动 Y 的滑动窗口将 AB 与 C 连接。

这可能会导致错误答案,例如:流 A 在时间 0 收到消息,流 B 在时间 Y+1 收到消息。在这种情况下,两条消息应该进入同一个滑动窗口,因为 Y+1

我能否在 Flink 中使用单个滑动窗口进行多流连接,类似于我在 Spark 中连接多个数据帧的方式?

【问题讨论】:

我不明白你的情况。假设 X=60 和 Y=20。流 A 在时间 0 有消息,流 B 在时间 61 有消息。61 不小于 20,并且不存在应该包含这两条消息的窗口。 X+1 小于 Y 表示 X 小于 Y,这是一种奇怪的滑动窗口方式。 @DavidAnderson 你说得对,我在文本中混合了 X 和 Y。假设 X = 15 和 Y = 5。如果 A 在时间 0 有消息,B 在时间 6 有消息,它们应该是同一个滑动窗口的一部分,只是间隔不同 应该可以,但您可能需要在翻滚加入窗口之后再次分配时间戳和水印。 但是如果不应用窗口函数,我一次不能加入超过 2 个流。即如果我不能做 A.join(B).where(..).equalTo(..).join(C).where(..).equalTo(..).window(SlidingEventWindow),我必须做: A.join(B).where(..).equalTo(..).window(..).apply(..).join(C)..where(..).equalTo(..)。窗口(..).apply(..);因此,如果我希望输出是 3 个流的滑动窗口,我应该先应用一个滑动窗口(X,Y)然后再应用一个翻转窗口(X)? @DavidAnderson 您能否提供一个示例,说明如何在同一个窗口中加入 3 个流? 【参考方案1】:

我认为在这种情况下可行的是使用两个滑动窗口连接——一个用于计算 AB,另一个用于将这些结果与 C 连接。您可能遇到的一个问题是由第一次加入——我不确定 Flink 会将哪些时间戳放入包装 AB 事件的 StreamRecords 中,但对于普通(非加入)窗口,Flink 将结果记录上的时间戳设置为窗口结束时间。在这种情况下,这可能不是您想要的。如果这是一个问题,您可以在第一个滑动窗口之后放置一个额外的时间戳分配器,以在第二个连接之前(使用 C)适当地设置时间戳。

【讨论】:

以上是关于在 Flink 中使用相同的滑动窗口加入 2 个以上的流的主要内容,如果未能解决你的问题,请参考以下文章

11.Flink四大基石Window窗口的分类Flink提供了很多各种场景用的WindowAssigner基于时间的滚动和滑动基于时间的滚动和滑动窗口基于数量的滚动和滑动

11.Flink四大基石Window窗口的分类Flink提供了很多各种场景用的WindowAssigner基于时间的滚动和滑动基于时间的滚动和滑动窗口基于数量的滚动和滑动

flink 滚动窗口滑动窗口会话窗口全局窗口

Flink 滑动窗口使用触发器会触发多个窗口的计算

Flink 滚动窗口滑动窗口详解

Flink 实现自定义滑动窗口