Spring Integration DSL:PublishSubscribeChannel 订单
Posted
技术标签:
【中文标题】Spring Integration DSL:PublishSubscribeChannel 订单【英文标题】:Spring Integration DSL: PublishSubscribeChannel order 【发布时间】:2018-03-03 16:28:12 【问题描述】:我想了解 PublishSubscribeChannel 是如何工作的,所以我实现了一个小例子:
@Bean
public MessageSource<?> integerMessageSource()
MethodInvokingMessageSource source = new MethodInvokingMessageSource();
source.setObject(new AtomicInteger());
source.setMethodName("getAndIncrement");
return source;
@Bean
public IntegrationFlow mainFlow()
// @formatter:off
return IntegrationFlows
.from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1000)))
.publishSubscribeChannel(pubSub -> pubSub
.subscribe(flow -> flow
.handle(message -> LOG.info("Handling message, step 1: ", message.getPayload())))
.subscribe(flow -> flow
.handle(message -> LOG.info("Handling message, step 2: ", message.getPayload())))
.subscribe(flow -> flow
.transform(source -> MessageBuilder.withPayload("Error").build())
.handle(message ->
LOG.info("Error");
))
.subscribe(flow -> flow
.handle(message -> LOG.info("Handling message, step 4: ", message.getPayload())))
)
.get();
// @formatter:on
我预计我会看到一个输出:
Handling message, step 1...
Handling message, step 2...
Error
Handling message, step 4...
但总是首先处理第三个子流(带有“错误”输出)。当我尝试为步骤 1、2 和 4 定义顺序时,我得到以下控制台输出(警告):
o.s.integration.dsl.GenericEndpointSpec : 'order' can be applied only for AbstractMessageHandler
我原以为订阅者会按订阅顺序被调用,但事实并非如此。
我正在使用 Spring Boot 1.5.4 和 Spring Integration 4.3.10。
【问题讨论】:
【参考方案1】:问题在于 lambda 处理程序不是 Ordered
- 发布/订阅频道的一般合同是首先(按顺序)调用 Ordered
订阅者,然后是无序订阅者。
由于 lambdas 不能实现多个接口,我不确定我们能做些什么。
作为一种变通方法,您可以执行类似...
@Bean
public IntegrationFlow mainFlow()
// @formatter:off
return IntegrationFlows
.from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1000)))
.publishSubscribeChannel(pubSub -> pubSub
.subscribe(flow -> flow
.handle(handler("Handling message, step 1: ")))
.subscribe(flow -> flow
.handle(handler("Handling message, step 2: ")))
.subscribe(flow -> flow
.transform(message -> "Error")
.handle(message ->
LOG.info("Error");
))
.subscribe(flow -> flow
.handle(handler("Handling message, step 4: ")))
)
.get();
// @formatter:on
private MessageHandler handler(String format)
return new AbstractMessageHandler()
@Override
protected void handleMessageInternal(Message<?> message) throws Exception
LOG.info(format, message.getPayload());
;
这样所有订阅者都是Ordered
。
编辑
这里有一个更简单的解决方法 - 使用桥而不是 lambda 启动子流,因此所有子流优先组件都实现 Ordered
...
@Bean
public IntegrationFlow mainFlow()
// @formatter:off
return IntegrationFlows
.from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1000)))
.publishSubscribeChannel(pubSub -> pubSub
.subscribe(flow -> flow
.bridge(e -> e.id("s1"))
.handle(message -> LOG.info("Handling message, step 1: ", message.getPayload())))
.subscribe(flow -> flow
.bridge(e -> e.id("s2"))
.handle(message -> LOG.info("Handling message, step 2: ", message.getPayload())))
.subscribe(flow -> flow
.transform(source -> MessageBuilder.withPayload("Error").build())
.handle(message ->
LOG.info("Error");
))
.subscribe(flow -> flow
.bridge(e -> e.id("s4"))
.handle(message -> LOG.info("Handling message, step 4: ", message.getPayload())))
)
.get();
// @formatter:on
【讨论】:
好的,谢谢,这解决了问题! :) 我想我可以在 .handle() 方法中使用“顺序”,因为 .transform() 方法也可以,它也可以将 lambda 作为输入。 .handle() 和 .transform() 有什么区别?.handle(<lambda>)
返回一个 MessageHandler
实现(lambda)。 .transform(..any...)
(甚至.transform(<lambda>)
)返回一个实现Orderable
的MessageTransformingHandler
。对于.transform
,lambda 是Transformer
,而不是MessageHandler
。
我添加了另一个需要更少代码的解决方法;见编辑。我打开了INT-4347 看看我们是否可以改进它。以上是关于Spring Integration DSL:PublishSubscribeChannel 订单的主要内容,如果未能解决你的问题,请参考以下文章
Spring Integration DSL:PublishSubscribeChannel 订单
Spring Integration DSL JMS 入站/出站网关