可以将 Kafka Streams 配置为等待 KTable 加载吗?

Posted

技术标签:

【中文标题】可以将 Kafka Streams 配置为等待 KTable 加载吗?【英文标题】:Can Kafka Streams be configured to wait for KTable to load? 【发布时间】:2019-10-26 14:49:45 【问题描述】:

我正在使用物化 KTable 与我的 KStream 进行左连接(而流是左侧)。

但是,它似乎立即处理,无需等待当前版本的 KTable 加载..

我的 KTable 源主题中有很多值,当我启动应用程序时,很多连接都失败了(嗯,不是真的,因为它是左连接)。

我可以让它延迟启动,以便等待初始主题加载吗?

【问题讨论】:

【参考方案1】:

处理在 Kafka Streams 中是时间同步的。因此,表输入主题和流输入主题是根据记录时间戳顺序进行处理的。这在语义上是合理的,因为在流表连接中,您不想将流记录与旧版本或 KTable 的较新版本连接,而是根据流记录时间戳使用正确的版本。

如果您的数据没有正确时间戳,您可以尝试通过builder.table(..., Consumed.with(...)) 指定自定义时间戳提取器以返回确保正确行为的时间戳(即,可能小于第一个流记录的时间戳?)

https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-timestamp-extractor

注意,正确的时间戳同步需要 Kafka Streams 2.1。旧版本仅以尽力而为的方式同步时间,可能无法提供您想要的行为。有关详细信息,请参阅 KIP-353。

https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization

Kafka 3.0 附带了更多时间戳同步改进:https://cwiki.apache.org/confluence/display/KAFKA/KIP-695%3A+Further+Improve+Kafka+Streams+Timestamp+Synchronization

【讨论】:

其实我很想加入最新版的KTable..有没有办法做到这一点? 我赞成你的回答,因为它确实解释了为什么 KTable 不起作用但接受了另一个答案,因为它是我要求的解决方案。非常感谢! 好吧。 GlobalKTable 的行为确实不同,但它也提供了不同的语义和不同的磁盘要求:它不是一个分片,而是一个广播/复制的表,增加了客户端的存储需求。因此,您应该只将 if 用于小型数据集——它也没有与 KStream 时间同步,因此流表连接与流全局表连接具有不同的语义。---只是想确保您是知道使用 GlobalKTable 意味着什么。它不是 KTable 的“直接”替代品,而是您更改了程序的语义。 Actually, I do want to join with the newest version of the KTable.. Is there a way to do this? -- 如果您使用的是 Kafka Streams 2.1 或更新版本,您可以为 KTable 使用自定义时间戳提取器,该提取器始终返回 0 作为时间戳。这样,您将获得不同步的行为,并立即应用 KTable 更新。 --- 请注意,非同步处理使您的应用程序本质上是不确定的,并且您不能应用时间旅行来重现以前的结果。 我试过你所说的关于返回 0 作为KTable 的时间戳。但是它仍然会发生。拓扑在 KTable 完全加载之前开始。我不需要时间同步 KTable .. 我需要一个紧凑的缓存,我可以从中加载值。目前,KTable 似乎是唯一给我提供这种解决方案的解决方案,尽管它并不理想。【参考方案2】:

您可以使用 GlobalKTable。它一直等到所有值同步。

【讨论】:

使用GlobalKTable 会改变程序的语义。这不是 1:1 的替换 KTable

以上是关于可以将 Kafka Streams 配置为等待 KTable 加载吗?的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Streams窗口加入了保留

如何改善响应式 kafka(Scala 加 Akka Streams)的缓慢性能?

合并多个相同的 Kafka Streams 主题

kafka在 Kafka Streams 中启用 Exactly-Once

Kafka Streams:topic.compression.type不是已知的配置

我们可以使用任何其他数据库,如 MariaDB 或 MongoDB 来在 Kafka Streams 中存储状态而不是 Rocks DB,有啥方法可以配置它吗?