Spring Integration DSL:PublishSubscribeChannel 订单

Posted

技术标签:

【中文标题】Spring Integration DSL:PublishSubscribeChannel 订单【英文标题】:Spring Integration DSL: PublishSubscribeChannel order 【发布时间】:2018-03-03 16:28:12 【问题描述】:

我想了解 PublishSubscribeChannel 是如何工作的,所以我实现了一个小例子:

@Bean
public MessageSource<?> integerMessageSource() 
    MethodInvokingMessageSource source = new MethodInvokingMessageSource();
    source.setObject(new AtomicInteger());
    source.setMethodName("getAndIncrement");
    return source;




@Bean
public IntegrationFlow mainFlow() 
    // @formatter:off
    return IntegrationFlows
        .from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1000)))
        .publishSubscribeChannel(pubSub -> pubSub
            .subscribe(flow -> flow
                .handle(message -> LOG.info("Handling message, step 1: ", message.getPayload())))
            .subscribe(flow -> flow
                .handle(message -> LOG.info("Handling message, step 2: ", message.getPayload())))
            .subscribe(flow -> flow
                .transform(source -> MessageBuilder.withPayload("Error").build())
                .handle(message -> 
                    LOG.info("Error");
                ))
            .subscribe(flow -> flow
                .handle(message -> LOG.info("Handling message, step 4: ", message.getPayload())))
        )
        .get();
    // @formatter:on

我预计我会看到一个输出:

Handling message, step 1...
Handling message, step 2...
Error
Handling message, step 4...

但总是首先处理第三个子流(带有“错误”输出)。当我尝试为步骤 1、2 和 4 定义顺序时,我得到以下控制台输出(警告):

o.s.integration.dsl.GenericEndpointSpec  : 'order' can be applied only for AbstractMessageHandler

我原以为订阅者会按订阅顺序被调用,但事实并非如此。

我正在使用 Spring Boot 1.5.4 和 Spring Integration 4.3.10。

【问题讨论】:

【参考方案1】:

问题在于 lambda 处理程序不是 Ordered - 发布/订阅频道的一般合同是首先(按顺序)调用 Ordered 订阅者,然后是无序订阅者。

由于 lambdas 不能实现多个接口,我不确定我们能做些什么。

作为一种变通方法,您可以执行类似...

@Bean
public IntegrationFlow mainFlow() 
    // @formatter:off
    return IntegrationFlows
        .from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1000)))
        .publishSubscribeChannel(pubSub -> pubSub
            .subscribe(flow -> flow
                .handle(handler("Handling message, step 1: ")))
            .subscribe(flow -> flow
                .handle(handler("Handling message, step 2: ")))
            .subscribe(flow -> flow
                .transform(message -> "Error")
                .handle(message -> 
                    LOG.info("Error");
                ))
            .subscribe(flow -> flow
                .handle(handler("Handling message, step 4: ")))
        )
        .get();
    // @formatter:on


private MessageHandler handler(String format) 
    return new AbstractMessageHandler() 

        @Override
        protected void handleMessageInternal(Message<?> message) throws Exception 
            LOG.info(format, message.getPayload());
        

    ;


这样所有订阅者都是Ordered

编辑

这里有一个更简单的解决方法 - 使用桥而不是 lambda 启动子流,因此所有子流优先组件都实现 Ordered...

@Bean
public IntegrationFlow mainFlow() 
    // @formatter:off
    return IntegrationFlows
        .from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1000)))
        .publishSubscribeChannel(pubSub -> pubSub
            .subscribe(flow -> flow
                .bridge(e -> e.id("s1"))
                .handle(message -> LOG.info("Handling message, step 1: ", message.getPayload())))
            .subscribe(flow -> flow
                .bridge(e -> e.id("s2"))
                .handle(message -> LOG.info("Handling message, step 2: ", message.getPayload())))
            .subscribe(flow -> flow
                .transform(source -> MessageBuilder.withPayload("Error").build())
                .handle(message -> 
                    LOG.info("Error");
                ))
            .subscribe(flow -> flow
                .bridge(e -> e.id("s4"))
                .handle(message -> LOG.info("Handling message, step 4: ", message.getPayload())))
        )
        .get();
    // @formatter:on

【讨论】:

好的,谢谢,这解决了问题! :) 我想我可以在 .handle() 方法中使用“顺序”,因为 .transform() 方法也可以,它也可以将 lambda 作为输入。 .handle() 和 .transform() 有什么区别? .handle(&lt;lambda&gt;) 返回一个 MessageHandler 实现(lambda)。 .transform(..any...)(甚至.transform(&lt;lambda&gt;))返回一个实现OrderableMessageTransformingHandler。对于.transform,lambda 是Transformer,而不是MessageHandler 我添加了另一个需要更少代码的解决方法;见编辑。我打开了INT-4347 看看我们是否可以改进它。

以上是关于Spring Integration DSL:PublishSubscribeChannel 订单的主要内容,如果未能解决你的问题,请参考以下文章

Spring Integration DSL:PublishSubscribeChannel 订单

Spring Integration DSL JMS 入站/出站网关

Spring Integration DSL 中的路由

如何将请求标头添加到 outboundGateway spring integration dsl

Spring Integration DSL 变压器

使用 Spring Integration DSL 读取 Tibco EMS 主题