合并多个相同的 Kafka Streams 主题

Posted

技术标签:

【中文标题】合并多个相同的 Kafka Streams 主题【英文标题】:Merging multiple identical Kafka Streams topics 【发布时间】:2017-04-09 07:29:43 【问题描述】:

我有 2 个 Kafka 主题流式传输来自不同来源的完全相同的内容,因此我可以在其中一个来源出现故障时获得高可用性。 我正在尝试使用 Kafka Streams 0.10.1.0 将 2 个主题合并为 1 个输出主题,这样我就不会错过任何有关失败的消息,并且在所有源都启动时没有重复。

使用 KStream 的leftJoin 方法时,其中一个主题(次要主题)可以正常关闭,但是当主要主题关闭时,没有任何内容发送到输出主题。这似乎是因为,根据Kafka Streams developer guide,

KStream-KStream leftJoin 总是由来自主流的记录驱动

所以如果没有来自主流的记录,它不会使用来自辅助流的记录,即使它们存在。一旦主流重新联机,输出就会正常恢复。

我还尝试使用outerJoin(添加重复记录),然后转换为 KTable 和 groupByKey 以消除重复,

KStream mergedStream = stream1.outerJoin(stream2, 
    (streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1,
    JoinWindows.of(2000L))

mergedStream.groupByKey()
            .reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore))
            .toStream((key,value) -> value)
            .to(outputStream)

但我仍然偶尔会得到重复。我还经常使用commit.interval.ms=200 让 KTable 发送到输出流。

处理此合并以从多个相同的输入主题中获得一次性输出的最佳方法是什么?

【问题讨论】:

一般来说,我会推荐Processor API来解决这个问题。您也可以尝试切换到当前的trunk 版本(不确定这是否适合您)。连接已重新设计,这可能会解决您的问题:cwiki.apache.org/confluence/display/KAFKA/… 新的连接语义将包含在 Kafka 0.10.2 中,其目标发布日期为 2017 年 1 月 (cwiki.apache.org/confluence/display/KAFKA/…)。 @MatthiasJ.Sax 我切换到主干,看起来leftJoin 现在的行为类似于 KStream-KStream 连接的outerJoin,所以我想我会回到 10.1 语义.我现在尝试的是创建一个输出空值的假流,我将在 leftJoin 中将其用作主要的,以前是主要的,并在 leftJoin 中使用该合并与辅助。我希望这将导致在主流中始终具有值,即使我的主流关闭(因为我只会从第一个 leftJoin 获得 null)。 新的leftJoin 确实会像旧的outerJoin 一样从双方触发(我想这就是您所说的“看起来leftJoin 现在表现得像outerJoin”的意思?)——这个比旧的leftJoin 更接近 SQL 语义——但leftJoinouterJoin 仍然不同:如果右侧触发并且没有找到连接伙伴,它会丢弃记录并且不会发出任何结果。 我还想知道您的密钥是如何分布的,以及同一个密钥在单个主题中使用的频率。也许您可以只使用一个同时使用两个主题的 KTable 来帮助消除......但如前所述,我强烈建议使用处理器 API! 啊,好吧,我没有想到新的leftJoinouterJoin 之间的区别。我确实最终使用了处理器 API 和您对另一个问题 (***.com/a/40837977/6167108) 的回答,它运行良好。您可以在此处将其添加为答案,我会接受。谢谢! 【参考方案1】:

使用任何类型的连接都无法解决您的问题,因为您总是会丢失结果(内连接,以防某些流停止)或“重复”null(左连接或外连接)如果两个流都在线)。有关 Kafka Streams 中连接语义的详细信息,请参阅 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics。

因此,我建议使用处理器 API,您可以使用 KStream process()transform()transformValues() 与 DSL 混合搭配。详情请见How to filter keys and value with a Processor using Kafka Stream DSL。

您还可以将自定义存储添加到您的处理器 (How to add a custom StateStore to the Kafka Streams DSL processor?) 以使重复过滤容错。

【讨论】:

以上是关于合并多个相同的 Kafka Streams 主题的主要内容,如果未能解决你的问题,请参考以下文章

如何发送时间窗口 KTable 的最终 kafka-streams 聚合结果?

Kafka Streams - 根据 Streams 数据发送不同的主题

Kafka Streams窗口加入了保留

Kafka Streams“Consumed.with()”与KafkaAvroDeserializer

Kafka Streams 容错与并行偏移管理

如何忽略从同一主题读取和写入不同事件类型的 Kafka Streams 应用程序中的某些类型的消息