kafka重置到最新offset偏移量
Posted 七调
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka重置到最新offset偏移量相关的知识,希望对你有一定的参考价值。
小弟近日用kafka测试传输数据设置的单消费者,不料消费者头天晚上就挂掉了 ,重启消费者,因为auto.offset.reset 默认为latest,所以消费者从昨天晚上的数据接着消费,因为差了一晚上了,消费者一时半会追不上生产者的步伐,而我又需要实时展示数据,且又不能每次重启消费者重新赋予group.id。所以需要手动修改偏移量到最新。
最后通过以下代码解决问题
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
…………
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition partition0 = new TopicPartition("topicName", 0);
consumer.assign(Arrays.asList(partition0,partition1,partition2));
// consumer.seek(partition0, 220);
consumer.seekToEnd(Arrays.asList(partition0));
}
解决!
参考了三度微凉的博客:https://blog.csdn.net/yu280265067/article/details/69390094
以上是关于kafka重置到最新offset偏移量的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot 重置 kafka 偏移量offset(kafka-0.10.1.0)
Kafka之Fetch offset xxx is out of range for partition xxx,resetting offset情况总结
Spark Streaming Kafka 偏移量 Offset 管理