kafka消费者自动提交是如何工作的?
Posted
技术标签:
【中文标题】kafka消费者自动提交是如何工作的?【英文标题】:How does kafka consumer auto commit work? 【发布时间】:2018-03-14 18:18:22 【问题描述】:我正在阅读this one:
自动提交 提交偏移量最简单的方法是允许 消费者为你做。如果配置 enable.auto.commit=true, 然后每五秒钟消费者将提交最大的偏移量 您的客户从 poll() 收到。五秒间隔是 默认并通过设置 auto.commit.interval.ms 来控制。只是 像消费者中的其他一切一样,自动提交是驱动的 通过轮询循环。每当您进行轮询时,消费者都会检查是否是时间 提交,如果是,它将提交它在 上次投票。
可能是我的英语不好,但我没有完全理解这个描述。
假设我使用默认间隔的自动提交 - 5 秒,轮询每 7 秒发生一次。在这种情况下,提交将每 5 秒或每 7 秒发生一次?
如果轮询每 3 秒发生一次,你能澄清一下行为吗?提交是每 5 秒还是每 6 秒发生一次? 我已阅读this one:
自动提交:您可以将 auto.commit 设置为 true 并设置 auto.commit.interval.ms 属性,以毫秒为单位。一次 你启用了这个,Kafka 消费者将提交偏移量 响应其 poll() 调用而收到的最后一条消息。 poll() 调用 在设置 auto.commit.interval.ms 时在后台发出。
这与答案相矛盾。
你能详细解释一下这些东西吗?
假设我有这样的图表:
0 秒 - 轮询 4 秒 - 投票 8 秒 - 轮询
offset 什么时候提交,什么时候提交?
【问题讨论】:
【参考方案1】:每次轮询都会调用自动提交检查,并检查经过的时间是否大于配置的时间。如果是,则提交偏移量。
如果提交间隔为 5 秒并且轮询在 7 秒内发生,则提交将仅在 7 秒后发生。
【讨论】:
第二种情况? 第二种情况将遵循相同的逻辑,对于第一次轮询它不会提交为 3 5 并且在提交后它将重置计数器并且相同模式将遵循 但是在这种情况下如何提交最后一次投票呢?我应该手动做吗? 当你关闭消费者并启用自动提交时,它会在关闭消费者之前提交偏移量【参考方案2】:它会在轮询完成后尽快自动提交。你可以看一下consumer coordinator的源代码,它在类级别定义了一组本地字段,以了解是否启用了自动提交、间隔是什么以及执行自动提交的下一个截止日期是什么。
https://github.com/apache/kafka/blob/10cd98cc894b88c5d1e24fc54c66361ad9914df2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L625
以及 poll 中执行存储调用的地方之一https://github.com/apache/kafka/blob/10cd98cc894b88c5d1e24fc54c66361ad9914df2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L279
也就是说,例如每 7 秒执行一次轮询,并将自动提交设置为 5:
0 - 投票,+ 将截止日期设置为第 5 秒
7 - 由于截止日期轮询 + 提交,将截止日期更新为 7+5=12
14 - 轮询 + 提交截止日期,更新截止日期为 12+5=17
但是,如果轮询设置为每 3 秒一次,并且自动提交设置为 5:
0 - 投票,+ 将截止日期设置为第 5 秒
3 - 轮询,无提交
6 - 由于截止日期轮询 + 提交,更新截止日期为 6+5=11
【讨论】:
第一个截止日期只发生在第 6 秒? @g*** 是的,根据kafka客户端的源代码。我知道这听起来很愚蠢,但总的来说,从不同的角度来看,你想达到什么目标?在任务关键型系统中,一旦您非常确定消息已处理,最好手动执行偏移量提交,在其他情况下 - 您可能更喜欢不那么频繁的偏移量提交以加快进程。然而,偏移提交并非微不足道,因为它会涉及 Zookeeper 等。 @zubrabubra ,是不是意味着,它不一定会在每个间隔提交偏移量,通过“auto.commit.interval.ms”配置?我们还需要计算经过的时间吗? @Nag autocommit 遵守两次连续调用轮询之间处理数据的间隔和时间。但是当它重新启动最后期限 ``` public void mayAutoCommitOffsetsAsync(long now) ... t...CommitDeadline = now + autoCommitIntervalMs; ... ``` 正如你现在看到的那样,它来自轮询请求的开头。【参考方案3】:这里有一个简单的代码来测试它是如何工作的。
doc -> https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html
public class KafkaTest
public static final String KAFKA_TOPIC_NAME = "kafka-xx-test-topic";
public static final String CONSUMER_GROUP_ID = "test-consumer-xx";
public static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args)
final KafkaProducer<Object, Object> kafkaProducer = new KafkaProducer<>(getProps());
for (int i = 0; i < 1000; i++)
kafkaProducer.send(new ProducerRecord<>(KAFKA_TOPIC_NAME, "Data_" + i));
final Consumer<Long, String> consumer = new KafkaConsumer<>(getProps());
consumer.subscribe(Collections.singletonList(KAFKA_TOPIC_NAME));
TopicPartition actualTopicPartition = new TopicPartition(KAFKA_TOPIC_NAME, 0);
while (true)
final ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofSeconds(60));
consumerRecords.forEach(record ->
try
TimeUnit.MILLISECONDS.sleep(200);
catch (InterruptedException e)
);
final long committedOffset = consumer.committed(Collections.singleton(actualTopicPartition)).get(actualTopicPartition).offset();
final long consumerCurrentOffset = consumer.position(actualTopicPartition);
System.out.println("Poll finish.. consumer-offset: " + consumerCurrentOffset + " - committed-offset: " + committedOffset + " " + LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")));
private static Map<String, Object> getProps()
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // Default: latest
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // Default: true
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); // Default: 500
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // Default: 5000
return props;
每 2 秒轮询一次
每 5 秒自动提交一次
输出如下
Poll finish.. consumer-offset: 1010 - committed-offset: 1000 17:07:05
Poll finish.. consumer-offset: 1020 - committed-offset: 1000 17:07:07
Poll finish.. consumer-offset: 1030 - committed-offset: 1000 17:07:09
Poll finish.. consumer-offset: 1040 - committed-offset: 1030 17:07:11 -> commit when poll finish because of elapsed time(6 sec) > commit interval(5 sec)
Poll finish.. consumer-offset: 1050 - committed-offset: 1030 17:07:13
Poll finish.. consumer-offset: 1060 - committed-offset: 1030 17:07:15
Poll finish.. consumer-offset: 1070 - committed-offset: 1060 17:07:17 -> auto commit
Poll finish.. consumer-offset: 1080 - committed-offset: 1060 17:07:19
Poll finish.. consumer-offset: 1090 - committed-offset: 1060 17:07:21
Poll finish.. consumer-offset: 1100 - committed-offset: 1090 17:07:23 -> auto commit
【讨论】:
为什么消费者提交了像 1020,1030 这样的偏移量,而生产者刚刚产生了 1000 条消息?[来自代码] 这个主题是否有超过 1000 条消息? @Nag 我刚开始第二次 :) 主题有旧消息,它不重要 有道理:-),是的,这并不重要,但问这个问题是为了看看我是否错过了什么【参考方案4】:看看下面的配置,它为 Kafka 消费者调优提供了另一个视角: 对于来自生产者的 30 条记录,如果消费者在 20 秒之前崩溃,则消费者会再次读取整组 30 条记录,因为 max-poll-interval 和 auto-commit-interval 都设置为 20 秒
auto-commit-interval: 20000
auto-offset-reset: latest
max-poll-records: 10
max-poll-interval-ms: 20000
但是对于以下配置,其中自动提交每 2 秒发生一次,并且如果消费者在任何时间点 > 2 秒崩溃,那么那些已提交给 Kafka 生产者的记录将不会再次被消费者拾取。
auto-commit-interval: 2000
auto-offset-reset: latest
max-poll-records: 10
max-poll-interval-ms: 20000
此外,auto-commit-interval 始终优先于 max-poll-interval。如果自动提交由于某种奇怪的原因没有发生,那么在 20 秒的 max-poll-interval 过去之后,Kafka 代理会得出结论,消费者已经关闭。
【讨论】:
以上是关于kafka消费者自动提交是如何工作的?的主要内容,如果未能解决你的问题,请参考以下文章