无论组的已提交偏移量如何,如何从主题的末尾读取
Posted
技术标签:
【中文标题】无论组的已提交偏移量如何,如何从主题的末尾读取【英文标题】:How to read from the end of a topic, regardless of the group's committed offset 【发布时间】:2019-10-02 17:03:39 【问题描述】:我正在使用以下包来消费 kafka 消息
compile 'org.springframework.boot:spring-boot-starter-webflux'
compile("org.springframework.boot:spring-boot-starter-web")
// tag::actuator[]
compile("org.springframework.boot:spring-boot-starter-actuator")
compile('org.springframework.kafka:spring-kafka:2.1.7.RELEASE')
compile 'io.projectreactor.kafka:reactor-kafka:1.0.0.RELEASE'
我想从一个主题的末尾消费消息,而不考虑组的提交偏移量
当我搜索时,我发现我们可以使用以下代码进行操作
consumer = new KafkaConsumer<>(properties);
consumer.seekToEnd(Collections.emptySet());
但我无法找到如何在 spring boot webflux 中使用上述代码
@Component
public class EventConsumer
private final EmitterProcessor<ServerSentEvent<String>> emitter = EmitterProcessor.create();
public Flux<ServerSentEvent<String>> get()
return emitter;
@KafkaListener(topics = "$kafka.zone.status.topic.name")
public void receive(String data)
//System.out.println(data);
emitter.onNext(ServerSentEvent.builder(data).id(UUID.randomUUID().toString()).build());
【问题讨论】:
【参考方案1】:见the documentation。
实现ConsumerSeekAware
并在onPartitionsAssigned
方法中执行搜索。
@Component
public class EventConsumer implements ConsumerSeekAware
private final EmitterProcessor<ServerSentEvent<String>> emitter = EmitterProcessor.create();
public Flux<ServerSentEvent<String>> get()
return emitter;
@KafkaListener(topics = "$kafka.zone.status.topic.name")
public void receive(String data)
// System.out.println(data);
emitter.onNext(ServerSentEvent.builder(data).id(UUID.randomUUID().toString()).build());
@Override
public void registerSeekCallback(ConsumerSeekCallback callback)
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback)
assignments.keySet().forEach(tp -> callback.seekToEnd(tp.topic(), tp.partition()));
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback)
【讨论】:
非常感谢您的帮助 嗨,Gary Russell,我们可以得到最新的偏移量吗?我想开始寻找最后一个的消息 不要对旧答案提出新问题。总是问一个新问题——它可以帮助其他人寻找答案。而不是CosumerSeekAware
,使用ConsumerAwareRebalanceListener
。调用consumer.position(...)
获取当前位置,然后直接在Consumer
上进行搜索。以上是关于无论组的已提交偏移量如何,如何从主题的末尾读取的主要内容,如果未能解决你的问题,请参考以下文章