使用 Kafka Streams DSL 时如何处理错误和不提交

Posted

技术标签:

【中文标题】使用 Kafka Streams DSL 时如何处理错误和不提交【英文标题】:How to handle error and don't commit when use Kafka Streams DSL 【发布时间】:2017-06-22 06:55:48 【问题描述】:

对于 Kafka Streams,如果我们使用较低级别的处理器 API,我们可以控制是否提交。因此,如果我们的代码中出现问题,并且我们不想提交此消息。在这种情况下,Kafka 将多次重新传递此消息,直到问题得到解决。

但是在使用其更高级别的流DSL API时如何控制是否提交消息呢?

资源:

http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html

【问题讨论】:

您链接到的文档非常陈旧且过时。更好地使用docs.confluent.io/current/streams/index.html 【参考方案1】:

你的说法并不完全正确。您不能“控制是否提交”——至少不能直接控制(无论是在处理器 API 中还是在 DSL 中)。您只能使用ProcessorContext#commit() 来请求附加 提交。因此,在调用#commit() 之后,Streams 会尝试尽快提交,但这不是立即提交。此外,即使您从未调用过#commit(),Streams 也会自动提交。您可以通过 Streams 配置 commit.interval.m 控制 Streams 提交间隔(参见 http://docs.confluent.io/current/streams/developer-guide.html#configuring-a-kafka-streams-application)

如果出现“问题”,这取决于您遇到的问题类型:

如果您检测到无法恢复的问题,您只能抛出异常并“停止世界”(参见下文)。 如果您有一个可恢复的错误,您需要在您自己的代码中“循环”(例如,在Processor#process()KeyValueMapper#apply() 中,直到问题得到解决并且您可以成功处理当前消息(注意,您可能使用此策略遇到超时,即异常 - 参见消费者配置 heartbeat.interval.ms 和 0.10.1 session.timeout.ms [KIP-62]) 另一种方法是将现在无法处理的记录放入StateStore 并稍后处理它们。但是,很难做到正确,并且还破坏了一些 Streams 假设(例如,处理顺序)。不建议使用,如果使用了,一定要非常小心的含义

如果有未捕获的异常StreamThread 将死亡并且不会发生任何提交(您可以注册一个异常处理程序以获取有关此的通知:http://docs.confluent.io/current/streams/developer-guide.html#using-kafka-streams-within-your-application-code。如果你们所有人 StreamThread 死亡,您将需要创建一个新的 KafkaStreams 实例来重新启动您的应用程序。

在消息被成功处理之前,您不能从用户代码返回,因为如果您返回,Streams 会假定消息已成功处理(因此可能会提交相应的偏移量)。关于要点 (3),将记录放入特殊的 StateStore 以供以后处理被视为“成功”处理的记录。

【讨论】:

非常感谢,马蒂亚斯。你们这些cmets让我对Kafka的内部有了更多的了解。而对于可恢复的错误(例如,当将数据写入此时已关闭的目标服务器时),我们选择永远重试,直到它得到修复。这会阻止消费者,但在我们的情况下没问题。 您能否提供一些有关 KafkaStreams 如何处理偏移提交的文档的链接。听起来您是说在用户代码完成之前不会提交偏移量,并且如果用户代码抛出错误则不会提交。对吗? 你说的是对的。不幸的是,没有可用的文档(只有代码……如果您想深入研究,请查看StreamsThread#maybeCommit() 方法)。 您可以将 commit.internal.ms 配置设置为 Long.MAX_VALUE - 这将有效地避免 Kafka Streams 自动提交,但仅在您调用 context#commit() 之后。 KS 默认将enable.auto.commit 设置为false。但是,KS 有它自己的提交逻辑(即,它在每个 commit.interval.ms 内部“手动”调用 Consumer#commit()):因此,从消费者的角度来看,我们使用手动提交,但是,从 Kafka Streams 的角度来看查看,它仍然是自动的,并且没有 Kafka Streams 配置来禁用它。

以上是关于使用 Kafka Streams DSL 时如何处理错误和不提交的主要内容,如果未能解决你的问题,请参考以下文章

KSQL / Kafka Streams可以支持复杂事件处理吗?

如何限制kafka-streams中的rocksdb内存使用量

如何改善响应式 kafka(Scala 加 Akka Streams)的缓慢性能?

Kafka Streams 在 HDFS 上查找数据

为啥 Apache Kafka Streams 使用 RocksDB 以及如何改变它?

kafka Streams会话窗口