停止使用 Stream 侦听器的消息

Posted

技术标签:

【中文标题】停止使用 Stream 侦听器的消息【英文标题】:Stop consume message for Stream listener 【发布时间】:2020-03-06 18:36:45 【问题描述】:

我正在寻找一种方法来停止使用流侦听器消费消息。

@StreamListener(MBinding.M_INPUT)
    public void consumeMessage(Message<MerchantEvent> message) 
    //handle when receive message
 

cloud:
        stream:
            bindings:
                MInput:
                    destination: topicName
                    group: groupName

我已经用谷歌搜索过了,但现在仍然不知道如何停止消费。有谁知道吗?

【问题讨论】:

【参考方案1】:

您可以使用执行器执行此操作(请参阅Binding Visualization and Control)。或者您可以通过编程方式调用端点。

@SpringBootApplication
@EnableBinding(Sink.class)
public class So58795176Application 

    public static void main(String[] args) 
        SpringApplication.run(So58795176Application.class, args);
    

    @StreamListener(Sink.INPUT)
    public void listen(String in) 
        System.out.println();
    

    @Autowired
    BindingsEndpoint endpoint;

    @Bean
    public ApplicationRunner runner() 
        return args -> 
            System.in.read();
            endpoint.changeState("input", State.STOPPED);
            System.in.read();
            endpoint.changeState("input", State.STARTED);
        ;
    


【讨论】:

我检查了绑定端点,当我将绑定状态更改为 PAUSE 时,消费者仍然在消费消息本身,所以现在我对绑定的含义感到困惑。我只是看了几天卡夫卡,所以不清楚。您能告诉我这种情况下的绑定是什么意思吗? 暂停/停止在之前的poll()检索到的记录被处理后才会生效。如果要立即暂停/停止,可以设置max.poll.records=1 上一条消息已经处理完毕,max.poll.records 设置为1,然后我执行绑定端点,状态变为PAUSED,但消费者仍在消费消息 我在检查错误的类时出错了,绑定端点运行良好,非常感谢:D @GaryRussell State.STOPPED 状态是私有的。如何使用它?其次,我们是否需要打开执行器才能使上面的代码工作?

以上是关于停止使用 Stream 侦听器的消息的主要内容,如果未能解决你的问题,请参考以下文章

使用 SQL Server AlwaysOn 创建侦听器突然停止工作

如果侦听器关闭,持久性 JMS 消息如何存活

试图停止消息驱动的通道适配器但丢弃消息

Spring Cloud Stream 验证

Flutter:如何暂停和重新启动“Stream”侦听器?

如何在不和谐中使用命令切换触发 Cog 侦听器