kafka消费端提交offset的方式

Posted doit8791

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka消费端提交offset的方式相关的知识,希望对你有一定的参考价值。

Kafka 提供了 3 种提交 offset 的方式

  1. 自动提交
1
2
3
4
// 自动提交,默认true
props.put("enable.auto.commit", "true");
// 设置自动每1s提交一次
props.put("auto.commit.interval.ms", "1000");
  1. 手动同步提交 offset
1
consumer.commitSync();
  1. 手动异步提交 offset
1
consumer.commitAsync();

上面说了既然异步提交 offset 可能会重复消费, 那么我使用同步提交是否就可以表明这个问题呢?

1
2
3
4
5
6
7
while(true) 
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record ->
insertIntoDB(record);
consumer.commitSync();
);

很明显不行, 因为 insertIntoDB 和 commitSync() 做不到原子操作, 如果 insertIntoDB() 成功了,但是提交 offset 的时候 consumer 挂掉了,然后服务器重启,仍然会导致重复消费问题。

如何做到不重复消费?

只要保证处理消息和提交 offset 得操作是原子操作,就可以做到不重复消费。我们可以自己管理 committed offset, 而不让 kafka 来进行管理。

比如如下使用方式:

  1. 如果消费的数据刚好需要存储在数据库,那么可以把 offset 也存在数据库,就可以就可以在一个事物中提交这两个结果,保证原子操作。
  2. 借助搜索引擎,把 offset 和数据一起放到索引里面,比如 Elasticsearch

每条记录都有自己的 offset, 所以如果要管理自己的 offset 还得要做下面事情

  1. 设置 enable.auto.commit=false
  2. 使用每个 ConsumerRecord 提供的 offset 来保存消费的位置。
  3. 在重新启动时使用 seek(TopicPartition, long) 恢复上次消费的位置。

通过上面的方式就可以在消费端实现”Exactly Once” 的语义, 即保证只消费一次。但是是否真的需要保证不重复消费呢?这个得看具体业务, 重复消费数据对整体有什么影响在来决定是否需要做到不重复消费。

以上是关于kafka消费端提交offset的方式的主要内容,如果未能解决你的问题,请参考以下文章

kafka消费者和offset的关系,以及异常处理问题

kafka 提交offset

Springboot 整合Kafka 实现手动提交 offset

kafka防止消息重复消费

kafka auto.offset.reset latest earliest 详解

KafkaFlink 消费 kafka 部分 分区 一直不提交 offset