如何重置 Kafka 偏移量以匹配尾部位置?

Posted

技术标签:

【中文标题】如何重置 Kafka 偏移量以匹配尾部位置?【英文标题】:How to reset Kafka offsets to match tail position? 【发布时间】:2017-03-09 09:13:48 【问题描述】:

我们将 Storm 与 Kafka 和 ZooKeeper 一起使用。我们遇到过一种情况,我们必须删除一些主题并用不同的名称重新创建它们。除了现在读取新主题名称之外,我们的 Kafka spouts 保持不变。但是,现在 spout 在尝试读取新主题时正在使用旧主题分区的偏移量。所以 my-topic-name 分区 0 的尾部位置将为 500,但偏移量将类似于 10000。

有没有办法重置偏移位置以匹配主题的尾部?

【问题讨论】:

您的 kafka 偏移量存储在 Zookeeper 的 znode 中。所以你可以通过 zookeeper cli 重置它们,它们应该在 /consumers 下。您可以使用 kafka 元数据 API 读取它们。我一直在做一个项目,我们实际上覆盖了内置的偏移量机制并将偏移量存储在 HBase 中,并使用多版本来记录历史偏移量。我们基本上有一个偏移量的事务历史记录,我们可以通过删除每个消费者组/主题/分区组合的偏移量版本来回滚。 @dsch 仅适用于 0.9 之前的 Kafka 版本。由于 0.9 个偏移量存储在 Kafka 主题 __consumer_offsets 有趣。这种改变是为了通过 Mirrormaker 改进偏移复制吗? @dsch 新的偏移存储旨在克服 ZK 太忙的问题。 【参考方案1】:

有多个选项(因为 Storm 的 KafkaSpout 没有提供任何 API 来定义起始偏移量)。

    如果你想从日志尾部消费你应该删除旧的偏移量 取决于你的 Kafka 版本 (0.9 之前)您可以操纵 ZK(这有点棘手) (0.9+) 或者您尝试从主题 __consumer_offsets 中删除偏移量(这也很棘手,可能还会删除您想要保留的其他偏移量) 如果没有偏移,您可以使用自动偏移重置策略“最新”或“最大”(取决于您的 Kafka 版本)重新启动您的 spout 作为替代方案(我会推荐),您可以编写一个小型客户端应用程序,该应用程序使用seek() 以您需要的方式操作偏移量,并使用commit() 偏移量。此客户端必须使用与您KafkaSpout 相同的组 ID,并且必须订阅相同的主题。此外,您需要确保此客户端应用程序正在运行单个使用者组成员,以便获得分配的所有分区。 为此,您可以搜索到日志的末尾并提交 或者您提交了无效的偏移量(如 -1)并依赖自动偏移量重置配置“最新”或“最大”(取决于您的 Kafka 版本)

对于 Kafka Streams,有一个“应用程序重置工具”可以执行类似的操作来操作已提交的偏移量。如果你想了解一些细节,可以阅读这篇博文http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/

(免责声明:我是这篇文章的作者,它是关于 Kafka Streams 的——尽管如此,基本的偏移量操作思想是相同的)

【讨论】:

以上是关于如何重置 Kafka 偏移量以匹配尾部位置?的主要内容,如果未能解决你的问题,请参考以下文章

Kafka 0.11 如何重置偏移量

SpringBoot 重置 kafka 偏移量offset(kafka-0.10.1.0)

如何计算偏移量以绘制到画布中应用了比例变换的最近像素?

Kafka指令重置偏移量

重置为 Kafka 分区中的自定义偏移量

kafka重置到最新offset偏移量