kafka Streams会话窗口
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka Streams会话窗口相关的知识,希望对你有一定的参考价值。
您好我正在使用kafka会话窗口,非活动时间为5分钟。当达到非活动时间并且会话根据密钥下降时,我想要某种反馈。我假设我有
(A,1)
记录“A”是关键字。现在,如果我在5分钟内没有获得任何'A'密钥记录,则会话被删除。
我想在会话结束时做一些操作,让那个会话说(值)* 2。有什么方法可以使用Kafka Stream API实现这一目标
在缺口时间过后,Kafka Streams不会放弃会话。相反,如果在间隔时间过后,如果具有相同密钥的另一个记录到达并且同时维护两个会话,则将创建新会话。这允许处理无序数据。甚至可能发生,如果无序数据落入间隙并且两个会话彼此“连接”,则会合并两个会话。
默认情况下,会话保持1天。您可以通过SessionWindows#until()
方法更改此设置。如果会话过期,它将被静默删除。没有通知。您还需要考虑配置参数window.store.change.log.additional.retention.ms
:
默认保留设置为Windows#maintainMs()+ 1天。您可以通过在StreamsConfig中指定StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG来覆盖此设置。
因此,如果时间过去,你想要做出反应,你应该考虑允许你根据“偶数时间进度”或挂钟时间注册常规回调(某种计时器)的标点符号。如果会话在一段时间内没有更新并且您认为它已“完成”,则允许您做出反应。
以上是关于kafka Streams会话窗口的主要内容,如果未能解决你的问题,请参考以下文章
如何发送时间窗口 KTable 的最终 kafka-streams 聚合结果?
Kafka Streams - 根据 Streams 数据发送不同的主题