Spring Kafka Auto Commit Offset 在失败的情况下
Posted
技术标签:
【中文标题】Spring Kafka Auto Commit Offset 在失败的情况下【英文标题】:Spring Kafka Auto Commit Offset In Case of Failures 【发布时间】:2018-08-30 10:40:03 【问题描述】:我正在使用 Spring Kafka 1.2.2.RELEASE。我有一个 Kafka 侦听器作为消费者,它侦听一个主题并以弹性方式索引文档。 我的自动提交偏移属性设置为 true //default。
我的印象是,如果侦听器中出现异常(弹性已关闭),则不应提交偏移量,并且应为下一次轮询处理相同的消息
然而这并没有发生,消费者在下一次投票时提交了偏移量。在阅读了帖子和文档后,我了解到在自动提交设置为 true 到下一次投票将提交所有偏移量的情况下
我的疑问是为什么消费者会调用下一次民意调查,以及如何防止任何偏移量通过自动提交为 true 提交,或者我是否需要将此属性设置为 false 并手动提交。
【问题讨论】:
【参考方案1】:我更喜欢将其设置为 false;容器为您管理偏移量更可靠。
将容器的AckMode
设置为RECORD
(默认为BATCH
),监听器返回后容器会为你提交偏移量。
还要考虑至少升级到 1.3.3(当前版本是 2.1.4);感谢KIP-62
,1.3.x 引入了一个更简单的线程模型编辑
使用自动提交,无论成功/失败,都会提交偏移量。容器在失败后不会提交,除非 ackOnError
为真(另一个不使用自动提交的原因)。
但是,这仍然无济于事,因为代理不会再次发送相同的记录。为此,您必须对 Consumer
执行搜索操作。
在 2.0.1(当前版本为 2.1.4)中,我们添加了SeekToCurrentErrorHandler
,这将导致在下次轮询时重新发送失败和未处理的记录。 See the reference manual.
您也可以使用ConsumerAwareListener
自己执行搜索(也在 2.0 中添加)。
对于旧版本 (>= 1.1),您必须使用 ConsumerSeekAware
侦听器 is quite a bit more complicated。
另一种选择是添加retry,以便根据重试设置重新尝试传递。
【讨论】:
谢谢加里。但这是预期的行为吗,据我了解,不应提交记录,即使自动提交为真,也会重播。 查看我的答案的编辑;提交无关紧要。 关于spring kafka的升级——我们在生产中使用kafka客户端0.10.2,支持kafka客户端0.10.2的最新spring kafka版本是1.2.x 我现在正在手动提交。但是即使将 ackOnError 设置为 false 代理也不会再次发送相同的记录,那么将 ackOnError 设置为 false 有什么意义呢?我是否错过了 ackOnError 会派上用场的特定用户案例。ackOnError=false
通常在容器发生故障后停止时使用。这样,当容器重新启动时,失败的消息会重新传递。我相信从 0.10.2.0 开始,新客户可以与老经纪人交谈,只要您不尝试使用新功能。我们现在有了SeekToCurrentErrorHandler
,它可以避免停止容器。不过,我建议升级您的经纪人。【参考方案2】:
显然,Spring Kafka
即使使用 spring-kafka 1.3.3.RELEASE(无 maven 源)和单个分区主题、concurrency(1)、AckOnError(false)、BatchListener(true),我们也能够在消费者上重现消息丢失/跳过) 使用 AckMode(BATCH) 处理任何运行时异常。我们最终在模板内重试或探索 ConsumerSeekAware。
@GaryRussell,关于“代理不会再次发送相同的记录”或继续返回下一批消息而不提交?这是因为,消费者轮询是基于它寻求获取下一批记录的当前偏移量而不是恰好在最后一次提交的偏移量上?基本上,消费者根本不需要在每次处理时假设一些运行时异常并继续消费主题上的整个消息。只有重新启动才会从上次提交的偏移量(重复)开始。
升级到 2.0+ 以使用 ConsumerAwareListenerErrorHandler 似乎需要至少升级到 Spring 5.x,这是一个重大升级。
【讨论】:
我们无法升级 Spring kafka(1.2.2),因为它还需要将 Kafka 客户端升级(当前为 0.10.2)到最低 0.11,是的,我同样怀疑消费者是否要投票无论之前的记录是否成功处理,那么提交是否仅在重启的情况下才相关? Spring kafka 1.3.3 兼容 0.10。较新的 Spring Kafka 版本允许客户端完全访问消费者以更新偏移量等,但是,一般来说,鉴于客户端代码可以继续抛出它,Spring 库继续重试运行时异常是没有意义的。我们为 DB/IO 异常添加了重试,其余部分只是使用错误处理程序记录到文件/DB 流中以继续处理其余部分。是的,然后可以通过这种方法在 try/finally 中使用 Ack 来控制提交。或者在批处理/记录上使用自动提交方法让 spring 自动提交。以上是关于Spring Kafka Auto Commit Offset 在失败的情况下的主要内容,如果未能解决你的问题,请参考以下文章