kafka的auto.offset.reset详解与测试

Posted 小二上酒8

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka的auto.offset.reset详解与测试相关的知识,希望对你有一定的参考价值。

  1. 取值及定义#
    auto.offset.reset有以下三个可选值:

latest (默认)
earliest
none
三者均有共同定义:
对于同一个消费者组,若已有提交的offset,则从提交的offset开始接着消费

意思就是,只要这个消费者组消费过了,不管auto.offset.reset指定成什么值,效果都一样,每次启动都是已有的最新的offset开始接着往后消费

不同的点为:

latest(默认):对于同一个消费者组,若没有提交过offset,则只消费消费者连接topic后,新产生的数据
就是说如果这个topic有历史消息,现在新启动了一个消费者组,且auto.offset.reset=latest,此时已存在的历史消息无法消费到,那保持消费者组运行,如果此时topic有新消息进来了,这时新消息才会被消费到。而一旦有消费,则必然会提交offset
这时候如果该消费者组意外下线了,topic仍然有消息进来,接着该消费者组在后面恢复上线了,它仍然可以从下线时的offset处开始接着消费,此时走的就是共同定义

earliest:对于同一个消费者组,若没有提交过offset,则从头开始消费
就是说如果这个topic有历史消息存在,现在新启动了一个消费者组,且auto.offset.reset=earliest,那将会从头开始消费,这就是与latest不同之处。
一旦该消费者组消费过topic后,此时就有该消费者组的offset了,这种情况下即使指定了auto.offset.reset=earliest,再重新启动该消费者组,效果是与latest一样的,也就是此时走的是共同的定义

none:对于同一个消费者组,若没有提交过offset,会抛异常
一般生产环境基本用不到该参数

  1. 新建全新topic#
    ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic TestOffsetResetTopic --partitions 1 --replication-factor 1 --create

  2. 往新建的topic发送消息#
    便于测试,用Java代码发送5条消息

public class TestProducer

public static void main(String[] args) throws InterruptedException 
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    String topic = "TestOffsetResetTopic";

    for (int i = 0; i < 5; i++) 
        String value = "message_" + i + "_" + LocalDateTime.now();
        System.out.println("Send value: " + value);
        producer.send(new ProducerRecord<>(topic, value), (metadata, exception) -> 
            if (exception == null) 
                String str = MessageFormat.format("Send success! topic: 0, partition: 1, offset: 2", metadata.topic(), metadata.partition(), metadata.offset());
                System.out.println(str);
            
        );
        Thread.sleep(500);
    

    producer.close();

发送消息成功:

Send value: message_0_2022-09-16T18:26:15.943749600
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 0
Send value: message_1_2022-09-16T18:26:17.066608900
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 1
Send value: message_2_2022-09-16T18:26:17.568667200
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 2
Send value: message_3_2022-09-16T18:26:18.069093600
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 3
Send value: message_4_2022-09-16T18:26:18.583288100
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 4

现在TestOffsetResetTopic这个topic有5条消息,且还没有任何消费者组对其进行消费过,也就是没有任何offset

  1. 测试latest#
    已知topic已经存在5条历史消息,此时启动一个消费者

public class TestConsumerLatest

public static void main(String[] args) 
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // 指定消费者组
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
    // 设置 auto.offset.reset
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    String topic = "TestOffsetResetTopic";
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singletonList(topic));

    // 消费数据
    while (true) 
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) 
            System.out.println(consumerRecord);
        
    


发现如上面所述,历史已存在的5条消息不会消费到,消费者没有任何动静,现在保持消费者在线

启动TestProducer再发5条消息,会发现这后面新发的,offset从5开始的消息就被消费了

ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1663329725731, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_0_2022-09-16T20:02:05.523581500)
ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 6, CreateTime = 1663329726251, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_1_2022-09-16T20:02:06.251399400)
ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 7, CreateTime = 1663329726764, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_2_2022-09-16T20:02:06.764186200)
ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 8, CreateTime = 1663329727264, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_3_2022-09-16T20:02:07.264268500)
ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 9, CreateTime = 1663329727778, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_4_2022-09-16T20:02:07.778469700)

此时该消费者组对于这个topic的offset已经为9了,现在停掉这个消费者(下线),再启动TestProducer发5条消息,接着再启动TestConsumerLatest,会发现紧接上一次的offset之后开始,即从10继续消费

如果测试发现没动静,请多等一会,估计机器性能太差…

  1. 测试earliest#
    新建一个测试消费者,设置auto.offset.reset为earliest,注意groupid为新的group2,表示对于topic来说是全新的消费者组

public class TestConsumerEarliest

public static void main(String[] args) 
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // 指定消费者组
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
    // 设置 auto.offset.reset
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    String topic = "TestOffsetResetTopic";
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singletonList(topic));

    // 消费数据
    while (true) 
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) 
            System.out.println(consumerRecord);
        
    


一运行发现已有的10条消息(最开始5条加上面一次测试又发了5条,一共10条)是可以被消费到的,且消费完后,对于这个topic就已经有了group2这个组的offset了,无论之后启停,只要groupid不变,都会从最新的offset往后开始消费

  1. 测试none#
    新建一个测试消费者,设置auto.offset.reset为none,注意groupid为新的group3,表示对于topic来说是全新的消费者组

public class TestConsumerNone

public static void main(String[] args) 
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // 指定消费者组
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group3");
    // 设置 auto.offset.reset
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

    String topic = "TestOffsetResetTopic";
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singletonList(topic));

    // 消费数据
    while (true) 
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) 
            System.out.println(consumerRecord);
        
    


一运行,程序报错,因为对于topic来说是全新的消费者组,且又指定了auto.offset.reset为none,直接抛异常,程序退出

Exception in thread “main” org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [TestOffsetResetTopic-0]
at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:706)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2434)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1266)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at kakfa.TestConsumerNone.main(TestConsumerNone.java:31)

  1. 总结#
    如果topic已经有历史消息了,又需要消费这些历史消息,则必须要指定一个从未消费过的消费者组,同时指定auto.offset.reset为earliest,才可以消费到历史数据,之后就有提交offset。有了offset,无论是earliest还是latest,效果都是一样的了。
    如果topic没有历史消息,或者不需要处理历史消息,那按照默认latest即可。

kafka auto.offset.reset值不起作用

文章目录

一、auto.offset.reset值含义解释

<earliest>
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
<latest>
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
<none>
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

二、实际应用不起作用


问题原因

那就是对于同一个groupid的消费者,
如果这个topic某个分区有已经提交的offset,那么无论是把auto.offset.reset=earliest还是latest,都将失效,
消费者会从已经提交的offset开始消费.

那我没究竟如何才能消费最早的数据?肯定是可以的,看下面测试验证!!!

测试验证1(网上说,关闭自动提交)——失败

把kafka的enable.offset.commit设置为false,让kafka的自动提交功能关闭,这时候对于某个topic就没有已经提交的offset了

测试验证2(满足两个条件)——成功

欢迎加入扣扣老程序员圈:783092701

  1. 使用一个全新的"group.id"(就是之前没有被任何消费者使用过);
  2. 指定"auto.offset.reset"参数的值为earliest;

注意

测试验证2,换了新的group.id是可以从最早数据消费;但是重启服务后,又只能从最新的开始消费!

三、kafka能否创建很多消费者组

  • 需求:需要每次从最早消费,那就需要每次创建新的消费者组
  • 疑惑点我们能否手动删除旧的消费者组?
    无需与新使用者一起删除。以下是尝试删除时脚本输出的内容:
    请注意,无需删除新使用者的组元数据,因为当最后一个成员离开时,它会自动删除
    这是简短的答案。更多细节:通过“元数据”,有两个意思。首先,只需存储作为组成员身份协调器一部分的有关消费者和消费者组的信息。如果组中的所有消费者都不在,则会自动删除。
    其次,使用者组已将提交的偏移量存储在kafka主题中(使用新使用者时)。以前这些都是用zookeeper存储的)。当消费者群体消失时,该主题不会立即删除。如果使用者组再次出现,它将在本主题中自动找到以前的偏移量。它可以选择使用它们或忽略它们。如果使用者组不再出现,这些存储的偏移量最终会被自动垃圾收集。
    因此,简而言之,在使用新消费者时不需要删除任何内容。

以上是关于kafka的auto.offset.reset详解与测试的主要内容,如果未能解决你的问题,请参考以下文章

kafka auto.offset.reset latest earliest 详解

kafka auto.offset.reset值不起作用

Kafka Consumer配置——auto.offset.reset如何控制消息消费

kafka之consumer参数auto.offset.reset 0.10+

kafkakafka 消费速度 于 日志清理速度 (kafka数据被清理了)会发生什么 auto.offset.reset 参数

关于Kafka的其他一些内容,堆积情况,retention,auto.offset.reset