Spring Cloud Stream - 聚合

Posted

技术标签:

【中文标题】Spring Cloud Stream - 聚合【英文标题】:Spring Cloud Stream - Aggregates 【发布时间】:2017-06-20 15:01:15 【问题描述】:

我正在尝试实施建议的 SCS 聚合,但我不确定它们的真正目的,因为我得到的结果让我感到惊讶。 首先,这是代码...

源,要调度的消息提供者:

@SpringBootApplication
@EnableBinding(Source.class)
public class SourceApplication 

    private final Logger logger = LoggerFactory.getLogger(SourceApplication.class);

    @Bean
    @InboundChannelAdapter(Source.OUTPUT)
    public MessageSource<String> createMessage() 
        return () -> 
            String payload = now().toString();
            logger.warn("Sent: " + payload);
            return new GenericMessage<>(payload);
        ;
    

然后是一个简单的处理器(变压器):

@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApplication 

    @Transformer(inputChannel = Processor.INPUT,
                 outputChannel = Processor.OUTPUT)
    public String processMessage(String payload) 
        return payload + " is the time.";
    

这是最终消费者(接收器):

@SpringBootApplication
@EnableBinding(Sink.class)
public class SinkApplication 

    private final Logger logger = LoggerFactory.getLogger(SinkApplication.class);

    @ServiceActivator(inputChannel = Sink.INPUT)
    public void loggerSink(Object payload) 
        logger.warn("Received: " + payload);
    

最后,聚合链接了这三个:

@SpringBootApplication
public class SampleAggregateApplication 

    public static void main(String[] args) 
        new AggregateApplicationBuilder().web(false)
            .from(SourceApplication.class)
                .args("--spring.cloud.stream.bindings.output.destination=step1", "--fixedDelay=5000")
            .via(ProcessorApplication.class)
                .args("--spring.cloud.stream.bindings.input.destination=step1",
                      "--spring.cloud.stream.bindings.output.destination=step2")
            .to(SinkApplication.class)
                .args("--spring.cloud.stream.bindings.input.destination=step2")
        .run(args);
    

当聚合启动时,这些是检索到的跟踪的摘录:

2017-02-03 09:59:13.428  WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:13.428
2017-02-03 09:59:13.949  WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:13.949
2017-02-03 09:59:13.949  WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:13.949
2017-02-03 09:59:13.996  WARN 18688 --- [ask-scheduler-9] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:13.996
2017-02-03 09:59:14.430  WARN 18688 --- [ask-scheduler-2] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:14.430
2017-02-03 09:59:14.956  WARN 18688 --- [sk-scheduler-10] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:14.956
2017-02-03 09:59:14.956  WARN 18688 --- [sk-scheduler-10] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:14.956 is the time.
2017-02-03 09:59:14.999  WARN 18688 --- [ask-scheduler-5] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:14.999
2017-02-03 09:59:15.432  WARN 18688 --- [ask-scheduler-6] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:15.432
2017-02-03 09:59:15.961  WARN 18688 --- [ask-scheduler-7] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:15.961
2017-02-03 09:59:15.961  WARN 18688 --- [ask-scheduler-7] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:15.961
2017-02-03 09:59:16.000  WARN 18688 --- [ask-scheduler-1] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:16
2017-02-03 09:59:16.001  WARN 18688 --- [ask-scheduler-1] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:16
2017-02-03 09:59:16.437  WARN 18688 --- [ask-scheduler-4] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:16.437
2017-02-03 09:59:16.966  WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:16.966
2017-02-03 09:59:17.006  WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:17.006
2017-02-03 09:59:17.006  WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:17.006
2017-02-03 09:59:17.443  WARN 18688 --- [ask-scheduler-9] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:17.443
2017-02-03 09:59:17.971  WARN 18688 --- [ask-scheduler-2] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:17.971
2017-02-03 09:59:17.971  WARN 18688 --- [ask-scheduler-2] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:17.971
2017-02-03 09:59:18.007  WARN 18688 --- [sk-scheduler-10] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:18.007
2017-02-03 09:59:18.448  WARN 18688 --- [ask-scheduler-5] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:18.448
2017-02-03 09:59:18.976  WARN 18688 --- [ask-scheduler-6] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:18.976
2017-02-03 09:59:18.976  WARN 18688 --- [ask-scheduler-6] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:18.976 is the time.
2017-02-03 09:59:19.012  WARN 18688 --- [ask-scheduler-7] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:19.012
2017-02-03 09:59:19.449  WARN 18688 --- [ask-scheduler-1] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:19.449
2017-02-03 09:59:19.982  WARN 18688 --- [ask-scheduler-4] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:19.982
2017-02-03 09:59:19.982  WARN 18688 --- [ask-scheduler-4] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:19.982
2017-02-03 09:59:20.018  WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:20.018
2017-02-03 09:59:20.018  WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:20.018
2017

我原以为每条消息都会遵循定义的周期:源-处理器-接收器。我们可以看到至少有 3 条消息中的 2 条丢失了,并且只有 4 条消息中的 1 条被转换了。 注意:通道目的地是在第二次尝试中添加的,以避免应用程序之间的假定混淆(使用相同的 RabbitMQ 中间件)。

谁能告诉我我是否正确地理解了聚合目的并做了正确的事情来实现它?提前致谢。

【问题讨论】:

【参考方案1】:

有几点:

您不应为您的应用程序指定目标,因为作为聚合的一部分的应用程序通过内部通道进行通信,而不是通过代理; 其次(不幸的是,这是我们的文档未指定的内容)- 在每个聚合组件定义上使用 @SpringBootApplication 时,聚合的不同部分必须属于不同的 Java 包。在您的情况下发生的情况是,您在各种渠道上获得了多个消费者,而不是您期望的链接。将 Source、Transformer 和 Sink 移动到单独的包中应该适合您。此外,添加了https://github.com/spring-cloud/spring-cloud-stream/issues/785 以跟踪此问题。

【讨论】:

好的,我明白了...谢谢。那么我可以在我的聚合组件中使用 Configuration 或 Component 注释,而不是 SpringBootApplication 吗? 在这种特殊情况下,您可以使用 @Configuration - 该机制还设计用于处理同一个 JAR 中的应用程序 - 所以这取决于。 哎呀。我的意思是“应用程序存在于单独的 JAR 中”。 再次感谢您的提示。我还有另一个问题,您是否也可以说明一下。当我谈到这个主题时,我很自然地期待这种聚合模式可以在整个网络中使用。但随着管道被内部化,事实并非如此。 Spring Integration 是否足以在流组件上设计复合材料?那你能分享一些例子吗? 只是为了让大家知道:配置以及组件注释对我有用。我猜这些流组件只需要一些用于聚合的 bean(以及更多?)

以上是关于Spring Cloud Stream - 聚合的主要内容,如果未能解决你的问题,请参考以下文章

spring-cloud-stream kafka 消费者并发

Spring Cloud Stream @SendTo 注解不起作用

Spring Cloud (十五)Stream 入门主要概念与自定义消息发送与接收

spring-cloud-stream 请求-回复消息模式

spring cloud stream

spring cloud-stream 和 spring cloud-bus 有啥区别?