有啥方法可以拆分通量流吗?
Posted
技术标签:
【中文标题】有啥方法可以拆分通量流吗?【英文标题】:Is there any way to split flux stream?有什么方法可以拆分通量流吗? 【发布时间】:2020-01-10 13:48:36 【问题描述】:我正在尝试以多种方式分析数据(来自 RabbitMQ / spring cloud stream reactive)。 我需要找到一种方法将测量通量分成多个“汇”。 例如,我想做十秒钟的数据窗口,然后找到最大和最小的测量值。或者检查测量值是否在安全范围内,如果不在 - 打开警报(或发送电子邮件)。
我的项目: https://github.com/Stiuil06/GreenRealTime/blob/master/grl-analysis/src/main/java/com/arturwegrzyn/grl/AnalysisApplication.java
我尝试了两种方法:
@StreamListener
public void receive1(@Input(AnalysisChannels.INPUT) Flux<String> measurements)
measurements
.map(json -> gson.fromJson(json, Measurement.class))
.filter(m -> m instanceof WaterLevel)
.subscribe(m -> System.out.println(m));
@StreamListener
public void receive2(@Input(AnalysisChannels.INPUT) Flux<String> measurements)
measurements
.map(json -> gson.fromJson(json, Measurement.class))
.filter(m -> m instanceof WaterLevel)
.subscribe(m -> System.out.println(m));
在这种情况下,一个事件只执行一个侦听器(随机先执行一次,然后执行一次)
@StreamListener
public void receive2(@Input(AnalysisChannels.INPUT) Flux<String> measurements)
System.out.println("xyz");
ConnectableFlux<String> publish = measurements.publish();
publish
.map(json -> gson.fromJson(json, Measurement.class))
.filter(m -> m instanceof AirTemperature)
.subscribe(m -> System.out.println(m));
publish
.map(json -> gson.fromJson(json, Measurement.class))
.filter(m -> m instanceof AirTemperature)
.subscribe(m -> System.out.println(m));
在第二种情况下我得到一个异常
2019-09-08 16:43:12.720 ERROR 12972 --- [nalysis-group-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.dataAnalysis'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[145], headers=amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=dataAnalysis, amqp_deliveryTag=1, deliveryAttempt=3, amqp_consumerQueue=dataAnalysis.realtime-analysis-group, amqp_redelivered=false, mqtt_receivedRetained=false, amqp_receivedRoutingKey=dataAnalysis, mqtt_duplicate=false, amqp_timestamp=Sun Sep 08 16:40:49 CEST 2019, amqp_messageId=e229ef37-4672-c524-e3bb-a04e607bb9cb, id=90ff1479-e363-e13e-ead7-aa7a64aaf612, amqp_consumerTag=amq.ctag-xcdhmSOud5ZYJquDKUgsiw, contentType=application/json, mqtt_receivedTopic=/measurement/si:mu:la:00/AirPressure, mqtt_receivedQos=1, timestamp=1567953789712], failedMessage=GenericMessage [payload=byte[145], headers=amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=dataAnalysis, amqp_deliveryTag=1, deliveryAttempt=3, amqp_consumerQueue=dataAnalysis.realtime-analysis-group, amqp_redelivered=false, mqtt_receivedRetained=false, amqp_receivedRoutingKey=dataAnalysis, mqtt_duplicate=false, amqp_timestamp=Sun Sep 08 16:40:49 CEST 2019, amqp_messageId=e229ef37-4672-c524-e3bb-a04e607bb9cb, id=90ff1479-e363-e13e-ead7-aa7a64aaf612, amqp_consumerTag=amq.ctag-xcdhmSOud5ZYJquDKUgsiw, contentType=application/json, mqtt_receivedTopic=/measurement/si:mu:la:00/AirPressure, mqtt_receivedQos=1, timestamp=1567953789712]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1200(AmqpInboundChannelAdapter.java:57)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:223)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:220)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1542)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1468)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1456)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1451)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1400)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043)
at java.base/java.lang.Thread.run(Thread.java:835)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[145], headers=amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=dataAnalysis, amqp_deliveryTag=1, deliveryAttempt=3, amqp_consumerQueue=dataAnalysis.realtime-analysis-group, amqp_redelivered=false, mqtt_receivedRetained=false, amqp_receivedRoutingKey=dataAnalysis, mqtt_duplicate=false, amqp_timestamp=Sun Sep 08 16:40:49 CEST 2019, amqp_messageId=e229ef37-4672-c524-e3bb-a04e607bb9cb, id=90ff1479-e363-e13e-ead7-aa7a64aaf612, amqp_consumerTag=amq.ctag-xcdhmSOud5ZYJquDKUgsiw, contentType=application/json, mqtt_receivedTopic=/measurement/si:mu:la:00/AirPressure, mqtt_receivedQos=1, timestamp=1567953789712]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
... 23 more
通量/反应流拆分应该是什么样子?或者也许我应该以不同的方式处理问题?
【问题讨论】:
【参考方案1】:快速说明:
我们将在 3.0 中一起弃用响应式模块,因为已经使用 spring-cloud-function 编程模型处理了相同的模块。这是您的代码的外观。请注意,不再需要 EnableBinding、StreamListener 和其他注释。只需@Bean
,我们就会
@Bean
public Consumer<Flux<String>> sink()
return measurements ->
ConnectableFlux<String> publish = measurements.publish();
publish.connect();
...
;
这里有更多 - https://spring.io/blog/2019/08/19/announcing-spring-cloud-stream-horsham-m3-3-0-0-m3
【讨论】:
【参考方案2】:我启动了第二种情况中显示的代码。
那里缺少connect()
。
它应该看起来像:
public void receive2(@Input(AnalysisChannels.INPUT) Flux<String> measurements)
ConnectableFlux<String> publish = measurements.publish();
publish.connect();
publish
.map(json -> gson.fromJson(json, Measurement.class))
.filter(m -> m instanceof AirTemperature)
.subscribe(m -> System.out.println(m));
publish
.map(json -> gson.fromJson(json, Measurement.class))
.filter(m -> m instanceof AirTemperature)
.subscribe(m -> System.out.println(m));
无论如何,如果有人对我的逻辑有更好的解决方案,请随时写出来! 感谢您的帮助。
【讨论】:
好吧,看起来这不是关于拆分,而是发布-订阅,将相同的消息分发给多个订阅者。如果这确实是一个目标,那么您在答案中的解决方案很好。 您不能在同一输入上使用多个@StreamListener
方法来尝试实现您的目标:它们将成为相互竞争的订阅者,并以循环方式获取消息。
@ArtemBilan 你知道如何在 bean 之类的东西中捕捉到这个 ConnectableFlux 吗?我想从应用程序周围订阅它。我不想在 StreamListener 方法中弄得一团糟。
您可以考虑使用IbntegrationFlow
而不是这个@StreamListener
并使用它的toReactivePublisher()
将流真正公开为Publsiher<Message<?>>
bean。此时,您可以将其包装成 Flux
,并在应用程序中的所有其他位置为您的所有 subscribe()
调用它的 publish()
。以上是关于有啥方法可以拆分通量流吗?的主要内容,如果未能解决你的问题,请参考以下文章
色温与光通量有啥关系?为啥荧光灯色温越高光通量越低,成反比?LED色温越高光通量越高,成正比?