Kafka Streams API:避免在 KTable.mapValues 中添加额外的 stateStore

Posted

技术标签:

【中文标题】Kafka Streams API:避免在 KTable.mapValues 中添加额外的 stateStore【英文标题】:Kafka Streams API: Avoid additional stateStore in KTable.mapValues 【发布时间】:2021-12-02 16:48:22 【问题描述】:

目前我们在 Kafka Streams 应用程序中使用以下内容:

streamsBuilder.table(inputTopic)
              .join(...)
              .mapValues(valueMapper) // <-- this causes another state store
              .groupBy(...)
              .aggregate(...)
              .mapValues(...)

[...]
              .toStream()
              .to(outputTopic)

我刚刚意识到,连接后的 mapValues 创建了一个额外的状态存储。

如果 valueMapper 中的计算在某种程度上是微不足道的(例如删除对象中的字段等),那么避免额外的 statestore 不是更好吗? 我是否需要转换为 KStream 并使用 KStream.mapValues 来避免 stateStore,即

streamsBuilder.table(inputTopic)
              .join(...)
              .toStream
              .mapValues(valueMapper) // <-- no more additional statestore
              .groupBy(...)
              .aggregate(...)
              .mapValues(...)

[...]
              .toStream()
              .to(outputTopic)

或者有没有更好的替代方法在加入后应用附加映射?

【问题讨论】:

【参考方案1】:

为什么要在连接步骤之后使用 mapValues?如果可以在联接的 ValueJoiner 中处理该逻辑。

streamsBuilder.table(inputTopic)
              .join( anotherTable, (a ,b) ->  c )  <--- Here you can perform any mapping process
              .toStream
              .groupBy(...)
              .aggregate(...)
              .mapValues(...)

[...]
              .toStream()
              .to(outputTopic)

【讨论】:

感谢您的回答以及在 ValueJoiner 中执行映射的建议。这可能是正确和最轻量级的方法。恕我直言,在 Kafka DSL 连接函数中执行此操作是违反直觉的。我是否正确理解 KTable 上的每个方法调用都会创建一个新的状态存储来保存结果? 是的,你是对的,根据文档,每个返回 KTable 的 KTable 函数都在创建一个新的 KTable,这是因为 Kafka Streams API 需要知道给定 Key 的最新状态是什么。 KTable 过滤表 = streamsBuilder.table().filter( ....);过滤表.toStream() 过滤表.groupBy()。 // 卡卡需要知道过滤后的表是什么状态。甚至检查每个方法的描述总是说 crete a new KTable kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/…

以上是关于Kafka Streams API:避免在 KTable.mapValues 中添加额外的 stateStore的主要内容,如果未能解决你的问题,请参考以下文章

Kafka 消费者 API 上的 Kafka Streams DSL

使用 Kafka 的 Streams API 处理错误消息

[译] 流式处理:使用 Apache Kafka 的 Streams API 实现 Rabobank 的实时财务告警

用于事件过滤的 Kafka Consumer API 与 Streams API

Apache Kafka 1.0.0 Streams API Multiple Multilevel groupby

如何有效地链接来自 Kafka Streams 中平面 api 数据的 groupby 查询?