Kafka Streams API:KStream 到 KTable
Posted
技术标签:
【中文标题】Kafka Streams API:KStream 到 KTable【英文标题】:Kafka Streams API: KStream to KTable 【发布时间】:2017-08-13 16:56:39 【问题描述】:我有一个 Kafka 主题,我在其中发送位置事件(key=user_id,value=user_location)。我可以将其作为KStream
阅读和处理:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Location> locations = builder
.stream("location_topic")
.map((k, v) ->
// some processing here, omitted form clarity
Location location = new Location(lat, lon);
return new KeyValue<>(k, location);
);
这很好用,但我希望有一个KTable
,其中包含每个用户的最后一个已知位置。我该怎么办?
我能够读写一个中间主题:
// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");
// build KTable from intermediate topic
KTable<String, Location> table = builder.table("location_topic_aux", "store");
有没有从KStream
获取KTable
的简单方法?这是我第一个使用 Kafka Streams 的应用程序,所以我可能遗漏了一些明显的东西。
【问题讨论】:
【参考方案1】:更新:
在 Kafka 2.5 中,将添加一个新方法 KStream#toTable()
,这将提供一种将 KStream
转换为 KTable
的便捷方法。详情见:https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL
原答案:
目前没有直接的方法可以做到这一点。如 Confluent 常见问题解答中所述,您的方法绝对有效:http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step
这是最简单的代码方法。但是,它的缺点是(a)您需要管理一个额外的主题,并且(b)它会导致额外的网络流量,因为数据是从 Kafka 写入和重新读取的。
还有一种选择,使用“dummy-reduce”:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> stream = ...; // some computation that creates the derived KStream
KTable<String, Long> table = stream.groupByKey().reduce(
new Reducer<Long>()
@Override
public Long apply(Long aggValue, Long newValue)
return newValue;
,
"dummy-aggregation-store");
与选项 1 相比,这种方法在代码方面稍微复杂一些,但具有以下优点:(a) 不需要手动主题管理,(b) 不需要从 Kafka 重新读取数据。
总的来说,你需要自己决定,你更喜欢哪种方法:
在选项 2 中,Kafka Streams 将创建一个内部更改日志主题来备份 KTable 以实现容错。因此,这两种方法都需要 Kafka 中的一些额外存储,并导致额外的网络流量。总体而言,这是选项 2 中稍微复杂的代码与选项 1 中手动主题管理之间的权衡。
【讨论】:
我正在尝试使用您的方法通过愚蠢的groupByKey
从KStream
构造KTable
,但无法解析groupByKey
方法。你知道可能出了什么问题吗? (我是 Java 生态系统和 kafkas 的新手)
您的 Streams 版本是多少?对于旧版本,它应该是stream.reduceByKey(...)
而不是stream.groupByKey().reduce(...)
。见docs.confluent.io/3.1.0/streams/…
我以为我使用的是最新版本,但我在查看0.10.1
版本的文档时使用的是0.10.0
。所以我修复了它:) thnx
使用你的“dummy-reduce”你将如何在结果ktable中删除一个条目?我的理解是 reduce 将简单地忽略任何空值。更新:我看到您对另一个线程的评论表明使用了“代理”,这确实是我过去所做的。 ***.com/questions/50708252/…
尽管如此,以上哪个选项可能更倾向于最佳实践?以上是关于Kafka Streams API:KStream 到 KTable的主要内容,如果未能解决你的问题,请参考以下文章
将Kafka Streams代码迁移到Spring Cloud Stream吗?