Kafka 消费者正在读取重新启动时最后提交的偏移量(Java)

Posted

技术标签:

【中文标题】Kafka 消费者正在读取重新启动时最后提交的偏移量(Java)【英文标题】:Kafka consumer is reading last committed offset on re-start (Java) 【发布时间】:2019-08-16 22:39:56 【问题描述】:

我有一个 kakfa 消费者,其 enable.auto.commit 设置为 false。每当我重新启动消费者应用程序时,它总是会再次读取上次提交的偏移量,然后再读取下一个偏移量。

例如。最后提交的偏移量是 50。当我重新启动消费者时,它再次首先读取偏移量 50,然后再读取下一个偏移量。

我正在执行 commitsync,如下所示。

Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("sometopic", partition), new OffsetAndMetadata(offset));
kafkaconsumer.commitSync(offsets);

我尝试将 auto.offset.reset 设置为 earliestlatest 但它不会改变行为。

我在消费者配置中遗漏了什么吗?

config.put(ConsumerConfig.CLIENT_ID_CONFIG, "CLIENT_ID");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "GROUP_ID");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,CustomDeserializer.class.getName());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

【问题讨论】:

【参考方案1】:

如果你想使用commitSync(offset),你必须小心并阅读它的Javadoc:

提交的偏移量应该是您的应用程序将使用的下一条消息,即 lastProcessedMessageOffset + 1。

如果不给偏移量加+1,预计下次重启时,消费者会再次消费最后一条消息。如另一个答案中所述,如果您使用 commitSync() 没有任何参数,您不必担心

【讨论】:

【参考方案2】:

您似乎正在尝试使用new OffsetAndMetadta(offset) 提交。这不是典型的用法。

这是文档中的一个示例,在 手动偏移控制下:

 List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
 while (true) 
     ConsumerRecords<String, String> records = consumer.poll(100);
     for (ConsumerRecord<String, String> record : records) 
         buffer.add(record);
     
     if (buffer.size() >= minBatchSize) 
         insertIntoDb(buffer);
         consumer.commitSync();
         buffer.clear();
     
 

https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

注意consumer.commitSync() 调用是如何在没有任何参数的情况下执行的。它只是消费,它会承诺到那时为止消费的任何东西。

【讨论】:

此提交所有内容,您无法处理更精细的偏移提交,例如,如果我阅读 10 条消息并处理第 6 条消息会引发异常.. 也许我想提交到相对于第 5 条消息的偏移量,因此我不会多次处理相同的消息

以上是关于Kafka 消费者正在读取重新启动时最后提交的偏移量(Java)的主要内容,如果未能解决你的问题,请参考以下文章

Kafka消费者手动提交消息偏移

kafka重复消费的原因

如何在从 Spark 消费 Kafka 时获取偏移 id,将其保存在 Cassandra 中并使用它来重新启动 Kafka?

重新启动 kafka 连接接收器和源连接器以从头开始读取

Kafka手动提交偏移量的作用到底是什么???

Kafka-消费者-偏移量的提交方式