kafka消费者自动提交是如何工作的?

Posted

技术标签:

【中文标题】kafka消费者自动提交是如何工作的?【英文标题】:How does kafka consumer auto commit work? 【发布时间】:2018-03-14 18:18:22 【问题描述】:

我正在阅读this one:

自动提交 提交偏移量最简单的方法是允许 消费者为你做。如果配置 enable.auto.commit=true, 然后每五秒钟消费者将提交最大的偏移量 您的客户从 poll() 收到。五秒间隔是 默认并通过设置 auto.commit.interval.ms 来控制。只是 像消费者中的其他一切一样,自动提交是驱动的 通过轮询循环。每当您进行轮询时,消费者都会检查是否是时间 提交,如果是,它将提交它在 上次投票。

可能是我的英语不好,但我没有完全理解这个描述。

假设我使用默认间隔的自动提交 - 5 秒,轮询每 7 秒发生一次。在这种情况下,提交将每 5 秒或每 7 秒发生一次?

如果轮询每 3 秒发生一次,你能澄清一下行为吗?提交是每 5 秒还是每 6 秒发生一次? 我已阅读this one:

自动提交:您可以将 auto.commit 设置为 true 并设置 auto.commit.interval.ms 属性,以毫秒为单位。一次 你启用了这个,Kafka 消费者将提交偏移量 响应其 poll() 调用而收到的最后一条消息。 poll() 调用 在设置 auto.commit.interval.ms 时在后台发出。

这与答案相矛盾。

你能详细解释一下这些东西吗?

假设我有这样的图表:

0 秒 - 轮询 4 秒 - 投票 8 秒 - 轮询

offset 什么时候提交,什么时候提交?

【问题讨论】:

【参考方案1】:

每次轮询都会调用自动提交检查,并检查经过的时间是否大于配置的时间。如果是,则提交偏移量。

如果提交间隔为 5 秒并且轮询在 7 秒内发生,则提交将仅在 7 秒后发生。

【讨论】:

第二种情况? 第二种情况将遵循相同的逻辑,对于第一次轮询它不会提交为 3 5 并且在提交后它将重置计数器并且相同模式将遵循 但是在这种情况下如何提交最后一次投票呢?我应该手动做吗? 当你关闭消费者并启用自动提交时,它会在关闭消费者之前提交偏移量【参考方案2】:

它会在轮询完成后尽快自动提交。你可以看一下consumer coordinator的源代码,它在类级别定义了一组本地字段,以了解是否启用了自动提交、间隔是什么以及执行自动提交的下一个截止日期是什么。

https://github.com/apache/kafka/blob/10cd98cc894b88c5d1e24fc54c66361ad9914df2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L625

以及 poll 中执行存储调用的地方之一https://github.com/apache/kafka/blob/10cd98cc894b88c5d1e24fc54c66361ad9914df2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L279

也就是说,例如每 7 秒执行一次轮询,并将自动提交设置为 5:

0 - 投票,+ 将截止日期设置为第 5 秒

7 - 由于截止日期轮询 + 提交,将截止日期更新为 7+5=12

14 - 轮询 + 提交截止日期,更新截止日期为 12+5=17

但是,如果轮询设置为每 3 秒一次,并且自动提交设置为 5:

0 - 投票,+ 将截止日期设置为第 5 秒

3 - 轮询,无提交

6 - 由于截止日期轮询 + 提交,更新截止日期为 6+5=11

【讨论】:

第一个截止日期只发生在第 6 秒? @g*** 是的,根据kafka客户端的源代码。我知道这听起来很愚蠢,但总的来说,从不同的角度来看,你想达到什么目标?在任务关键型系统中,一旦您非常确定消息已处理,最好手动执行偏移量提交,在其他情况下 - 您可能更喜欢不那么频繁的偏移量提交以加快进程。然而,偏移提交并非微不足道,因为它会涉及 Zookeeper 等。 @zubrabubra ,是不是意味着,它不一定会在每个间隔提交偏移量,通过“auto.commit.interval.ms”配置?我们还需要计算经过的时间吗? @Nag autocommit 遵守两次连续调用轮询之间处理数据的间隔和时间。但是当它重新启动最后期限 ``` public void mayAutoCommitOffsetsAsync(long now) ... t...CommitDeadline = now + autoCommitIntervalMs; ... ``` 正如你现在看到的那样,它来自轮询请求的开头。【参考方案3】:

这里有一个简单的代码来测试它是如何工作的。

doc -> https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html

public class KafkaTest 
    
    public static final String KAFKA_TOPIC_NAME = "kafka-xx-test-topic";
    public static final String CONSUMER_GROUP_ID = "test-consumer-xx";
    public static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) 
        final KafkaProducer<Object, Object> kafkaProducer = new KafkaProducer<>(getProps());
        for (int i = 0; i < 1000; i++) 
            kafkaProducer.send(new ProducerRecord<>(KAFKA_TOPIC_NAME, "Data_" + i));
        
        final Consumer<Long, String> consumer = new KafkaConsumer<>(getProps());
        consumer.subscribe(Collections.singletonList(KAFKA_TOPIC_NAME));
        TopicPartition actualTopicPartition = new TopicPartition(KAFKA_TOPIC_NAME, 0);
        while (true) 
            final ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofSeconds(60));
            consumerRecords.forEach(record -> 
                try 
                    TimeUnit.MILLISECONDS.sleep(200);
                 catch (InterruptedException e) 
                
            );
            final long committedOffset = consumer.committed(Collections.singleton(actualTopicPartition)).get(actualTopicPartition).offset();
            final long consumerCurrentOffset = consumer.position(actualTopicPartition);
            System.out.println("Poll finish.. consumer-offset: " + consumerCurrentOffset + " - committed-offset: " + committedOffset + " " + LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")));
        
    

    private static Map<String, Object> getProps() 
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); //  Default: latest
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // Default: true
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); // Default: 500
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // Default: 5000
        return props;
    

每 2 秒轮询一次 每 5 秒自动提交一次

输出如下

Poll finish.. consumer-offset: 1010 - committed-offset: 1000 17:07:05
Poll finish.. consumer-offset: 1020 - committed-offset: 1000 17:07:07
Poll finish.. consumer-offset: 1030 - committed-offset: 1000 17:07:09
Poll finish.. consumer-offset: 1040 - committed-offset: 1030 17:07:11 -> commit when poll finish because of elapsed time(6 sec) > commit interval(5 sec)
Poll finish.. consumer-offset: 1050 - committed-offset: 1030 17:07:13
Poll finish.. consumer-offset: 1060 - committed-offset: 1030 17:07:15
Poll finish.. consumer-offset: 1070 - committed-offset: 1060 17:07:17 -> auto commit 
Poll finish.. consumer-offset: 1080 - committed-offset: 1060 17:07:19
Poll finish.. consumer-offset: 1090 - committed-offset: 1060 17:07:21
Poll finish.. consumer-offset: 1100 - committed-offset: 1090 17:07:23 -> auto commit 

【讨论】:

为什么消费者提交了像 1020,1030 这样的偏移量,而生产者刚刚产生了 1000 条消息?[来自代码] 这个主题是否有超过 1000 条消息? @Nag 我刚开始第二次 :) 主题有旧消息,它不重要 有道理:-),是的,这并不重要,但问这个问题是为了看看我是否错过了什么【参考方案4】:

看看下面的配置,它为 Kafka 消费者调优提供了另一个视角: 对于来自生产者的 30 条记录,如果消费者在 20 秒之前崩溃,则消费者会再次读取整组 30 条记录,因为 max-poll-interval 和 auto-commit-interval 都设置为 20 秒

 auto-commit-interval: 20000
      auto-offset-reset: latest
      max-poll-records: 10
      max-poll-interval-ms: 20000

但是对于以下配置,其中自动提交每 2 秒发生一次,并且如果消费者在任何时间点 > 2 秒崩溃,那么那些已提交给 Kafka 生产者的记录将不会再次被消费者拾取。

 auto-commit-interval: 2000
      auto-offset-reset: latest
      max-poll-records: 10
      max-poll-interval-ms: 20000

此外,auto-commit-interval 始终优先于 max-poll-interval。如果自动提交由于某种奇怪的原因没有发生,那么在 20 秒的 max-poll-interval 过去之后,Kafka 代理会得出结论,消费者已经关闭。

【讨论】:

以上是关于kafka消费者自动提交是如何工作的?的主要内容,如果未能解决你的问题,请参考以下文章

Kafka消费者——结合spring开发

kafka消费端提交offset的方式

消费消息+不自动提交

Kafka消费者 位移提交

Kafka——SpringBoot整合(消费者位移的提交)

Kafka