kafka防止消息重复消费

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka防止消息重复消费相关的知识,希望对你有一定的参考价值。

参考技术A kafka重复消费的根本原因就是“数据消费了,但是offset没更新”!而我们要探究一般什么情况下会导致offset没更新?

max.poll.interval.ms

两次poll操作允许的最大时间间隔。单位毫秒。默认值300000(5分钟)。

两次poll超过此时间间隔,Kafka服务端会进行rebalance操作,导致客户端连接失效,无法提交offset信息,从而引发重复消费。

拿到消息就提交offset

1、丢包问题 :消息推送服务,每天早上,手机上各终端都会给用户推送消息,这时候流量剧增,可能会出现kafka发送数据过快,导致服务器网卡爆满,或者磁盘处于繁忙状态,可能会出现丢包现象。

解决方案:首先对kafka进行限速, 其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功。 

检测方法:使用重放机制,查看问题所在。

2. 重复消费最常见的原因 :re-balance问题,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-balance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。 

消息重复消费和消息丢包的解决办法

保证不丢失消息:生产者(ack=all 代表至少成功发送一次)     重试机制

消费者 (offset手动提交,业务逻辑成功处理后,提交offset) 

保证不重复消费:落表(主键或者唯一索引的方式,避免重复数据) 

业务逻辑处理(选择唯一主键存储到Redis或者mongdb中,先查询是否存在,若存在则不处理;若不存在,先插入Redis或Mongdb,再进行业务逻辑处理)

对使用新的 Kafka 幂等生产者 API 防止重复感到困惑

【中文标题】对使用新的 Kafka 幂等生产者 API 防止重复感到困惑【英文标题】:Confused about preventing duplicates with new Kafka idempotent producer API 【发布时间】:2018-08-14 00:07:05 【问题描述】:

我的应用程序有 5 个以上的消费者使用 kafka 主题的五个分区。(使用 kafka 版本 11)我的消费者每个都向另一个主题产生一条消息,然后将一些状态保存到数据库中,然后执行手动立即确认并移动到下一条消息。

我正在尝试解决他们成功发送到出站主题的场景。然后我们失败/失去了消费者。当另一个消费者接管分区时,它将向出站主题发出另一条消息。这很糟糕:(

我发现 kafka 现在有幂等生产者,但据我阅读,它只保证生产者会话。

“当生产者重新启动时,会分配新的 PID。因此,幂等性只承诺用于单个生产者会话”-(博客)-https://hevodata.com/blog/kafka-exactly-once

这对我来说似乎毫无用处。在我的用例中,重点是当我在另一个消费者上重播消息时,它不会复制出站消息。

我有什么遗漏吗?

【问题讨论】:

这里并不完全清楚你在哪里提交你的消费者抵消。在您确认生成的消息后,我会假设。这使您处于读取-处理-写入模式,您可能希望使用 Kafka 流或实现自己的事务生产者confluent.io/blog/transactions-apache-kafka 幂等生产者不适合这里。 感谢您的建议。明天我得再研究一下。似乎交易 id 很难实现 您不应该在使用事务时使用消费者来提交偏移量 - 请参阅我的回答。 【参考方案1】:

在使用事务时,您不应使用任何基于消费者的机制(手动或其他方式)来提交偏移量。

相反,您使用生产者将偏移量发送到事务,因此偏移量提交是事务的一部分。

如果配置了KafkaTransactionManagerChainedKafkaTransactionManager,Spring 侦听器容器将在侦听器正常退出时将偏移量发送到事务。

如果您不使用 Kafka 事务管理器,则需要使用 KafkaTemplate(或 Producer,如果您使用本机 API)将偏移量发送到事务。

使用消费者提交偏移量不是事务的一部分,所以事情不会按预期工作。

使用事务管理器时,侦听器容器将Producer 绑定到线程,因此任何下游KafkaTemplate 操作都参与消费者启动的事务。见the documentation。

【讨论】:

啊,我不知道交易会提交消费者和生产者的确认。

以上是关于kafka防止消息重复消费的主要内容,如果未能解决你的问题,请参考以下文章

SpringCloud(28)——Stream重复消费与持久化

kafka重复消费的问题

kafka重复消费的原因

kafka线上问题优化:消息丢失重复消费消息积压延时队列顺序消费

kafka线上问题优化:消息丢失重复消费消息积压延时队列顺序消费

消息队列怎么避免重复消费