Spring Kafka - 如何使用组 ID 将偏移重置为最新?
Posted
技术标签:
【中文标题】Spring Kafka - 如何使用组 ID 将偏移重置为最新?【英文标题】:Spring Kafka - How to reset offset to latest with a group id? 【发布时间】:2018-05-26 10:05:22 【问题描述】:我目前正在使用 Spring Integration Kafka 进行实时统计。但是,组名使 Kafka 搜索侦听器未读取的所有先前值。
@Value("$kafka.consumer.group.id")
private String consumerGroupId;
@Bean
public ConsumerFactory<String, String> consumerFactory()
return new DefaultKafkaConsumerFactory<>(getDefaultProperties());
public Map<String, Object> getDefaultProperties()
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return properties;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory()
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
@Bean
public KafkaMessageListener listener()
return new KafkaMessageListener();
我想从最新的偏移量开始,而不是被旧值所困扰。是否有可能重置组的偏移量?
【问题讨论】:
【参考方案1】:因为我没有看到任何这样的例子,所以我将在这里解释我是如何做到的。
@KafkaListener
的类必须实现 ConsumerSeekAware
类,这将允许侦听器在分配分区时控制偏移量搜索。 (来源:https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek)
public class KafkaMessageListener implements ConsumerSeekAware
@KafkaListener(topics = "your.topic")
public void listen(byte[] payload)
// ...
@Override
public void registerSeekCallback(ConsumerSeekCallback callback)
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback)
assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback)
在这里,在重新平衡时,我们使用给定的回调来寻找所有给定主题的最后偏移量。感谢 Artem Bilan (https://***.com/users/2756547/artem-bilan) 指导我找到答案。
【讨论】:
callback.seekToEnd()
真的等同于auto.offset.reset = latest
你应该阅读这个帖子:***.com/questions/32390265/…
你救了我朋友的命,谢谢!!【参考方案2】:
您可以在订阅一些主题时为kafka消费者设置ConsumerRebalanceListener
,在其中您可以通过KafkaConsumer.endOffsets()
方法获取每个分区的最新偏移量,并通过KafkaConsumer.seek()
方法将其设置为消费者,例如这个:
kafkaConsumer.subscribe(Collections.singletonList(topics),
new ConsumerRebalanceListener()
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
//do nothing
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
//get and set the lastest offset for each partiton
kafkaConsumer.endOffsets(partitions)
.forEach((partition, offset) -> kafkaConsumer.seek(partition, offset));
);
【讨论】:
【参考方案3】:另一种方式,我们总是可以在没有提交组偏移量的情况下使用最新消息,方法是使用 "enable.auto.commit:false", "auto.offset.reset:latest"
为 KafkaListener 注释指定属性值。
@KafkaListener(id = "example-group",
properties = "enable.auto.commit:false", "auto.offset.reset:latest",
topics = "example")
【讨论】:
【参考方案4】:您可以使用 partitionOffsets 注释以精确的偏移量开始,例如:
@KafkaListener(id = "bar", topicPartitions =
@TopicPartition(topic = "topic1", partitions = "0", "1" ),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
)public void listen(ConsumerRecord<?, ?> record)
【讨论】:
由于 Kafka 主题通常具有关联的保留时间,因此此方法可能会失败,因为偏移量“100”可能不存在(因为代理将在 X 天后删除数据)。文档不太清楚如果偏移量不存在会发生什么。【参考方案5】:对于kafka中没有初始偏移量的新消费组,可以设置AUTO_OFFSET_RESET_CONFIG
:
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-id");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
对于现有的消费者群体,您可以:
-
更改组 id 以显示为新的,即
consumer-group-id-v2
实现ConsumerSeekAware
,这样您就可以在初始化See docs 期间寻找所需的偏移量
【讨论】:
如果您没有初始偏移量,则此方法有效。但是在这里我们要“重置”偏移量! @Bachrc 没错,这只是一种解决方法,如果你有能力将你的 consumerGroupId 从my-consumer
更改为'my-consumer-v2
。它成为一个新组,在 kafka 中没有初始偏移量。
显然,并非总是可以这样做。以上是关于Spring Kafka - 如何使用组 ID 将偏移重置为最新?的主要内容,如果未能解决你的问题,请参考以下文章
kafka 消费者进行消费数据的各种场景的API(你值得一看)
如何将 Apache kafka 与 Spring mvc 一起使用?可能吗?
Spark Streaming:Spark Structured Streaming 中不允许使用 Kafka 组 ID