Kafka Stream 和 KTable 一对多关系加入

Posted

技术标签:

【中文标题】Kafka Stream 和 KTable 一对多关系加入【英文标题】:Kafka Stream and KTable One-to-Many Relationship Join 【发布时间】:2017-05-31 03:50:09 【问题描述】:

我有一个 kafka 流——比如博客和一个 kafka 表——比如与这些博客相关的 cmets。来自 kafka 流的键可以映射到 Kafka 表中的多个值,即一个博客可以有多个 cmets。我想将这两者结合起来,并创建一个带有评论 id 数组的新对象。但是当我加入时,流只包含最后一个评论 id。是否有任何文档或示例代码可以为我指明正确的方向如何实现这一目标?基本上,是否有任何文档详细说明如何使用 Kafka 流和 Kafka 表进行一对多关系连接?

KStream<Integer, EnrichedBlog> joinedBlogComments = blogsStream.join(commentsTbl,
              (blogId, blog) -> blog.getBlogId(),
              (blog, comment) -> new EnrichedBlog(blog, comment));

所以我需要一组评论 ID,而不是评论。

【问题讨论】:

【参考方案1】:

我找不到与您的代码示例中的签名匹配的连接方法,但我认为问题出在以下:

KTables 被解释为一个 changlog,也就是说,每一条具有相同 key 的下一条消息都被解释为对记录的更新,而不是新记录。这就是为什么您只看到给定键(博客 ID)的最后一条“评论”消息,之前的值被覆盖。 为了克服这个问题,您首先需要更改填充 KTable 的方式。您可以做的是将您的评论主题作为 KStream 添加到您的拓扑中,然后执行一个聚合,该聚合只是构建一个数组或一个共享相同博客 ID 的 cmets 列表。该聚合返回一个 KTable,您可以将其加入您的博客 KStream。

这是一个草图,你可以如何构建一个列表值的 KTable:

builder.stream("yourCommentTopic") // where key is blog id
.groupByKey()
.aggregate(() -> new ArrayList(), 
    (key, value, agg) -> new KeyValue<>(key, agg.add(value)),
    yourListSerde);

列表比数组更容易在聚合中使用,因此如果需要,我建议您将其转换为下游数组。您还需要为您的列表提供一个 serde 实现,在上面的示例中为“yourListSerde”。

【讨论】:

为了便于阅读,我要补充一点,“yourCommentTopic”的键是相应的博文 ID。然后groupByKey 步骤确保后续的aggregate 步骤可以访问特定博客文章的所有 cmets(因此可以创建所有 cmets 的列表)。 谢谢!修改了这个效果的答案 我尝试使用它,发现agg.add(value) 不返回 ArrayList 而是返回一个布尔值,说明添加是否成功,这意味着这不起作用。相反,我需要使用-&gt;agg.add(value); return agg; - 除非我在这里误解了什么?【参考方案2】:

如果您使用带有模式注册表的 avro,您应该编写自己的聚合器,因为 kafka 流无法序列化 ArrayList。

    val kTable = aStream
        .groupByKey()
        .aggregate(
                
                    YourAggregator() // initialize aggregator
                ,
                 _, value, agg ->
                    agg.add(value) // add value to a list in YourAggregator
                    agg
                
        )

然后将 kTable 与您的其他信息流 (bStream) 一起加入。

    bStream
        .join(
                kTable,
                 b, a ->
                    // do your value join from a to b
                    b
                
        )

抱歉,我的 sn-ps 是用 Kotlin 编写的。

【讨论】:

【参考方案3】:

正如上面 Michal 的正确答案所指出的,在这种情况下,不能使用由 blogId 键入的 KTable 来跟踪博客,因为此类表中只保留了最新的博客值。

作为对他回答中提到的解决方案的建议优化,请注意,如果每个博客有很多 cmets,则在 .aggregate() 中保持不断增长的列表可能会在数据大小和时间方面变得昂贵。这是因为在后台,该聚合的每次迭代都会导致 List 的实例不断增长,这在 java 或 scala 中是可以的,因为数据重用,但每个都单独序列化到底层状态存储.示意性地,假设某个键有 10 个 cmets,那么这个表达式被调用 10 次:

(key, value, agg) -> new KeyValue<>(key, agg.add(value))

每次生成一个大小为 1、2、然后......然后 10 的列表,每个都独立序列化到底层状态存储,这意味着 1+2+3+...+10=55 值将被序列化(嗯,也许有一些优化 s.t. 其中一些序列化被跳过了,我不知道,虽然我认为空间和时间复杂度是一样的)。

另一种虽然更复杂的方法是在状态存储中使用range scans,这使得数据结构在像 DynamoDB 这样的键值存储中看起来有点像(partition_key, sort_key),在其中我们用一个键存储每个评论喜欢(blogId, commentId)。在这种情况下,您仍然可以通过blogId keyBy() cmets 流,然后 .transform(...) 将其传递给处理器 API,您可以在其中应用范围扫描想法,每次添加(即序列化)一个补充注释到状态存储而不是整个列表的新实例。

当我们描绘很多(blogId, commentId)键实例时,一对多关系变得非常明显,它们都有相同的blogId和不同的commentId,都存储在同一个状态存储实例中物理节点,而这整个事情在很多节点中同时发生在很多 blogId 上。

我在我的博客上发布了有关该模式的更多详细信息:One-to-many Kafka Streams Ktable join,我还发布了 full working example in github

【讨论】:

以上是关于Kafka Stream 和 KTable 一对多关系加入的主要内容,如果未能解决你的问题,请参考以下文章

流式计算新贵Kafka Stream设计详解

介绍一位分布式流处理新贵:Kafka Stream

Kafka Streams API:KStream 到 KTable

Kafka Ktable 还流式传输重复更新

可以将 Kafka Streams 配置为等待 KTable 加载吗?

KSQL KTabke+KTable Join重复结果异常。