停止使用 Stream 侦听器的消息
Posted
技术标签:
【中文标题】停止使用 Stream 侦听器的消息【英文标题】:Stop consume message for Stream listener 【发布时间】:2020-03-06 18:36:45 【问题描述】:我正在寻找一种方法来停止使用流侦听器消费消息。
@StreamListener(MBinding.M_INPUT)
public void consumeMessage(Message<MerchantEvent> message)
//handle when receive message
cloud:
stream:
bindings:
MInput:
destination: topicName
group: groupName
我已经用谷歌搜索过了,但现在仍然不知道如何停止消费。有谁知道吗?
【问题讨论】:
【参考方案1】:您可以使用执行器执行此操作(请参阅Binding Visualization and Control)。或者您可以通过编程方式调用端点。
@SpringBootApplication
@EnableBinding(Sink.class)
public class So58795176Application
public static void main(String[] args)
SpringApplication.run(So58795176Application.class, args);
@StreamListener(Sink.INPUT)
public void listen(String in)
System.out.println();
@Autowired
BindingsEndpoint endpoint;
@Bean
public ApplicationRunner runner()
return args ->
System.in.read();
endpoint.changeState("input", State.STOPPED);
System.in.read();
endpoint.changeState("input", State.STARTED);
;
【讨论】:
我检查了绑定端点,当我将绑定状态更改为 PAUSE 时,消费者仍然在消费消息本身,所以现在我对绑定的含义感到困惑。我只是看了几天卡夫卡,所以不清楚。您能告诉我这种情况下的绑定是什么意思吗? 暂停/停止在之前的poll()
检索到的记录被处理后才会生效。如果要立即暂停/停止,可以设置max.poll.records=1
。
上一条消息已经处理完毕,max.poll.records 设置为1,然后我执行绑定端点,状态变为PAUSED,但消费者仍在消费消息
我在检查错误的类时出错了,绑定端点运行良好,非常感谢:D
@GaryRussell State.STOPPED 状态是私有的。如何使用它?其次,我们是否需要打开执行器才能使上面的代码工作?以上是关于停止使用 Stream 侦听器的消息的主要内容,如果未能解决你的问题,请参考以下文章