Kafka 0.11 如何重置偏移量
Posted
技术标签:
【中文标题】Kafka 0.11 如何重置偏移量【英文标题】:Kafka 0.11 how to reset offsets 【发布时间】:2018-01-22 01:56:27 【问题描述】:我正在尝试使用最新的 Kafka CLI 工具重置消费者偏移量。
kafka-consumer-groups.bat --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --to-earliest --all-topics
结果我看到了这个输出:
TOPIC PARTITION NEW-OFFSET
FirstTopic 0 0
SecondTopic 0 0
但是再次运行命令:
kafka-consumer-groups.bat --bootstrap-server kafka-host:9092 --group my-group --describe
输出结果:
Consumer group 'my-group' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
FirstTopic 0 1230 1230 0
SecondTopic 0 1022 1022 0
我尝试了其他选项,例如重置为显式偏移或直接指定主题,但结果相同。输出提示操作成功,同时使用 describe 命令检查偏移量或调试显示偏移量未更改。
任何人都可以在非 Zookeeper 代理中成功重置消费者偏移量。
【问题讨论】:
【参考方案1】:默认情况下,--reset-offsets
只打印操作的结果。要实际执行操作,您需要将--execute
添加到您的命令中:
kafka-consumer-groups.bat --bootstrap-server kafka-host:9092 --group
my-group --reset-offsets --to-earliest --all-topics --execute
【讨论】:
还有其他选项可以重置为特定的偏移量,比如上述问题中 FirstTopic 的 1200 吗?具体来说,我希望我的消费者重置到特定的时间点(无论当时的偏移量是多少) 我也想知道如何通过 Kafka Java API 来完成这些。 值得一提的是这个要点:gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b【参考方案2】:虽然接受的答案完美地回答了 OP 问题,但还有更多参数可用于重置偏移量。所以添加这个答案来扩展接受的答案。
将所有主题的偏移量重置为消费者组中最早的偏移量
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
<group_name> --reset-offsets --to-earliest --all-topics --execute
将特定主题的偏移量重置为消费组中最早的偏移量
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
<group_name> --reset-offsets --to-earliest --topic <my-topic> --execute
将特定主题的偏移量重置为消费者组中的特定偏移量
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
<group_name> --reset-offsets --to-offset 1000 --topic <my-topic> --execute
其他支持的参数:
--shift-by [正整数或负整数] - 从给定整数向前或向后移动偏移量。
--to-current 和 --to-latest 与 --to-offset 和 --to 相同- 最早的。
--to-datetime [日期时间格式为yyyy-MM-ddTHH:mm:ss.xxx]
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
<group_name> --reset-offsets --to-datetime 2017-08-04T00:00:00.000 [ --all-topics or --topic <topic-name> ] --execute
--by-duration [格式为 PnDTnHnMnS]
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
<group_name> --reset-offsets --by-duration PT0H10M0S [ --all-topics or --topic <topic-name> ] --execute
重置为从当前时间戳偏移持续时间。
如何验证?
使用下面的命令来检查当前/结束的偏移量并确认重置已完成。
kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> --describe
示例输出:
Consumer group 'group1' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
intro 0 0 99 99 - - -
可能的错误:
错误:只有在组“[group_name]”处于非活动状态但当前状态为稳定时才能重置分配。
'稳定'意味着,有一个活跃的消费者正在为这个组运行。所以首先你必须停止活跃的消费者并重试重置偏移量。
如果消费者组有活跃的消费者,则无法重置偏移量。
【讨论】:
,是否有任何官方文档可用?以上是关于Kafka 0.11 如何重置偏移量的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot 重置 kafka 偏移量offset(kafka-0.10.1.0)