在 KAFKA 中消费后删除消息
Posted
技术标签:
【中文标题】在 KAFKA 中消费后删除消息【英文标题】:Delete message after consuming it in KAFKA 【发布时间】:2015-04-19 14:17:29 【问题描述】:我正在使用 apache kafka 来生成和使用 5GB 大小的文件。我想知道是否有一种方法可以在使用后自动删除来自主题的消息。我有什么方法可以跟踪消费的消息吗?我不想手动删除它。
【问题讨论】:
如果您可以选择,可以使用传统的消息代理。 【参考方案1】:在 Kafka 中,消费的责任就是消费者的责任,这也是 Kafka 具有如此出色的横向可扩展性的主要原因之一。
使用高级消费者 API 将通过在 Zookeeper 中提交消耗的偏移量来自动为您执行此操作(或者,特殊的 Kafka 主题使用更新的配置选项来跟踪消耗的消息)。
简单的消费者 API 让您自己处理如何以及在何处跟踪所消费的消息。
Kafka 中的消息清除是通过指定主题的保留时间或为其定义磁盘配额来自动完成的,因此对于一个 5GB 文件的情况,该文件将在您定义的保留期过后被删除,不管它是否被消耗。
【讨论】:
如果你想消费一条消息,修改它并将其推送回不同的主题,一旦消息被消费就删除是有意义的。否则,您将在保留期内获得所有内容的 2 份副本。如果您可以对第一个主题设置保留期但删除已使用的消息,这是理想的选择。 你确定在报复政策到期后,即使消息没有被消费,数据也会从主题中删除吗?这意味着在使用给定分区的数据时,消费者会看到“漏洞”或丢失的消息。这不是破坏了 kafka 对可靠消息传递媒介的承诺吗?【参考方案2】:您不能在消费时删除 Kafka 消息
Kafka没有在消费时直接删除消息的机制。
我在尝试这样做时发现的最接近的东西是this trick,但它未经测试,并且根据设计它不适用于最近的消息:
这样做的一个潜在技巧是使用 (a) a 压缩主题和 (b) 自定义分区器 (c) 一对 拦截器。
流程如下:
在写入密钥之前,使用生产者拦截器将 GUID 添加到密钥的末尾。 使用自定义分区程序忽略 GUID 以进行分区 使用压缩主题,然后您可以通过 producer.send(key+GUID, null) 删除您需要的任何单个消息 使用消费者拦截器删除读取时的 GUID。
但您不应该需要此功能。
拥有 1 个或多个消费者,并且希望一条消息总共只被他们消费一次? 将它们放在同一个消费者组中。
想要避免过多的消息填满磁盘? 根据磁盘空间和/或时间设置保留。
【讨论】:
【参考方案3】:据我所知,您可以通过减少存储时间从日志中删除消耗的数据。日志的默认时间设置为 168 小时,然后数据会自动从您创建的 Kafka-Topic 中删除。因此,我的建议是减少对位于配置文件夹中的server.properties
的访问,并将 168 更改为最短时间。所以在您为 log.retention.hours 设置的特定时间后,它们就没有数据了。所以您的问题将得到解决。
log.retention.hours=168
继续编码
【讨论】:
这不是解决 OP 问题的方法。它将删除所有消息,无论它们是否已被使用。【参考方案4】:您可以使用 consumer_group :Kafka 保证一条消息只能被组中的单个消费者读取。 https://www.tutorialspoint.com/apache_kafka/apache_kafka_consumer_group_example.htm
【讨论】:
@RKRK 您能否为您的 cmets 添加更多信息?这对您来说可能很明显,但对于新用户来说,您认为他们应该如何改进他们的答案可能并不明显。【参考方案5】:我刚刚在这个问题中运行并构建了一个脚本,该脚本可以定期运行以将已使用的记录“标记”为已删除。 Kafka 不会立即释放空间,但会删除偏移量在“活动”分区之外的分区。
https://gist.github.com/ThePsyjo/b717d2eaca2deb09b8130b3e917758f6
【讨论】:
以上是关于在 KAFKA 中消费后删除消息的主要内容,如果未能解决你的问题,请参考以下文章
kafka 基础概念命令行操作(查看所有topic创建topic删除topic查看某个Topic的详情修改分区数发送消息消费消息 查看消费者组 更新消费者的偏移位置)
即使在成功连接并在 kafka 消费者控制台中获取消息后,也无法使用来自 kafka 主题的消息(使用 Python)[重复]