无论组的已提交偏移量如何,如何从主题的末尾读取

Posted

技术标签:

【中文标题】无论组的已提交偏移量如何,如何从主题的末尾读取【英文标题】:How to read from the end of a topic, regardless of the group's committed offset 【发布时间】:2019-10-02 17:03:39 【问题描述】:

我正在使用以下包来消费 kafka 消息

compile 'org.springframework.boot:spring-boot-starter-webflux'
compile("org.springframework.boot:spring-boot-starter-web")
// tag::actuator[]
compile("org.springframework.boot:spring-boot-starter-actuator")
compile('org.springframework.kafka:spring-kafka:2.1.7.RELEASE')
compile 'io.projectreactor.kafka:reactor-kafka:1.0.0.RELEASE'

我想从一个主题的末尾消费消息,而不考虑组的提交偏移量

当我搜索时,我发现我们可以使用以下代码进行操作

consumer = new KafkaConsumer<>(properties);
consumer.seekToEnd(Collections.emptySet());

但我无法找到如何在 spring boot webflux 中使用上述代码

@Component
public class EventConsumer

    private final EmitterProcessor<ServerSentEvent<String>> emitter = EmitterProcessor.create();

    public Flux<ServerSentEvent<String>> get()
    
        return emitter;
    

    @KafkaListener(topics = "$kafka.zone.status.topic.name")
    public void receive(String data)
    
        //System.out.println(data);
        emitter.onNext(ServerSentEvent.builder(data).id(UUID.randomUUID().toString()).build());
    

【问题讨论】:

【参考方案1】:

见the documentation。

实现ConsumerSeekAware 并在onPartitionsAssigned 方法中执行搜索。

@Component
public class EventConsumer implements ConsumerSeekAware 

    private final EmitterProcessor<ServerSentEvent<String>> emitter = EmitterProcessor.create();

    public Flux<ServerSentEvent<String>> get() 
        return emitter;
    

    @KafkaListener(topics = "$kafka.zone.status.topic.name")
    public void receive(String data) 
        // System.out.println(data);
        emitter.onNext(ServerSentEvent.builder(data).id(UUID.randomUUID().toString()).build());
    

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) 

    

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) 
        assignments.keySet().forEach(tp -> callback.seekToEnd(tp.topic(), tp.partition()));
    

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) 

    


【讨论】:

非常感谢您的帮助 嗨,Gary Russell,我们可以得到最新的偏移量吗?我想开始寻找最后一个的消息 不要对旧答案提出新问题。总是问一个新问题——它可以帮助其他人寻找答案。而不是CosumerSeekAware,使用ConsumerAwareRebalanceListener。调用consumer.position(...)获取当前位置,然后直接在Consumer上进行搜索。

以上是关于无论组的已提交偏移量如何,如何从主题的末尾读取的主要内容,如果未能解决你的问题,请参考以下文章

来自不同消费者组的多个消费者如何从同一个分区中读取?

如何重置 Kafka 偏移量以匹配尾部位置?

Kafka-消费者-偏移量的提交方式

Consumer.endOffsets 如何在 Kafka 中工作?

kafka重复消费的原因

如何获取 kafka 主题分区的最新偏移量?