如何使用 spring-kafka 暂停和恢复 @KafkaListener

Posted

技术标签:

【中文标题】如何使用 spring-kafka 暂停和恢复 @KafkaListener【英文标题】:how to pause and resume @KafkaListener using spring-kafka 【发布时间】:2020-02-08 17:29:30 【问题描述】:

我已经实现了 Kafka 消费者,现在我有一个场景。

    从 Kafka 流中读取数据 2.2.5.Release via Srpingboot 加载到数据库table1中 将table1中的数据复制到table2中 清空表1

要执行上述操作,我需要使用石英的调度作业(已编写)暂停/恢复 Kafka 消费者,该作业将数据从表 1 复制到表 2。但在此活动期间,我希望我的 Kafka 侦听器暂停,一旦复制完成,它应该恢复。

我的实现:

@KafkaListener(topicPartitions =
     @TopicPartition(topic = "data_pipe", partitions =  "0" ))
public void listen(ConsumerRecord<String, String> cr) throws Exception 

【问题讨论】:

【参考方案1】:

如果你使用 'kafkaListener annotation' 自动创建 KafkaListenerEndpointRegistry bean,那么,你可以像这样使用它:

@Component
public class KafkaManager 

    private final KafkaListenerEndpointRegistry registry;

    public KafkaManager(KafkaListenerEndpointRegistry registry) 
        this.registry = registry;
    
    public void pause() 
        registry.getListenerContainers().forEach(MessageListenerContainer::pause);
    

    public void resume() 
        registry.getListenerContainers().forEach(MessageListenerContainer::resume);
    

文档:https://docs.spring.io/spring-kafka/reference/html/#pause-resume

【讨论】:

复制数据后谁或什么原因导致暂停以及如何触发恢复 这取决于你的代码;如果你给你的@KafkaListener 一个id,你可以使用registry.getListenerContainer(myId).pause() 我已经尝试过上面的课程,并在暂停和恢复方法中打印日志中的值。但它们从未被调用。 “打印日志中的值”是什么意思?编辑您的问题以显示您尝试过的当前代码。 我还没有成功拖拽@KafkaListener

以上是关于如何使用 spring-kafka 暂停和恢复 @KafkaListener的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 threading 模块暂停和恢复线程?

如何暂停和恢复 GameLoop?

如何暂停和恢复 UIDynamicAnimator 物理模拟

如何在 C++ 中暂停和恢复 POSIX 线程?

如何在Windows下暂停和恢复任何外部进程?

Spotify - 如何暂停曲目和恢复曲目