kafka Streams会话窗口

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka Streams会话窗口相关的知识,希望对你有一定的参考价值。

您好我正在使用kafka会话窗口,非活动时间为5分钟。当达到非活动时间并且会话根据密钥下降时,我想要某种反馈。我假设我有

(A,1)

记录“A”是关键字。现在,如果我在5分钟内没有获得任何'A'密钥记录,则会话被删除。

我想在会话结束时做一些操作,让那个会话说(值)* 2。有什么方法可以使用Kafka Stream API实现这一目标

答案

在缺口时间过后,Kafka Streams不会放弃会话。相反,如果在间隔时间过后,如果具有相同密钥的另一个记录到达并且同时维护两个会话,则将创建新会话。这允许处理无序数据。甚至可能发生,如果无序数据落入间隙并且两个会话彼此“连接”,则会合并两个会话。

默认情况下,会话保持1天。您可以通过SessionWindows#until()方法更改此设置。如果会话过期,它将被静默删除。没有通知。您还需要考虑配置参数window.store.change.log.additional.retention.ms

默认保留设置为Windows#maintainMs()+ 1天。您可以通过在StreamsConfig中指定StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG来覆盖此设置。

因此,如果时间过去,你想要做出反应,你应该考虑允许你根据“偶数时间进度”或挂钟时间注册常规回调(某种计时器)的标点符号。如果会话在一段时间内没有更新并且您认为它已“完成”,则允许您做出反应。

以上是关于kafka Streams会话窗口的主要内容,如果未能解决你的问题,请参考以下文章

Kafka---窗口函数

如何发送时间窗口 KTable 的最终 kafka-streams 聚合结果?

Kafka Streams窗口加入了保留

Kafka Streams - 根据 Streams 数据发送不同的主题

将Kafka Streams代码迁移到Spring Cloud Stream吗?

Kafka Streams“Consumed.with()”与KafkaAvroDeserializer