Kafka Streams API:KStream 到 KTable

Posted

技术标签:

【中文标题】Kafka Streams API:KStream 到 KTable【英文标题】:Kafka Streams API: KStream to KTable 【发布时间】:2017-08-13 16:56:39 【问题描述】:

我有一个 Kafka 主题,我在其中发送位置事件(key=user_id,value=user_location)。我可以将其作为KStream 阅读和处理:

KStreamBuilder builder = new KStreamBuilder();

KStream<String, Location> locations = builder
        .stream("location_topic")
        .map((k, v) -> 
            // some processing here, omitted form clarity
            Location location = new Location(lat, lon);
            return new KeyValue<>(k, location);
        );

这很好用,但我希望有一个KTable,其中包含每个用户的最后一个已知位置。我该怎么办?

我能够读写一个中间主题:

// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");

// build KTable from intermediate topic
KTable<String, Location> table = builder.table("location_topic_aux", "store");

有没有从KStream 获取KTable 的简单方法?这是我第一个使用 Kafka Streams 的应用程序,所以我可能遗漏了一些明显的东西。

【问题讨论】:

【参考方案1】:

更新:

在 Kafka 2.5 中,将添加一个新方法 KStream#toTable(),这将提供一种将 KStream 转换为 KTable 的便捷方法。详情见:https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL

原答案:

目前没有直接的方法可以做到这一点。如 Confluent 常见问题解答中所述,您的方法绝对有效:http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step

这是最简单的代码方法。但是,它的缺点是(a)您需要管理一个额外的主题,并且(b)它会导致额外的网络流量,因为数据是从 Kafka 写入和重新读取的。

还有一种选择,使用“dummy-reduce”:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> stream = ...; // some computation that creates the derived KStream

KTable<String, Long> table = stream.groupByKey().reduce(
    new Reducer<Long>() 
        @Override
        public Long apply(Long aggValue, Long newValue) 
            return newValue;
        
    ,
    "dummy-aggregation-store");

与选项 1 相比,这种方法在代码方面稍微复杂一些,但具有以下优点:(a) 不需要手动主题管理,(b) 不需要从 Kafka 重新读取数据。

总的来说,你需要自己决定,你更喜欢哪种方法:

在选项 2 中,Kafka Streams 将创建一个内部更改日志主题来备份 KTable 以实现容错。因此,这两种方法都需要 Kafka 中的一些额外存储,并导致额外的网络流量。总体而言,这是选项 2 中稍微复杂的代码与选项 1 中手动主题管理之间的权衡。

【讨论】:

我正在尝试使用您的方法通过愚蠢的groupByKeyKStream 构造KTable,但无法解析groupByKey 方法。你知道可能出了什么问题吗? (我是 Java 生态系统和 kafkas 的新手) 您的 Streams 版本是多少?对于旧版本,它应该是stream.reduceByKey(...) 而不是stream.groupByKey().reduce(...)。见docs.confluent.io/3.1.0/streams/… 我以为我使用的是最新版本,但我在查看0.10.1 版本的文档时使用的是0.10.0。所以我修复了它:) thnx 使用你的“dummy-reduce”你将如何在结果ktable中删除一个条目?我的理解是 reduce 将简单地忽略任何空值。更新:我看到您对另一个线程的评论表明使用了“代理”,这确实是我过去所做的。 ***.com/questions/50708252/… 尽管如此,以上哪个选项可能更倾向于最佳实践?

以上是关于Kafka Streams API:KStream 到 KTable的主要内容,如果未能解决你的问题,请参考以下文章

将Kafka Streams代码迁移到Spring Cloud Stream吗?

Kafka Streams - 根据 Streams 数据发送不同的主题

是否可以使用 Kafka Streams 访问消息头?

Kafka---窗口函数

合并多个相同的 Kafka Streams 主题

Kafka Streams 在 HDFS 上查找数据