Kafka Streams 是不是适合触发记录的批处理?

Posted

技术标签:

【中文标题】Kafka Streams 是不是适合触发记录的批处理?【英文标题】:Are Kafka Streams Appropriate for Triggering Batch Processing of Records?Kafka Streams 是否适合触发记录的批处理? 【发布时间】:2022-01-22 06:10:01 【问题描述】:

上下文

我有三个服务,每个服务都会生成特定的 JSON 负载(需要不同的时间来完成),这是处理组合结果所需的消息将所有三个 JSON 有效负载合并为一个有效负载。这个最终的有效载荷将依次发送到另一个 Kafka 主题,以便它可以被另一个服务使用。

您可以在下面找到一个图表,该图表可以更好地解释手头的问题。信息聚合器服务接收到聚合信息的请求,它将该请求发送到 Kafka 主题,以便服务 1、服务 2 和服务 3 使用该请求并将其数据(JSON 有效负载)发送到 3 个不同的 Kafka 主题。

信息聚合器必须使用来自三个服务的消息(这些消息在非常不同的时间发送到各自的 Kafka 主题,例如服务 1 需要半小时响应,而服务 2 和 3 需要不到 10 分钟),以便它可以生成最终的有效载荷(表示为聚合信息)发送到另一个 Kafka 主题。

研究

在对 Kafka 和 Kafka Streams 进行了大量研究后,我遇到了this article,它提供了一些关于如何详细说明这一点的深刻见解。

在本文中,作者使用来自单个主题的消息,而在我的特定用例中,我必须从三个不同的主题中消费,等待来自每个主题的具有特定 ID 的每条消息到达,以便我可以向我的流程发出信号它可以继续消费不同主题中具有相同 ID 的 3 条消息以生成最终消息并将该最终消息发送到另一个 Kafka 主题(然后另一个服务将使用该消息)。

深思熟虑的解决方案

我的想法是我需要一个 Kafka Stream 检查所有三个主题,当它看到所有 3 条消息都可用时,向一个名为 e.g. 的 kafka 主题发送一条消息。信息聚合器将从其消费的 TopicEvents 以及通过消费消息将确切地知道从哪个主题、分区和偏移量获取哪些消息,然后可以继续将最终有效负载发送到另一个 Kafka 主题。

问题

我是否对 Kafka Streams 和 Batch 的使用非常错误 正在处理?

如何向 Stream 发出所有消息都已到达的信号,以便它可以生成要放置在 TopicEvent 中的消息,从而向 Information Aggregator 发出不同主题中的所有消息已到达并准备就绪的信号消费了吗?

很抱歉这篇长篇文章,您可以提供的任何指示都会非常有帮助,并在此先感谢您

【问题讨论】:

【参考方案1】:

如何向 Stream 发出所有消息都已到达的信号

您可以使用 Streams 和连接来完成此操作。由于连接仅限于 2 个主题,因此您需要进行 2 次连接才能获得所有 3 个主题都发生的事件。

加入 TopicA 和 TopicB 以获取 A 和 B 发生时的事件。将 AB 与 TopicC 连接起来,得到 A、B 和 C 发生的事件。

【讨论】:

通过这样做,我获得了所有三个有效负载,然后可以向主题发送消息以向我的聚合器发出信号,或者我应该继续处理消息并将它们发送到最终的 kafka 主题吗?根据文章中的建议,我应该向我的信息聚合器流程发出信号,但如果您认为我不妨只处理消息并继续,我将致力于这样做 如果最终主题中出现的消息表明所有事件都已发生,如果聚合器是该最终主题的消费者,则每当最终主题中出现消息时,即通知您聚合器来完成它的工作

以上是关于Kafka Streams 是不是适合触发记录的批处理?的主要内容,如果未能解决你的问题,请参考以下文章

kafka Streams会话窗口

是否可以使用 Kafka Streams 访问消息头?

Kafka Streams - 根据 Streams 数据发送不同的主题

带有Spring Cloud Stream的Kafka Streams进程中的Serd错误

Kafka Streams入门指南

为什么我不推荐Kafka Streams和KSQL?