kafka防止消息重复消费
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka防止消息重复消费相关的知识,希望对你有一定的参考价值。
参考技术A kafka重复消费的根本原因就是“数据消费了,但是offset没更新”!而我们要探究一般什么情况下会导致offset没更新?max.poll.interval.ms
两次poll操作允许的最大时间间隔。单位毫秒。默认值300000(5分钟)。
两次poll超过此时间间隔,Kafka服务端会进行rebalance操作,导致客户端连接失效,无法提交offset信息,从而引发重复消费。
拿到消息就提交offset
1、丢包问题 :消息推送服务,每天早上,手机上各终端都会给用户推送消息,这时候流量剧增,可能会出现kafka发送数据过快,导致服务器网卡爆满,或者磁盘处于繁忙状态,可能会出现丢包现象。
解决方案:首先对kafka进行限速, 其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功。
检测方法:使用重放机制,查看问题所在。
2. 重复消费最常见的原因 :re-balance问题,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-balance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。
消息重复消费和消息丢包的解决办法
保证不丢失消息:生产者(ack=all 代表至少成功发送一次) 重试机制
消费者 (offset手动提交,业务逻辑成功处理后,提交offset)
保证不重复消费:落表(主键或者唯一索引的方式,避免重复数据)
业务逻辑处理(选择唯一主键存储到Redis或者mongdb中,先查询是否存在,若存在则不处理;若不存在,先插入Redis或Mongdb,再进行业务逻辑处理)
对使用新的 Kafka 幂等生产者 API 防止重复感到困惑
【中文标题】对使用新的 Kafka 幂等生产者 API 防止重复感到困惑【英文标题】:Confused about preventing duplicates with new Kafka idempotent producer API 【发布时间】:2018-08-14 00:07:05 【问题描述】:我的应用程序有 5 个以上的消费者使用 kafka 主题的五个分区。(使用 kafka 版本 11)我的消费者每个都向另一个主题产生一条消息,然后将一些状态保存到数据库中,然后执行手动立即确认并移动到下一条消息。
我正在尝试解决他们成功发送到出站主题的场景。然后我们失败/失去了消费者。当另一个消费者接管分区时,它将向出站主题发出另一条消息。这很糟糕:(
我发现 kafka 现在有幂等生产者,但据我阅读,它只保证生产者会话。
“当生产者重新启动时,会分配新的 PID。因此,幂等性只承诺用于单个生产者会话”-(博客)-https://hevodata.com/blog/kafka-exactly-once
这对我来说似乎毫无用处。在我的用例中,重点是当我在另一个消费者上重播消息时,它不会复制出站消息。
我有什么遗漏吗?
【问题讨论】:
这里并不完全清楚你在哪里提交你的消费者抵消。在您确认生成的消息后,我会假设。这使您处于读取-处理-写入模式,您可能希望使用 Kafka 流或实现自己的事务生产者confluent.io/blog/transactions-apache-kafka 幂等生产者不适合这里。 感谢您的建议。明天我得再研究一下。似乎交易 id 很难实现 您不应该在使用事务时使用消费者来提交偏移量 - 请参阅我的回答。 【参考方案1】:在使用事务时,您不应使用任何基于消费者的机制(手动或其他方式)来提交偏移量。
相反,您使用生产者将偏移量发送到事务,因此偏移量提交是事务的一部分。
如果配置了KafkaTransactionManager
或ChainedKafkaTransactionManager
,Spring 侦听器容器将在侦听器正常退出时将偏移量发送到事务。
如果您不使用 Kafka 事务管理器,则需要使用 KafkaTemplate
(或 Producer
,如果您使用本机 API)将偏移量发送到事务。
使用消费者提交偏移量不是事务的一部分,所以事情不会按预期工作。
使用事务管理器时,侦听器容器将Producer
绑定到线程,因此任何下游KafkaTemplate
操作都参与消费者启动的事务。见the documentation。
【讨论】:
啊,我不知道交易会提交消费者和生产者的确认。以上是关于kafka防止消息重复消费的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloud(28)——Stream重复消费与持久化
kafka线上问题优化:消息丢失重复消费消息积压延时队列顺序消费