Spring Integration JMS 创建 ActiveMQ 队列而不是主题
Posted
技术标签:
【中文标题】Spring Integration JMS 创建 ActiveMQ 队列而不是主题【英文标题】:Spring Integration JMS creating ActiveMQ queue instead of topic 【发布时间】:2018-08-03 11:39:12 【问题描述】:我正在尝试使用 ActiveMQ 代理向两个侦听自动主题的消费者传递消息,并使用 Spring 集成工具。
这是我的配置 bean(发布者和订阅者共有):
@Value("$spring.activemq.broker-url")
String brokerUrl;
@Value("$spring.activemq.user")
String userName;
@Value("$spring.activemq.password")
String password;
@Bean
public ConnectionFactory connectionFactory()
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(brokerUrl);
connectionFactory.setUserName(userName);
connectionFactory.setPassword(password);
return connectionFactory;
@Bean
public JmsListenerContainerFactory<?> jsaFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer)
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(true); //!!
configurer.configure(factory, connectionFactory);
return factory;
@Bean
public JmsTemplate jmsTemplate()
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory());
template.setPubSubDomain(true); //!!
return template;
以下是供消费者使用的 bean:
@Bean(name = "jmsInputChannel")
public MessageChannel jmsInputChannel()
return new PublishSubscribeChannel();
@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow()
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination("myTopic"))
.channel("jmsInputChannel").get();
//Consumes the message.
@ServiceActivator(inputChannel="jmsInputChannel")
public void receive(String msg)
System.out.println("Received Message: " + msg);
这些是生产者的 bean:
@Bean(name = "jmsOutputChannel")
public MessageChannel jmsOutputChannel()
return new PublishSubscribeChannel();
@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow()
return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(connectionFactory())
.destination("myTopic")
).get();
private static int counter = 1;
@Scheduled(initialDelay=5000, fixedDelay=2000)
public void send()
String s = "Message number " + counter;
counter++;
jmsOutputChannel().send(MessageBuilder.withPayload(s).build());
我没有使用嵌入式 ActiveMQ 代理。我在自己的(docker)容器中使用了一个代理、一个生产者和两个消费者。
我的问题是,虽然我在 JmsListenerContainerFactory
和 JmsTemplate
上都调用了 setPubSubDomain(true)
,但我的“主题”表现为队列:一个消费者打印所有偶数编号的消息,而另一个打印所有奇数的。
事实上,通过访问ActiveMQ Web界面,我看到我的“主题”(即在/topics.jsp页面下)被命名为ActiveMQ.Advisory.Consumer.Queue.myTopic
和ActiveMQ.Advisory.Producer.Queue.myTopic
,并且“myTopic”确实出现在队列页面中(即 /queues.jsp)。
节点按以下顺序启动:
AMQ 代理 消费者1 消费者2 制片人创建的第一个“主题”是ActiveMQ.Advisory.Consumer.Queue.myTopic
,而生产者主题显然仅在生产者启动后出现。
我不是 ActiveMQ 方面的专家,所以我的生产者/消费者“主题”被命名为“.Queue”这一事实可能只是一种误导。但是,我确实得到了official ActiveMQ documentation 中描述的队列语义,而不是主题。
我也已经看过this question,但是我使用的所有频道都已经属于 PublishSubscribeChannel 类型。
我需要实现的是将所有消息传递给我的所有(可能 > 2 个)消费者。
更新:我忘了说,我的 application.properties
文件已经包含 spring.jms.pub-sub-domain=true
以及其他设置。
另外,我使用的 Spring Integration 版本是 4.3.12.RELEASE。
问题是,我仍然得到 RR 负载平衡语义,而不是发布-订阅语义。
至于我在@Hassen Bennour 提供的link 中看到的内容,我希望在所有主题列表中获得ActiveMQ.Advisory.Producer.Topic.myTopic
和ActiveMQ.Advisory.Consumer.Topic.myTopic
行。不知何故,我认为我没有很好地使用 Spring Integration 库,因此当我想设置一个主题时,我正在设置一个队列。
更新 2:很抱歉造成混乱。 jmsOutputChannel2
其实就是jmsOutputChannel
这里,我已经编辑了主要部分。我在我的代码中使用了第二个“主题”作为检查,生产者可以自己发送消息并接收回复。 “主题”名称也不同,所以......它完全在一个单独的流程中。
通过以这种方式更改接收器流,我确实取得了一点进展:
@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow()
//return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination("myTopic"))
//.channel("jmsInputChannel").get();
return IntegrationFlows.from(Jms.publishSubscribeChannel(connectionFactory()).destination("myTopic")) //Jms.publishSubscribeChannel() rather than Jms.messageDrivenChannelAdapter()
.channel("jmsInputChannel").get();
这会在代理上生成Consumer.Topic.myTopic
类型的咨询主题而不是Consumer.Queue.myTopic
,并且实际上是一个名为myTopic
的主题(正如我从主题选项卡中看到的那样)。但是,一旦生产者启动,就会创建一个Producer.Queue
咨询主题,并且消息会在那里发送而不被传递。
输入流中适配器的选择似乎决定了创建什么样的咨询消费者主题(从Jms.messageDrivenChannelAdapter()
切换到Jms.publishSubscribeChannel()
时的主题与队列)。但是,我还没有找到与输出流类似的东西。
更新 3:感谢@Hassen Bennour,问题已解决。回顾:
我在制作人的Jms.outboundAdapter()
中连接了jmsTemplate()
@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow()
return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(jsaTemplate())
.destination("myTopic")
).get();
还有一个更复杂的消费者配置Jms.messageDrivenChannelAdapter()
:
@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow()
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
Jms.container(connectionFactory(),"myTopic")
.pubSubDomain(true).get()) )
.channel("jmsInputChannel").get();
虽然这个可能是最流畅、最灵活的方法了,有这样一个bean...
@Bean
public Topic topic()
return new ActiveMQTopic("myTopic");
作为适配器的目的地,而不仅仅是一个字符串。
再次感谢。
【问题讨论】:
jmsOutputChannel2().send() 发送消息到 jmsOutputChannel() ?? “2”是我这边报告的错误。事实上,我只是直接在消息通道jmsOutputChannel()
上发送字符串,它通过IntegrationFlow 链接到Jms.outboundAdapter()
。
【参考方案1】:
将 spring.jms.pub-sub-domain=true 添加到 application.properties
或
@Bean
public JmsListenerContainerFactory<?> jsaFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer)
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// the configurer will use PubSubDomain from application.properties if defined or false if not
//so setting it on the factory level need to be set after this
configurer.configure(factory, connectionFactory);
factory.setPubSubDomain(true);
return factory;
ActiveMQ.Advisory.Consumer.Queue.myTopic
是名为 myTopic 的队列的咨询主题
看看这里了解咨询
http://activemq.apache.org/advisory-message.html
更新:
如下所示更新您的定义
@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow()
return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(jmsTemplate())
.destination("myTopic")
).get();
@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow()
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
Jms.container(connectionFactory(),"myTopic")
.pubSubDomain(true).get()) )
.channel("jmsInputChannel").get();
或将 Destination 定义为主题,并将 destination("myTopic") 替换为 destination(topic())
@Bean
public Topic topic()
return new ActiveMQTopic("myTopic");
【讨论】:
属性已经设置好了,我忘了说。我还尝试在调用配置器后移动factory.setPubSubDomain(true)
行。我有完全相同相同的行为。以上是关于Spring Integration JMS 创建 ActiveMQ 队列而不是主题的主要内容,如果未能解决你的问题,请参考以下文章
Spring Integration DSL JMS 入站/出站网关
Spring Integration JMS Inbound Gateway 回复通道没有订阅者
在 java 1.7 中将 JMS 出站通道适配器转换为等效的 Spring Integration DSL