Kafka Stream Suppress session-windowed-aggregation

Posted

技术标签:

【中文标题】Kafka Stream Suppress session-windowed-aggregation【英文标题】: 【发布时间】:2019-06-10 20:47:17 【问题描述】:

我在 Kafka 流应用程序中编写了这段代码:

KGroupedStream<String, foo> groupedStream = stream.groupByKey();
groupedStream.windowedBy(
SessionWindows.with(Duration.ofSeconds(3)).grace(Duration.ofSeconds(3)))
    .aggregate(() -> ...)
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .toStream()...

应该(如果我理解正确的话)在窗口关闭后为每个键发出记录。 不知何故,行为如下:

流不会发出第一条记录,即使使用不同的 Key,也只会在第二条记录之后转发它,然后第二条记录仅在第三条之后发出,依此类推..

我已经尝试了多个带有“exactly_once”的 StreamConfig,并且无论有没有缓存,这种行为仍然存在。

提前感谢您的帮助!

【问题讨论】:

如果你希望你的数据按时间段而不是“会话”聚合,我想你需要使用TimeWindows而不是SessionWindows 这对我不起作用。有一个定时窗口,但在为同一个键添加新事件之前,它仍然无法完成对旧窗口的抑制效果。非常令人沮丧和违反直觉! 【参考方案1】:

这是预期的行为。请注意,suppress() 基于事件时间。因此,只要没有新数据到达,时间就无法前进,因此提前驱逐记录是错误的,因为无法保证下一条记录可能属于当前窗口。

【讨论】:

感谢您的快速回答。在文档中,声明“在定义窗口计算后,您可以抑制中间结果,在窗口关闭时为每个用户发出最终计数”所以我认为我是否有会话或时间窗口,我会发出仅在窗口关闭后的结果,并且每个记录键我可以同时拥有多个窗口。如果不是这样,实现这种行为的想法是什么? 我想我现在理解这种行为了。非常感谢 !那么是否有可能使用与 KStream 的连接来触发具有某些键的某些记录的“驱逐”? 您在第一条评论中所说的都是真的。关键是,只有在事件时间进展时才能关闭窗口,因此这只发生在输入中出现具有更大时间戳的新记录之后。 -- 我不明白你的第二条评论。 对不起,我的意思是问有没有办法在不接收新记录的情况下发出数据?虽然根据你的第一个答案,这是不可能的 @ValBonn 我构建了一个解决方法,它是一种虚拟流,它有一个 Punctuator,它会根据挂钟时间执行周期性操作(以便推送流时间 xD)【参考方案2】:

我认为带有“suppress()”的“会话窗口”不会产生任何输出。

如有错误请指正。据我所知,suppress() 仅适用于基于时间的 Windows,不适用于基于会话的 Windows。

【讨论】:

suppress 适用于 SessionWindows - 您需要确保使用 + 定义 grace 期间 - 否则将应用默认值 (24h - gapMs),因此窗口将'not close' 在此宽限期内。另见 org.apache.kafka.streams.kstream.SessionWindows#gracePeriodMs ||例如:.windowedBy(SessionWindows.with(Duration.ofSeconds(10)).grace(Duration.ofSeconds(10)))

以上是关于Kafka Stream Suppress session-windowed-aggregation的主要内容,如果未能解决你的问题,请参考以下文章

简介Kafka Stream

Stream From 整合 Kafka

SpringBoot: kafka stream报kafka stream must subscribe to at least one source topic or global table

解开Kafka神秘的面纱:kafka stream及interceptor

Akka Stream Kafka vs Kafka Streams

Kafka Stream