Spring Kafka - 如何使用组 ID 将偏移重置为最新?

Posted

技术标签:

【中文标题】Spring Kafka - 如何使用组 ID 将偏移重置为最新?【英文标题】:Spring Kafka - How to reset offset to latest with a group id? 【发布时间】:2018-05-26 10:05:22 【问题描述】:

我目前正在使用 Spring Integration Kafka 进行实时统计。但是,组名使 Kafka 搜索侦听器未读取的所有先前值。

@Value("$kafka.consumer.group.id")
private String consumerGroupId;

@Bean
public ConsumerFactory<String, String> consumerFactory() 
    return new DefaultKafkaConsumerFactory<>(getDefaultProperties());


public Map<String, Object> getDefaultProperties() 
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);

    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    return properties;


@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() 

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;


@Bean
public KafkaMessageListener listener() 
    return new KafkaMessageListener();

我想从最新的偏移量开始,而不是被旧值所困扰。是否有可能重置组的偏移量?

【问题讨论】:

【参考方案1】:

因为我没有看到任何这样的例子,所以我将在这里解释我是如何做到的。

@KafkaListener 的类必须实现 ConsumerSeekAware 类,这将允许侦听器在分配分区时控制偏移量搜索。 (来源:https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek)

public class KafkaMessageListener implements ConsumerSeekAware 
    @KafkaListener(topics = "your.topic")
    public void listen(byte[] payload) 
        // ...
    

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) 

    

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) 
        assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
    

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) 


    

在这里,在重新平衡时,我们使用给定的回调来寻找所有给定主题的最后偏移量。感谢 Artem Bilan (https://***.com/users/2756547/artem-bilan) 指导我找到答案。

【讨论】:

callback.seekToEnd() 真的等同于auto.offset.reset = latest 你应该阅读这个帖子:***.com/questions/32390265/… 你救了我朋友的命,谢谢!!【参考方案2】:

您可以在订阅一些主题时为kafka消费者设置ConsumerRebalanceListener,在其中您可以通过KafkaConsumer.endOffsets()方法获取每个分区的最新偏移量,并通过KafkaConsumer.seek()方法将其设置为消费者,例如这个:

kafkaConsumer.subscribe(Collections.singletonList(topics),
    new ConsumerRebalanceListener() 
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) 
            //do nothing
        

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) 
            //get and set the lastest offset for each partiton
            kafkaConsumer.endOffsets(partitions) 
                .forEach((partition, offset) -> kafkaConsumer.seek(partition, offset));
        
    
);

【讨论】:

【参考方案3】:

另一种方式,我们总是可以在没有提交组偏移量的情况下使用最新消息,方法是使用 "enable.auto.commit:false", "auto.offset.reset:latest" 为 KafkaListener 注释指定属性值。

@KafkaListener(id = "example-group",
        properties = "enable.auto.commit:false", "auto.offset.reset:latest",
        topics = "example")

【讨论】:

【参考方案4】:

您可以使用 partitionOffsets 注释以精确的偏移量开始,例如:

@KafkaListener(id = "bar", topicPartitions =
     @TopicPartition(topic = "topic1", partitions =  "0", "1" ),
      @TopicPartition(topic = "topic2", partitions = "0",
         partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
    )public void listen(ConsumerRecord<?, ?> record) 
     

【讨论】:

由于 Kafka 主题通常具有关联的保留时间,因此此方法可能会失败,因为偏移量“100”可能不存在(因为代理将在 X 天后删除数据)。文档不太清楚如果偏移量不存在会发生什么。【参考方案5】:

对于kafka中没有初始偏移量的新消费组,可以设置AUTO_OFFSET_RESET_CONFIG

properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-id");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

对于现有的消费者群体,您可以:

    更改组 id 以显示为新的,即consumer-group-id-v2 实现ConsumerSeekAware,这样您就可以在初始化See docs 期间寻找所需的偏移量

【讨论】:

如果您没有初始偏移量,则此方法有效。但是在这里我们要“重置”偏移量! @Bachrc 没错,这只是一种解决方法,如果你有能力将你的 consumerGroupId 从my-consumer 更改为'my-consumer-v2。它成为一个新组,在 kafka 中没有初始偏移量。 显然,并非总是可以这样做。

以上是关于Spring Kafka - 如何使用组 ID 将偏移重置为最新?的主要内容,如果未能解决你的问题,请参考以下文章

kafka 消费者进行消费数据的各种场景的API(你值得一看)

如何将 Apache kafka 与 Spring mvc 一起使用?可能吗?

flink kafka消费者组ID不起作用

Spark Streaming:Spark Structured Streaming 中不允许使用 Kafka 组 ID

设置 Kafka Connect,无法重命名组 ID

kafka 消费者组 ID 无法按预期工作