对使用新的 Kafka 幂等生产者 API 防止重复感到困惑
Posted
技术标签:
【中文标题】对使用新的 Kafka 幂等生产者 API 防止重复感到困惑【英文标题】:Confused about preventing duplicates with new Kafka idempotent producer API 【发布时间】:2018-08-14 00:07:05 【问题描述】:我的应用程序有 5 个以上的消费者使用 kafka 主题的五个分区。(使用 kafka 版本 11)我的消费者每个都向另一个主题产生一条消息,然后将一些状态保存到数据库中,然后执行手动立即确认并移动到下一条消息。
我正在尝试解决他们成功发送到出站主题的场景。然后我们失败/失去了消费者。当另一个消费者接管分区时,它将向出站主题发出另一条消息。这很糟糕:(
我发现 kafka 现在有幂等生产者,但据我阅读,它只保证生产者会话。
“当生产者重新启动时,会分配新的 PID。因此,幂等性只承诺用于单个生产者会话”-(博客)-https://hevodata.com/blog/kafka-exactly-once
这对我来说似乎毫无用处。在我的用例中,重点是当我在另一个消费者上重播消息时,它不会复制出站消息。
我有什么遗漏吗?
【问题讨论】:
这里并不完全清楚你在哪里提交你的消费者抵消。在您确认生成的消息后,我会假设。这使您处于读取-处理-写入模式,您可能希望使用 Kafka 流或实现自己的事务生产者confluent.io/blog/transactions-apache-kafka 幂等生产者不适合这里。 感谢您的建议。明天我得再研究一下。似乎交易 id 很难实现 您不应该在使用事务时使用消费者来提交偏移量 - 请参阅我的回答。 【参考方案1】:在使用事务时,您不应使用任何基于消费者的机制(手动或其他方式)来提交偏移量。
相反,您使用生产者将偏移量发送到事务,因此偏移量提交是事务的一部分。
如果配置了KafkaTransactionManager
或ChainedKafkaTransactionManager
,Spring 侦听器容器将在侦听器正常退出时将偏移量发送到事务。
如果您不使用 Kafka 事务管理器,则需要使用 KafkaTemplate
(或 Producer
,如果您使用本机 API)将偏移量发送到事务。
使用消费者提交偏移量不是事务的一部分,所以事情不会按预期工作。
使用事务管理器时,侦听器容器将Producer
绑定到线程,因此任何下游KafkaTemplate
操作都参与消费者启动的事务。见the documentation。
【讨论】:
啊,我不知道交易会提交消费者和生产者的确认。以上是关于对使用新的 Kafka 幂等生产者 API 防止重复感到困惑的主要内容,如果未能解决你的问题,请参考以下文章