如何重置 Kafka 偏移量以匹配尾部位置?
Posted
技术标签:
【中文标题】如何重置 Kafka 偏移量以匹配尾部位置?【英文标题】:How to reset Kafka offsets to match tail position? 【发布时间】:2017-03-09 09:13:48 【问题描述】:我们将 Storm 与 Kafka 和 ZooKeeper 一起使用。我们遇到过一种情况,我们必须删除一些主题并用不同的名称重新创建它们。除了现在读取新主题名称之外,我们的 Kafka spouts 保持不变。但是,现在 spout 在尝试读取新主题时正在使用旧主题分区的偏移量。所以 my-topic-name 分区 0 的尾部位置将为 500,但偏移量将类似于 10000。
有没有办法重置偏移位置以匹配主题的尾部?
【问题讨论】:
您的 kafka 偏移量存储在 Zookeeper 的 znode 中。所以你可以通过 zookeeper cli 重置它们,它们应该在 /consumers 下。您可以使用 kafka 元数据 API 读取它们。我一直在做一个项目,我们实际上覆盖了内置的偏移量机制并将偏移量存储在 HBase 中,并使用多版本来记录历史偏移量。我们基本上有一个偏移量的事务历史记录,我们可以通过删除每个消费者组/主题/分区组合的偏移量版本来回滚。 @dsch 仅适用于 0.9 之前的 Kafka 版本。由于 0.9 个偏移量存储在 Kafka 主题__consumer_offsets
有趣。这种改变是为了通过 Mirrormaker 改进偏移复制吗?
@dsch 新的偏移存储旨在克服 ZK 太忙的问题。
【参考方案1】:
有多个选项(因为 Storm 的 KafkaSpout
没有提供任何 API 来定义起始偏移量)。
-
如果你想从日志尾部消费你应该删除旧的偏移量
取决于你的 Kafka 版本
(0.9 之前)您可以操纵 ZK(这有点棘手)
(0.9+) 或者您尝试从主题
__consumer_offsets
中删除偏移量(这也很棘手,可能还会删除您想要保留的其他偏移量)
如果没有偏移,您可以使用自动偏移重置策略“最新”或“最大”(取决于您的 Kafka 版本)重新启动您的 spout
作为替代方案(我会推荐),您可以编写一个小型客户端应用程序,该应用程序使用seek()
以您需要的方式操作偏移量,并使用commit()
偏移量。此客户端必须使用与您KafkaSpout
相同的组 ID,并且必须订阅相同的主题。此外,您需要确保此客户端应用程序正在运行单个使用者组成员,以便获得分配的所有分区。
为此,您可以搜索到日志的末尾并提交
或者您提交了无效的偏移量(如 -1)并依赖自动偏移量重置配置“最新”或“最大”(取决于您的 Kafka 版本)
对于 Kafka Streams,有一个“应用程序重置工具”可以执行类似的操作来操作已提交的偏移量。如果你想了解一些细节,可以阅读这篇博文http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
(免责声明:我是这篇文章的作者,它是关于 Kafka Streams 的——尽管如此,基本的偏移量操作思想是相同的)
【讨论】:
以上是关于如何重置 Kafka 偏移量以匹配尾部位置?的主要内容,如果未能解决你的问题,请参考以下文章