避免apache kafka消费者重复消息的有效策略
Posted
技术标签:
【中文标题】避免apache kafka消费者重复消息的有效策略【英文标题】:Effective strategy to avoid duplicate messages in apache kafka consumer 【发布时间】:2015-06-21 06:17:33 【问题描述】:我已经学习 apache kafka 一个月了。然而,我现在陷入了困境。我的用例是,我有两个或多个消费者进程在不同的机器上运行。我进行了一些测试,其中我在 kafka 服务器中发布了 10,000 条消息。然后在处理这些消息时,我杀死了一个消费者进程并重新启动它。消费者正在将处理后的消息写入文件。所以消费完成后,文件显示超过 10k 条消息。所以有些消息是重复的。
在消费者进程中,我禁用了自动提交。消费者手动批量提交偏移量。因此,例如,如果将 100 条消息写入文件,消费者会提交偏移量。当单个消费者进程正在运行并且它崩溃并恢复时,以这种方式避免了重复。但是当多个消费者在运行并且其中一个崩溃并恢复时,它会将重复的消息写入文件。
是否有任何有效的策略来避免这些重复消息?
【问题讨论】:
我看不到在单一消费者案例中如何避免重复问题。你能帮我理解吗? 【参考方案1】:简短的回答是,不。
您正在寻找的是一次性处理。虽然它通常看起来可行,但永远不应该依赖它,因为总是有警告。
即使为了防止重复,您也需要使用简单消费者。这种方法的工作原理是针对每个消费者,当从某个分区消费消息时,将消费消息的分区和偏移量写入磁盘。当消费者在失败后重启时,从磁盘读取每个分区的最后消费偏移量。
但即使使用这种模式,消费者也不能保证它不会在失败后重新处理消息。如果消费者消费了一条消息,然后在偏移量刷新到磁盘之前失败了怎么办?如果在处理消息之前写入磁盘,如果在实际处理消息之前写入偏移量然后失败怎么办?即使您在每条消息后向 ZooKeeper 提交偏移量,也会存在同样的问题。
但在某些情况下, 精确一次处理更容易实现,但仅适用于某些用例。这只需要将您的偏移量存储在与单元应用程序的输出相同的位置。例如,如果您编写一个对消息进行计数的消费者,通过将最后计数的偏移量与每个计数一起存储,您可以保证该偏移量与消费者的状态同时存储。当然,为了保证一次性处理,这将要求您只使用一条消息并为每条消息只更新一次状态,这对于大多数 Kafka 消费者应用程序来说是完全不切实际的。出于性能原因,Kafka 本质上会批量消费消息。
如果您简单地将其设计为幂等性,通常您的时间会花得更多,您的应用程序也会更加可靠。
【讨论】:
与启用自动提交相比,我们使用此"exactly-once scenario"
获得的真正好处是什么?在什么场景和情况下,这将有所帮助。在我的情况下,我将有多个消费者在不同的机器上运行,消耗来自具有多个分区的同一主题的数据,我想消除丢失消息的可能性,并减少重新平衡期间重复的消息数量.
在我的情况下接收重复消息是可以的,因为我的系统可以处理它,但我根本不能丢失数据,所以想看看这种方法是否会通过手动管理磁盘或磁盘上的偏移量来带来任何好处在某个数据库上。【参考方案2】:
这就是Kafka FAQ 关于exactly-once 的话题:
如何从 Kafka 获取一次性消息?
Exactly once 语义有两部分:在数据生产过程中避免重复和在数据消费过程中避免重复。
有两种方法可以在数据生产期间获得恰好一次的语义:
每个分区使用一个写入器,每次遇到网络错误时,检查该分区中的最后一条消息,看看您的最后一次写入是否成功 在消息中包含主键(UUID 或其他内容)并对使用者进行重复数据删除。如果你做这些事情之一,Kafka 托管的日志将是无重复的。然而,没有重复的阅读也取决于消费者的一些合作。如果消费者定期检查其位置,那么如果它失败并重新启动,它将从检查点位置重新启动。因此,如果数据输出和检查点不是原子写入的,那么这里也可能会出现重复。此问题是您的存储系统所特有的。例如,如果您正在使用数据库,您可以在事务中一起提交这些。 LinkedIn 编写的 HDFS 加载器 Camus 为 Hadoop 加载做了类似的事情。另一个不需要事务的替代方法是使用加载的数据存储偏移量,并使用主题/分区/偏移量组合进行重复数据删除。
我认为有两个改进可以让这更容易:
生产者幂等性可以通过选择性地在服务器上集成对此的支持来自动完成,而且成本更低。 现有的高级消费者没有公开很多更细粒度的偏移控制(例如,重置您的位置)。我们会尽快解决这个问题
【讨论】:
在我的情况下接收重复消息是可以的,因为我的系统可以处理它,但我根本不能丢失数据,所以想看看这种方法是否会通过手动管理磁盘或磁盘上的偏移量来带来任何好处在某个数据库上。【参考方案3】:我同意 RaGe 在消费者方面的重复数据删除。我们使用 Redis 对 Kafka 消息进行重复数据删除。
假设 Message 类有一个名为 'uniqId' 的成员,由生产者端填充,保证唯一。我们使用 12 长度的随机字符串。 (正则表达式为'^[A-Za-z0-9]12$'
)
消费者端使用 Redis 的 SETNX 进行重复数据删除和 EXPIRE 自动清除过期密钥。示例代码:
Message msg = ... // eg. ConsumerIterator.next().message().fromJson();
Jedis jedis = ... // eg. JedisPool.getResource();
String key = "SPOUT:" + msg.uniqId; // prefix name at will
String val = Long.toString(System.currentTimeMillis());
long rsps = jedis.setnx(key, val);
if (rsps <= 0)
log.warn("kafka dup: ", msg.toJson()); // and other logic
else
jedis.expire(key, 7200); // 2 hours is ok for production environment;
上述代码确实在Kafka(0.8.x版)出现情况时多次检测到重复消息。通过我们的输入/输出平衡审计日志,没有发生任何消息丢失或重复。
【讨论】:
这在重试的情况下不起作用,您将它们视为重复数据删除,而它们应该重试 如果在 jedis.setnx() 命令之后,消费者在完成其处理任务之前崩溃/网络故障等怎么办?我想我们应该冒这个小风险? user1955934 可以使用Redis事务。【参考方案4】:Kafka 现在有一个相对较新的“Transactional API”,可以让您在处理流时实现仅一次处理。使用事务 API,只要系统的其余部分是为幂等设计的,就可以内置幂等性。见https://www.baeldung.com/kafka-exactly-once
【讨论】:
这仅适用于生产者使用事务 API 的情况,否则消费者无法从该模式中受益。【参考方案5】:无论在生产者端做了什么,我们认为从 kafka 只交付一次的最佳方式仍然是在消费者端处理它:
-
使用 uuid 作为主题 T1 的 Kafka 消息密钥生成 msg
消费者端从 T1 读取 msg,以 uuid 作为 rowkey 写入 hbase
使用相同的 rowkey 从 hbase 读回并写入另一个主题 T2
让您的最终消费者从主题 T2 实际消费
【讨论】:
以上是关于避免apache kafka消费者重复消息的有效策略的主要内容,如果未能解决你的问题,请参考以下文章
Kafka在高并发的情况下,如何避免消息丢失和消息重复?kafka消费怎么保证数据消费一次?数据的一致性和统一性?数据的完整性?