Kafka 0.11 如何重置偏移量

Posted

技术标签:

【中文标题】Kafka 0.11 如何重置偏移量【英文标题】:Kafka 0.11 how to reset offsets 【发布时间】:2018-01-22 01:56:27 【问题描述】:

我正在尝试使用最新的 Kafka CLI 工具重置消费者偏移量。

kafka-consumer-groups.bat --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --to-earliest --all-topics

结果我看到了这个输出:

TOPIC                            PARTITION  NEW-OFFSET
FirstTopic                       0          0
SecondTopic                      0          0

但是再次运行命令:

kafka-consumer-groups.bat --bootstrap-server kafka-host:9092 --group my-group --describe

输出结果:

Consumer group 'my-group' has no active members.

TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
FirstTopic         0          1230            1230            0  
SecondTopic        0          1022            1022            0

我尝试了其他选项,例如重置为显式偏移或直接指定主题,但结果相同。输出提示操作成功,同时使用 describe 命令检查偏移量或调试显示偏移量未更改。

任何人都可以在非 Zookeeper 代理中成功重置消费者偏移量。

【问题讨论】:

【参考方案1】:

默认情况下,--reset-offsets 只打印操作的结果。要实际执行操作,您需要将--execute 添加到您的命令中:

kafka-consumer-groups.bat --bootstrap-server kafka-host:9092 --group
my-group --reset-offsets --to-earliest --all-topics --execute

【讨论】:

还有其他选项可以重置为特定的偏移量,比如上述问题中 FirstTopic 的 1200 吗?具体来说,我希望我的消费者重置到特定的时间点(无论当时的偏移量是多少) 我也想知道如何通过 Kafka Java API 来完成这些。 值得一提的是这个要点:gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b【参考方案2】:

虽然接受的答案完美地回答了 OP 问题,但还有更多参数可用于重置偏移量。所以添加这个答案来扩展接受的答案。

将所有主题的偏移量重置为消费者组中最早的偏移量

kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
    <group_name> --reset-offsets --to-earliest --all-topics --execute

将特定主题的偏移量重置为消费组中最早的偏移量

kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
    <group_name> --reset-offsets --to-earliest --topic <my-topic> --execute

将特定主题的偏移量重置为消费者组中的特定偏移量

kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
    <group_name> --reset-offsets --to-offset 1000 --topic <my-topic> --execute

其他支持的参数:

--shift-by [正整数或负整数] - 从给定整数向前或向后移动偏移量。

--to-current--to-latest--to-offset--to 相同- 最早的

--to-datetime [日期时间格式为yyyy-MM-ddTHH:mm:ss.xxx]

kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
    <group_name> --reset-offsets --to-datetime 2017-08-04T00:00:00.000 [ --all-topics or --topic <topic-name> ] --execute

--by-duration [格式为 PnDTnHnMnS]

kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
    <group_name> --reset-offsets --by-duration PT0H10M0S [ --all-topics or --topic <topic-name> ] --execute

重置为从当前时间戳偏移持续时间。

如何验证?

使用下面的命令来检查当前/结束的偏移量并确认重置已完成。

kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> --describe

示例输出:

Consumer group 'group1' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
intro           0          0               99              99              -               -               -

可能的错误:

错误:只有在组“[group_name]”处于非活动状态但当前状态为稳定时才能重置分配。

'稳定'意味着,有一个活跃的消费者正在为这个组运行。所以首先你必须停止活跃的消费者并重试重置偏移量。

如果消费者组有活跃的消费者,则无法重置偏移量。

【讨论】:

,是否有任何官方文档可用?

以上是关于Kafka 0.11 如何重置偏移量的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot 重置 kafka 偏移量offset(kafka-0.10.1.0)

重置为 Kafka 分区中的自定义偏移量

Kafka指令重置偏移量

kafka重置到最新offset偏移量

Apache Kafka:使用java方式操作消费组和重置分区偏移量(admin api)

golang kafka Shopify/sarama 消费者重置新增分区偏移量并进行重新消费