对使用新的 Kafka 幂等生产者 API 防止重复感到困惑

Posted

技术标签:

【中文标题】对使用新的 Kafka 幂等生产者 API 防止重复感到困惑【英文标题】:Confused about preventing duplicates with new Kafka idempotent producer API 【发布时间】:2018-08-14 00:07:05 【问题描述】:

我的应用程序有 5 个以上的消费者使用 kafka 主题的五个分区。(使用 kafka 版本 11)我的消费者每个都向另一个主题产生一条消息,然后将一些状态保存到数据库中,然后执行手动立即确认并移动到下一条消息。

我正在尝试解决他们成功发送到出站主题的场景。然后我们失败/失去了消费者。当另一个消费者接管分区时,它将向出站主题发出另一条消息。这很糟糕:(

我发现 kafka 现在有幂等生产者,但据我阅读,它只保证生产者会话。

“当生产者重新启动时,会分配新的 PID。因此,幂等性只承诺用于单个生产者会话”-(博客)-https://hevodata.com/blog/kafka-exactly-once

这对我来说似乎毫无用处。在我的用例中,重点是当我在另一个消费者上重播消息时,它不会复制出站消息。

我有什么遗漏吗?

【问题讨论】:

这里并不完全清楚你在哪里提交你的消费者抵消。在您确认生成的消息后,我会假设。这使您处于读取-处理-写入模式,您可能希望使用 Kafka 流或实现自己的事务生产者confluent.io/blog/transactions-apache-kafka 幂等生产者不适合这里。 感谢您的建议。明天我得再研究一下。似乎交易 id 很难实现 您不应该在使用事务时使用消费者来提交偏移量 - 请参阅我的回答。 【参考方案1】:

在使用事务时,您不应使用任何基于消费者的机制(手动或其他方式)来提交偏移量。

相反,您使用生产者将偏移量发送到事务,因此偏移量提交是事务的一部分。

如果配置了KafkaTransactionManagerChainedKafkaTransactionManager,Spring 侦听器容器将在侦听器正常退出时将偏移量发送到事务。

如果您不使用 Kafka 事务管理器,则需要使用 KafkaTemplate(或 Producer,如果您使用本机 API)将偏移量发送到事务。

使用消费者提交偏移量不是事务的一部分,所以事情不会按预期工作。

使用事务管理器时,侦听器容器将Producer 绑定到线程,因此任何下游KafkaTemplate 操作都参与消费者启动的事务。见the documentation。

【讨论】:

啊,我不知道交易会提交消费者和生产者的确认。

以上是关于对使用新的 Kafka 幂等生产者 API 防止重复感到困惑的主要内容,如果未能解决你的问题,请参考以下文章

如何使两个DC之间的kafka集群中的生产者幂等?

Kafka幂等性原理及实现剖析

kafka | 幂等生产者和事务生产者是一回事吗?

Kafka 学习kafka 生产者幂等性

一文读懂kafka的幂等生产者

Kafka快速入门(生产者)同步异步发送分区消息精确一次发送幂等性事务