卡夫卡消费者不是从最新消息开始

Posted

技术标签:

【中文标题】卡夫卡消费者不是从最新消息开始【英文标题】:Kafka consumer does not start from latest message 【发布时间】:2017-10-12 06:10:36 【问题描述】:

我想要一个从主题中的最新消息开始的 Kafka 消费者。

这里是java代码:

private static Properties properties = new Properties();
private static KafkaConsumer<String, String> consumer;
static

    properties.setProperty("bootstrap.servers","localhost");
    properties.setProperty("enable.auto.commit", "true");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("group.id", "test");
    properties.setProperty("auto.offset.reset", "latest");
    consumer = new KafkaConsumer<>(properties);

    consumer.subscribe(Collections.singletonList("mytopic"));


@Override
public StreamHandler call() throws Exception

    while (true) 
    
        ConsumerRecords<String, String> consumerRecords = consumer.poll(200);
        Iterable<ConsumerRecord<String, String>> records = consumerRecords.records("mytopic");
        for(ConsumerRecord<String, String> rec : records)
        
            System.out.println(rec.value());
        
    

虽然auto.offset.reset的值是latest,但是消费者从属于2天前的form消息开始,然后赶上最新消息.

我错过了什么?

【问题讨论】:

【参考方案1】:

您之前是否使用相同的 group.id 运行过相同的代码? auto.offset.reset 参数仅在没有为您的消费者存储的现有偏移量时使用。所以如果你之前运行过这个例子,比如说两天前,然后你再次运行它,它将从最后消耗的位置开始。

如果您想手动转到主题的末尾,请使用seekToEnd()

请参阅https://***.com/a/32392174/1392894 以获得更彻底的讨论。

【讨论】:

谢谢,我认为你是对的!两天前我用过它,我不知道,最新的偏移量是指 group.id 已被消耗的最新偏移量。【参考方案2】:

如果您想手动控制偏移的位置,您需要设置 enable.auto.commit = false。

如果要将所有偏移量定位到每个分区的末尾,请调用 seekToEnd()

https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToEnd(java.util.Collection)

【讨论】:

以上是关于卡夫卡消费者不是从最新消息开始的主要内容,如果未能解决你的问题,请参考以下文章

卡夫卡消费者:受控阅读主题

卡夫卡动物园管理员的目的

卡夫卡与cloudera TLS失败

可能是什么原因,卡夫卡消费者承认,抛出InterruptedException?

Spring Cloud Stream 手动偏移管理

消费者。如何指定要读取的分区? [卡夫卡]