Kafka Streams 容错与并行偏移管理

Posted

技术标签:

【中文标题】Kafka Streams 容错与并行偏移管理【英文标题】:Kafka Streams Fault Tolerance with Offset Management in Parellel 【发布时间】:2021-03-05 22:44:31 【问题描述】:

说明

我有一个使用主题的 Kafka Stream 应用程序。 活动数量众多。

KafkaStream 会将这些事件作为终端操作使用,并将这些事件组合成一堆,比如 1000 个事件,然后将其写入 AWS S3。

我有线程在消费来自 Kafka 主题的事件后并行写入 s3。

由于某些业务应用程序逻辑和处理,不使用 kafka-connector-s3。

问题 ::

我希望应用程序具有容错性,不想丢失消息。

--> 崩溃场景

假设应用程序有 10 个线程都在运行并试图将事件放入 S3,并且在这种情况下发生崩溃,因为 KafkaStream 具有 (enable.auto.commit = false),我们无法手动提交偏移量并且所有线程都消费了来自 Kafka 主题的消息。 在这种情况下,KafkaStreams 已经在读取后提交了偏移量,但它无法将事件处理到 S3。

我需要一种机制,以便我可以确定在事件成功写入 S3 文件之前的最后偏移量是多少。 在崩溃场景中,我应该如何处理这个问题以及如何管理 Kafka Streams 中的 Kafka 偏移量,因为我使用了 10 个线程。如果有些未能写入 s3 而有些则通过了怎么办。如何确保偏移量的排序成功处理到 s3?

如果我不清楚我的问题陈述,请告诉我。

谢谢!

【问题讨论】:

【参考方案1】:

我可以向您保证,enable.auto.commit 在 Kafka Streams 中设置为 false。位于 https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsConfig.html 状态的 Javadocs

"enable.auto.commit" (false) - Streams 客户端将始终禁用/关闭自动提交

你说得对,Kafka Streams 会或多或少地定期自动提交。但是,Kafka Streams 会等到记录处理完毕后再提交相应的偏移量。这意味着您至少会得到至少一次保证并且不会丢失消息。

据我了解您的应用程序,在将记录发送到 S3 之前,您的终端处理器不会阻塞。这意味着,Kafka Streams 无法知道发送何时完成。 Kafka Streams 只是看到终端处理器完成了它的处理,然后——如果提交间隔已过——它会提交偏移量。

你说

由于某些业务应用程序逻辑和处理,不使用 kafka-connector-s3。

你能不能把业务应用逻辑放到Kafka Streams应用中,将结果写入带有操作符to()的Kafka主题,然后使用kafka-connector-s3将该主题中的消息发送到S3? 我不是连接专家,但我想这可以确保消息不会丢失,并使您的实现更简单。

【讨论】:

嗨布鲁诺,谢谢。在我的用例中,我实际上想使用来自 kafka 的消息并制作一堆说 5000 条消息并将其写入 s3,因为写入 s3 是一项耗时的操作。现在,如果我开始从流中消费直到 5000 条消息,然后执行 S3 写入,偏移量将被提交,直到 5000 条消息并且如果 s3 写入失败。我正在考虑使用 Kafka Consumer,因为它支持手动提交偏移量。我相信 kafka 流最适合你想读写 kafka 主题时,但这里我只是在阅读和做终端操作。 您对 kafka 流与 kafka 消费者有何看法。?? 如果你想对偏移提交有最大的控制,你应该使用 Kafka 消费者。请注意,在 Kafka Streams 中,如果您使用可以访问ProcessorContext 的操作,例如process()transform(),您也可以请求提交。使用commit.interval.ms,您可以控制 Kafka Streams 自动完成的提交间隔。 关于使用多个线程写入 S3,我会考虑通过使用更多流线程和/或并行运行更多 Kafka Streams 客户端(或更多 Kafka 消费者)来扩展输入主题的分区数量)。如果您的输入分区的分区很少,您可以先将输入主题的消息写入具有更多分区的主题,然后从该主题中读取并按分区号进行扩展。 您还没有回答我的问题,即是否可以先使用 Kafka Streams 来应用业务逻辑,将消息写入主题,然后使用 S3 连接器。如果它不是一个选项,我会感兴趣为什么它不是一个选项?使用 Kafka Streams 和 Connect 可以减轻您在发生故障时实现偏移管理等低级功能的负担。【参考方案2】:

使用 kafka-stream ,您可以将来自源主题的 5000 条消息聚合到一个大消息中,并将大消息发送到另一个主题,例如 middle_topic。您需要来自 middle_topic 的另一个 proceccor 源并使用 s3-connector 接收到 s3。

【讨论】:

以上是关于Kafka Streams 容错与并行偏移管理的主要内容,如果未能解决你的问题,请参考以下文章

kafka系列之(3)——Coordinator与offset管理和Consumer Rebalance

使用Kafka Streams处理复杂的Avro消息

Kafka Connect 如何构建实时数据管道

Spark Streaming Kafka 偏移量 Offset 管理

Akka Stream Kafka vs Kafka Streams

Kafka streams概览