Spring Cloud Stream - 聚合
Posted
技术标签:
【中文标题】Spring Cloud Stream - 聚合【英文标题】:Spring Cloud Stream - Aggregates 【发布时间】:2017-06-20 15:01:15 【问题描述】:我正在尝试实施建议的 SCS 聚合,但我不确定它们的真正目的,因为我得到的结果让我感到惊讶。 首先,这是代码...
源,要调度的消息提供者:
@SpringBootApplication
@EnableBinding(Source.class)
public class SourceApplication
private final Logger logger = LoggerFactory.getLogger(SourceApplication.class);
@Bean
@InboundChannelAdapter(Source.OUTPUT)
public MessageSource<String> createMessage()
return () ->
String payload = now().toString();
logger.warn("Sent: " + payload);
return new GenericMessage<>(payload);
;
然后是一个简单的处理器(变压器):
@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApplication
@Transformer(inputChannel = Processor.INPUT,
outputChannel = Processor.OUTPUT)
public String processMessage(String payload)
return payload + " is the time.";
这是最终消费者(接收器):
@SpringBootApplication
@EnableBinding(Sink.class)
public class SinkApplication
private final Logger logger = LoggerFactory.getLogger(SinkApplication.class);
@ServiceActivator(inputChannel = Sink.INPUT)
public void loggerSink(Object payload)
logger.warn("Received: " + payload);
最后,聚合链接了这三个:
@SpringBootApplication
public class SampleAggregateApplication
public static void main(String[] args)
new AggregateApplicationBuilder().web(false)
.from(SourceApplication.class)
.args("--spring.cloud.stream.bindings.output.destination=step1", "--fixedDelay=5000")
.via(ProcessorApplication.class)
.args("--spring.cloud.stream.bindings.input.destination=step1",
"--spring.cloud.stream.bindings.output.destination=step2")
.to(SinkApplication.class)
.args("--spring.cloud.stream.bindings.input.destination=step2")
.run(args);
当聚合启动时,这些是检索到的跟踪的摘录:
2017-02-03 09:59:13.428 WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:13.428
2017-02-03 09:59:13.949 WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:13.949
2017-02-03 09:59:13.949 WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:13.949
2017-02-03 09:59:13.996 WARN 18688 --- [ask-scheduler-9] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:13.996
2017-02-03 09:59:14.430 WARN 18688 --- [ask-scheduler-2] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:14.430
2017-02-03 09:59:14.956 WARN 18688 --- [sk-scheduler-10] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:14.956
2017-02-03 09:59:14.956 WARN 18688 --- [sk-scheduler-10] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:14.956 is the time.
2017-02-03 09:59:14.999 WARN 18688 --- [ask-scheduler-5] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:14.999
2017-02-03 09:59:15.432 WARN 18688 --- [ask-scheduler-6] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:15.432
2017-02-03 09:59:15.961 WARN 18688 --- [ask-scheduler-7] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:15.961
2017-02-03 09:59:15.961 WARN 18688 --- [ask-scheduler-7] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:15.961
2017-02-03 09:59:16.000 WARN 18688 --- [ask-scheduler-1] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:16
2017-02-03 09:59:16.001 WARN 18688 --- [ask-scheduler-1] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:16
2017-02-03 09:59:16.437 WARN 18688 --- [ask-scheduler-4] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:16.437
2017-02-03 09:59:16.966 WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:16.966
2017-02-03 09:59:17.006 WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:17.006
2017-02-03 09:59:17.006 WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:17.006
2017-02-03 09:59:17.443 WARN 18688 --- [ask-scheduler-9] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:17.443
2017-02-03 09:59:17.971 WARN 18688 --- [ask-scheduler-2] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:17.971
2017-02-03 09:59:17.971 WARN 18688 --- [ask-scheduler-2] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:17.971
2017-02-03 09:59:18.007 WARN 18688 --- [sk-scheduler-10] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:18.007
2017-02-03 09:59:18.448 WARN 18688 --- [ask-scheduler-5] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:18.448
2017-02-03 09:59:18.976 WARN 18688 --- [ask-scheduler-6] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:18.976
2017-02-03 09:59:18.976 WARN 18688 --- [ask-scheduler-6] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:18.976 is the time.
2017-02-03 09:59:19.012 WARN 18688 --- [ask-scheduler-7] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:19.012
2017-02-03 09:59:19.449 WARN 18688 --- [ask-scheduler-1] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:19.449
2017-02-03 09:59:19.982 WARN 18688 --- [ask-scheduler-4] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:19.982
2017-02-03 09:59:19.982 WARN 18688 --- [ask-scheduler-4] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:19.982
2017-02-03 09:59:20.018 WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:20.018
2017-02-03 09:59:20.018 WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:20.018
2017
我原以为每条消息都会遵循定义的周期:源-处理器-接收器。我们可以看到至少有 3 条消息中的 2 条丢失了,并且只有 4 条消息中的 1 条被转换了。 注意:通道目的地是在第二次尝试中添加的,以避免应用程序之间的假定混淆(使用相同的 RabbitMQ 中间件)。
谁能告诉我我是否正确地理解了聚合目的并做了正确的事情来实现它?提前致谢。
【问题讨论】:
【参考方案1】:有几点:
您不应为您的应用程序指定目标,因为作为聚合的一部分的应用程序通过内部通道进行通信,而不是通过代理; 其次(不幸的是,这是我们的文档未指定的内容)- 在每个聚合组件定义上使用@SpringBootApplication
时,聚合的不同部分必须属于不同的 Java 包。在您的情况下发生的情况是,您在各种渠道上获得了多个消费者,而不是您期望的链接。将 Source、Transformer 和 Sink 移动到单独的包中应该适合您。此外,添加了https://github.com/spring-cloud/spring-cloud-stream/issues/785 以跟踪此问题。
【讨论】:
好的,我明白了...谢谢。那么我可以在我的聚合组件中使用 Configuration 或 Component 注释,而不是 SpringBootApplication 吗? 在这种特殊情况下,您可以使用@Configuration
- 该机制还设计用于处理同一个 JAR 中的应用程序 - 所以这取决于。
哎呀。我的意思是“应用程序存在于单独的 JAR 中”。
再次感谢您的提示。我还有另一个问题,您是否也可以说明一下。当我谈到这个主题时,我很自然地期待这种聚合模式可以在整个网络中使用。但随着管道被内部化,事实并非如此。 Spring Integration 是否足以在流组件上设计复合材料?那你能分享一些例子吗?
只是为了让大家知道:配置以及组件注释对我有用。我猜这些流组件只需要一些用于聚合的 bean(以及更多?)以上是关于Spring Cloud Stream - 聚合的主要内容,如果未能解决你的问题,请参考以下文章
spring-cloud-stream kafka 消费者并发
Spring Cloud Stream @SendTo 注解不起作用