Kafka Streams API:避免在 KTable.mapValues 中添加额外的 stateStore
Posted
技术标签:
【中文标题】Kafka Streams API:避免在 KTable.mapValues 中添加额外的 stateStore【英文标题】:Kafka Streams API: Avoid additional stateStore in KTable.mapValues 【发布时间】:2021-12-02 16:48:22 【问题描述】:目前我们在 Kafka Streams 应用程序中使用以下内容:
streamsBuilder.table(inputTopic)
.join(...)
.mapValues(valueMapper) // <-- this causes another state store
.groupBy(...)
.aggregate(...)
.mapValues(...)
[...]
.toStream()
.to(outputTopic)
我刚刚意识到,连接后的 mapValues 创建了一个额外的状态存储。
如果 valueMapper 中的计算在某种程度上是微不足道的(例如删除对象中的字段等),那么避免额外的 statestore 不是更好吗? 我是否需要转换为 KStream 并使用 KStream.mapValues 来避免 stateStore,即
streamsBuilder.table(inputTopic)
.join(...)
.toStream
.mapValues(valueMapper) // <-- no more additional statestore
.groupBy(...)
.aggregate(...)
.mapValues(...)
[...]
.toStream()
.to(outputTopic)
或者有没有更好的替代方法在加入后应用附加映射?
【问题讨论】:
【参考方案1】:为什么要在连接步骤之后使用 mapValues?如果可以在联接的 ValueJoiner 中处理该逻辑。
streamsBuilder.table(inputTopic)
.join( anotherTable, (a ,b) -> c ) <--- Here you can perform any mapping process
.toStream
.groupBy(...)
.aggregate(...)
.mapValues(...)
[...]
.toStream()
.to(outputTopic)
【讨论】:
感谢您的回答以及在 ValueJoiner 中执行映射的建议。这可能是正确和最轻量级的方法。恕我直言,在 Kafka DSL 连接函数中执行此操作是违反直觉的。我是否正确理解 KTable 上的每个方法调用都会创建一个新的状态存储来保存结果? 是的,你是对的,根据文档,每个返回 KTable 的 KTable 函数都在创建一个新的 KTable,这是因为 Kafka Streams API 需要知道给定 Key 的最新状态是什么。 KTable 过滤表 = streamsBuilder.table().filter( ....);过滤表.toStream() 过滤表.groupBy()。 // 卡卡需要知道过滤后的表是什么状态。甚至检查每个方法的描述总是说 crete a new KTable kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/…以上是关于Kafka Streams API:避免在 KTable.mapValues 中添加额外的 stateStore的主要内容,如果未能解决你的问题,请参考以下文章
Kafka 消费者 API 上的 Kafka Streams DSL
[译] 流式处理:使用 Apache Kafka 的 Streams API 实现 Rabobank 的实时财务告警
用于事件过滤的 Kafka Consumer API 与 Streams API