Spring Integration: Dispatcher has no subscribers for channel
[Title]: Spring Integration Dispatcher has no subscribers for channel
[Posted]: 2017-05-05 12:08:48
[Question]: I am using Spring Integration and its MQTT support. Following the Spring Integration documentation, I wrote a simple test case that publishes a message to an MQTT topic. The Spring documentation is at: http://docs.spring.io/spring-integration/reference/html/mqtt.html#_configuring_with_java_configuration_15
I am using these versions:
Spring 4.3.4
Spring Integration 4.3.5
I built this simple configuration class:
@Configuration
@IntegrationComponentScan
public class CommunicationServerApplication {

    // The fields used below (mqttServerUris, mqttUsername, mqttPassword, mqttTopic, ...)
    // are not shown in the original post.

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs(mqttServerUris);
        // Set credentials only when both are configured
        if (StringUtils.hasText(mqttUsername) && StringUtils.hasText(mqttPassword)) {
            factory.setUserName(mqttUsername);
            factory.setPassword(mqttPassword);
        }
        factory.setConnectionTimeout(mqttConnectionTimeout);
        factory.setKeepAliveInterval(mqttKeepAliveInterval);
        factory.setPersistence(new MqttDefaultFilePersistence(mqttPersistenceFileDirectory));
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel", autoStartup = "true")
    public MessageHandler mqttOutbound() {
        String clientId = mqttClientId;
        // Fall back to a random client id when none is configured
        if (!StringUtils.hasText(clientId)) {
            clientId = UUID.randomUUID().toString();
        }
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttTopic);
        // Valid MQTT QoS levels are 0, 1 and 2
        if (mqttQos >= 0 && mqttQos <= 2) {
            messageHandler.setDefaultQos(mqttQos);
        }
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        DirectChannel dc = new DirectChannel();
        return dc;
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MqttMsgproducer {
        void sendToMqtt(String data);
    }
}
Then I used this simple test case:
@ContextConfiguration(value = "classpath:app-ctx.xml")
@RunWith(SpringJUnit4ClassRunner.class)
public class SimpleMqttTestSuite {

    private static final Logger logger = LoggerFactory.getLogger(SimpleMqttTestSuite.class.getName());

    @Autowired
    private MqttMsgproducer sender;

    @Test
    public void startServerTest() {
        try {
            sender.sendToMqtt("Hello");
        } catch (Exception e) {
            logger.error("Error", e);
        }
    }
}
My app-ctx.xml is:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:p="http://www.springframework.org/schema/p" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:oxm="http://www.springframework.org/schema/oxm"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm.xsd
http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">
<context:component-scan base-package="it.olegna.test.integration" />
<context:property-placeholder location="classpath:configuration.properties"
order="0" ignore-resource-not-found="true" ignore-unresolvable="true" />
</beans>
When I run this simple test, I get the following error:
2016-12-20 10:46:33,889 49967 [nioEventLoopGroup-3-1] ERROR - Errore
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'org.springframework.context.support.GenericApplicationContext@2e6a8155.mqttOutboundChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:135) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:375) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:477) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:429) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:420) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.integration.gateway.GatewayCompletableFutureProxyFactoryBean.invoke(GatewayCompletableFutureProxyFactoryBean.java:65) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.3.4.RELEASE.jar:4.3.4.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213) ~[spring-aop-4.3.4.RELEASE.jar:4.3.4.RELEASE]
I can't figure out what I am missing in the configuration. Can anyone give me a tip?
Thanks
Angelo
[Comments on the question]:
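Not an answer to your question, but possibly useful to people who arrive here by searching for the error message in the title: I got a similar error while following the MQTT example in the Spring 5 documentation. I had to replace adapter.setOutputChannel(mqttInputChannel()) with adapter.setOutputChannelName("mqttInputChannel") to make it work.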
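The change described in this comment can be sketched roughly as follows. This is only an illustration: the class name, broker URL, client id, and topic are placeholders that do not come from the thread, and the handler simply prints the payload. With `setOutputChannelName`, the adapter refers to the channel by bean name and resolves it from the application context, instead of receiving whatever instance a direct call to `mqttInputChannel()` returns.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
@EnableIntegration
public class MqttInboundSketch { // hypothetical class name

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        // Placeholder broker URL, client id and topic (assumptions, not from the post)
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "sketchClient", "sketch/topic");
        // Reference the channel by bean name rather than by calling mqttInputChannel()
        adapter.setOutputChannelName("mqttInputChannel");
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        // The channel needs a subscriber, otherwise the same dispatcher error appears
        return message -> System.out.println(message.getPayload());
    }
}
```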
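[Answer 1]:
Your solution is not correct: you must not subscribe inside the channel bean definition. I believe your problem is that @EnableIntegration is missing from your class.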
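To see why a missing subscriber produces this error, here is a toy model in plain Java. It is not Spring's `DirectChannel`; `ToyDirectChannel` and `trySend` are made-up names, and the real dispatcher also load-balances across handlers, which this sketch skips. The assumption being illustrated is that, without `@EnableIntegration`, nothing processes the `@ServiceActivator` annotation, so the handler bean is never subscribed to `mqttOutboundChannel`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class DirectChannelSketch {

    /** Toy stand-in for a subscribable point-to-point channel (not Spring's class). */
    static final class ToyDirectChannel {
        private final String name;
        private final List<Consumer<String>> handlers = new CopyOnWriteArrayList<>();

        ToyDirectChannel(String name) {
            this.name = name;
        }

        void subscribe(Consumer<String> handler) {
            handlers.add(handler);
        }

        void send(String payload) {
            // A direct channel delivers in the caller's thread; with no handler it must fail
            if (handlers.isEmpty()) {
                throw new IllegalStateException(
                        "Dispatcher has no subscribers for channel '" + name + "'");
            }
            handlers.get(0).accept(payload); // simplification: always the first handler
        }
    }

    /** Sends and reports either "delivered" or the error message. */
    static String trySend(ToyDirectChannel channel, String payload) {
        try {
            channel.send(payload);
            return "delivered";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        List<String> published = new ArrayList<>();
        ToyDirectChannel channel = new ToyDirectChannel("mqttOutboundChannel");

        // Annotation processing never ran: the handler exists but is not subscribed
        System.out.println(trySend(channel, "Hello"));

        // What the framework does on startup once annotation processing is enabled
        channel.subscribe(published::add);
        System.out.println(trySend(channel, "Hello"));
        System.out.println(published);
    }
}
```

The first send reports the missing subscriber and the second is delivered. In the real application the subscription step is the framework's job, which is why the answer recommends enabling it rather than calling `subscribe` in the channel bean definition.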
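[Comments]:
I would appreciate some clarification :)
I tried what you suggested and it works.
The @EnableIntegration fix is unrelated to subscribing in the channel bean definition.
What is the best way to subscribe on the bean-definition side?
Subscribing to a channel from user code is unusual; normally the framework subscribes the consumer automatically when the consumer starts (which usually happens automatically).
@GaryRussell Does the @EnableIntegration annotation need to go on the class where my channels are defined? I still sometimes get the dispatcher error, along with this error (link), and they seem related to me.
[Answer 2]:
I solved my problem.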
It was related to the fact that I had created the channel but no handler was subscribed to it.
In my application class, I did the following:
@Bean
public MessageChannel mqttOutboundChannel() {
    DirectChannel dc = new DirectChannel();
    // Manually subscribe the outbound handler to the channel
    dc.subscribe(mqttOutbound());
    return dc;
}
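As you can see, I now manually subscribe the mqttOutbound bean (the message handler) to the channel.
This way, everything works.
Hope this helps
Angelo
Update after Gary Russell's answer
Following Gary Russell's suggestion, I no longer subscribe to the channel manually.
I added the @EnableIntegration annotation instead,
so my application class now looks like this: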
@Configuration
@IntegrationComponentScan
@EnableIntegration
public class CommunicationServerApplication {

    // The fields used below (mqttServerUris, mqttUsername, mqttPassword, mqttTopic, ...)
    // are not shown in the original post.

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs(mqttServerUris);
        // Set credentials only when both are configured
        if (StringUtils.hasText(mqttUsername) && StringUtils.hasText(mqttPassword)) {
            factory.setUserName(mqttUsername);
            factory.setPassword(mqttPassword);
        }
        factory.setConnectionTimeout(mqttConnectionTimeout);
        factory.setKeepAliveInterval(mqttKeepAliveInterval);
        factory.setPersistence(new MqttDefaultFilePersistence(mqttPersistenceFileDirectory));
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel", autoStartup = "true")
    public MessageHandler mqttOutbound() {
        String clientId = mqttClientId;
        // Fall back to a random client id when none is configured
        if (!StringUtils.hasText(clientId)) {
            clientId = UUID.randomUUID().toString();
        }
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttTopic);
        // Valid MQTT QoS levels are 0, 1 and 2
        if (mqttQos >= 0 && mqttQos <= 2) {
            messageHandler.setDefaultQos(mqttQos);
        }
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        // No manual subscribe: the framework subscribes the @ServiceActivator handler
        DirectChannel dc = new DirectChannel();
        return dc;
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MqttMsgproducer {
        void sendToMqtt(String data);
    }
}