Spring Integration JMS 创建 ActiveMQ 队列而不是主题

Posted

技术标签:

【中文标题】Spring Integration JMS 创建 ActiveMQ 队列而不是主题【英文标题】:Spring Integration JMS creating ActiveMQ queue instead of topic 【发布时间】:2018-08-03 11:39:12 【问题描述】:

我正在尝试使用 ActiveMQ 代理向两个侦听自动主题的消费者传递消息,并使用 Spring 集成工具。

这是我的配置 bean(发布者和订阅者共有):

@Value("$spring.activemq.broker-url")
String brokerUrl;

@Value("$spring.activemq.user")
String userName;

@Value("$spring.activemq.password")
String password;

@Bean
public ConnectionFactory connectionFactory() 
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    connectionFactory.setBrokerURL(brokerUrl);
    connectionFactory.setUserName(userName);
    connectionFactory.setPassword(password);
    return connectionFactory;


@Bean
public JmsListenerContainerFactory<?> jsaFactory(ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) 
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setPubSubDomain(true); //!!
    configurer.configure(factory, connectionFactory);
    return factory;


@Bean
public JmsTemplate jmsTemplate() 
    JmsTemplate template = new JmsTemplate();
    template.setConnectionFactory(connectionFactory());
    template.setPubSubDomain(true); //!!
    return template;

以下是供消费者使用的 bean:

@Bean(name = "jmsInputChannel")
public MessageChannel jmsInputChannel() 
    return new PublishSubscribeChannel();


@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow()         
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination("myTopic"))
            .channel("jmsInputChannel").get();


//Consumes the message.
@ServiceActivator(inputChannel="jmsInputChannel")
public void receive(String msg)
    System.out.println("Received Message: " + msg);

这些是生产者的 bean:

@Bean(name = "jmsOutputChannel")
public MessageChannel jmsOutputChannel() 
    return new PublishSubscribeChannel();


@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow() 
    return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(connectionFactory())
            .destination("myTopic")
    ).get();




private static int counter = 1;

@Scheduled(initialDelay=5000, fixedDelay=2000)
public void send() 
     String s = "Message number " + counter;
     counter++;
     jmsOutputChannel().send(MessageBuilder.withPayload(s).build());

我没有使用嵌入式 ActiveMQ 代理。我在自己的(docker)容器中使用了一个代理、一个生产者和两个消费者。

我的问题是,虽然我在 JmsListenerContainerFactoryJmsTemplate 上都调用了 setPubSubDomain(true),但我的“主题”表现为队列:一个消费者打印所有偶数编号的消息,而另一个打印所有奇数的。

事实上,通过访问ActiveMQ Web界面,我看到我的“主题”(即在/topics.jsp页面下)被命名为ActiveMQ.Advisory.Consumer.Queue.myTopicActiveMQ.Advisory.Producer.Queue.myTopic,并且“myTopic”确实出现在队列页面中(即 /queues.jsp)。

节点按以下顺序启动:

AMQ 代理 消费者1 消费者2 制片人

创建的第一个“主题”是ActiveMQ.Advisory.Consumer.Queue.myTopic,而生产者主题显然仅在生产者启动后出现。

我不是 ActiveMQ 方面的专家,所以我的生产者/消费者“主题”被命名为“.Queue”这一事实可能只是一种误导。但是,我确实得到了official ActiveMQ documentation 中描述的队列语义,而不是主题。

我也已经看过this question,但是我使用的所有频道都已经属于 PublishSubscribeChannel 类型。

我需要实现的是将所有消息传递给我的所有(可能 > 2 个)消费者。

更新:我忘了说,我的 application.properties 文件已经包含 spring.jms.pub-sub-domain=true 以及其他设置。

另外,我使用的 Spring Integration 版本是 4.3.12.RELEASE。

问题是,我仍然得到 RR 负载平衡语义,而不是发布-订阅语义。 至于我在@Hassen Bennour 提供的link 中看到的内容,我希望在所有主题列表中获得ActiveMQ.Advisory.Producer.Topic.myTopicActiveMQ.Advisory.Consumer.Topic.myTopic 行。不知何故,我认为我没有很好地使用 Spring Integration 库,因此当我想设置一个主题时,我正在设置一个队列。

更新 2:很抱歉造成混乱。 jmsOutputChannel2其实就是jmsOutputChannel这里,我已经编辑了主要部分。我在我的代码中使用了第二个“主题”作为检查,生产者可以自己发送消息并接收回复。 “主题”名称也不同,所以......它完全在一个单独的流程中。

通过以这种方式更改接收器流,我确实取得了一点进展:

@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow()         
    //return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination("myTopic"))
            //.channel("jmsInputChannel").get();
return IntegrationFlows.from(Jms.publishSubscribeChannel(connectionFactory()).destination("myTopic")) //Jms.publishSubscribeChannel() rather than Jms.messageDrivenChannelAdapter()
            .channel("jmsInputChannel").get();

这会在代理上生成Consumer.Topic.myTopic 类型的咨询主题而不是Consumer.Queue.myTopic,并且实际上是一个名为myTopic 的主题(正如我从主题选项卡中看到的那样)。但是,一旦生产者启动,就会创建一个Producer.Queue 咨询主题,并且消息会在那里发送而不被传递。

输入流中适配器的选择似乎决定了创建什么样的咨询消费者主题(从Jms.messageDrivenChannelAdapter() 切换到Jms.publishSubscribeChannel() 时的主题与队列)。但是,我还没有找到与输出流类似的东西。

更新 3:感谢@Hassen Bennour,问题已解决。回顾:

我在制作人的Jms.outboundAdapter() 中连接了jmsTemplate()

@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow() 
    return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(jsaTemplate())
            .destination("myTopic")
    ).get();

还有一个更复杂的消费者配置Jms.messageDrivenChannelAdapter()

@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow()         
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
        Jms.container(connectionFactory(),"myTopic")
        .pubSubDomain(true).get()) )
        .channel("jmsInputChannel").get();

虽然这个可能是最流畅、最灵活的方法了,有这样一个bean...

@Bean
public Topic topic() 
    return new ActiveMQTopic("myTopic");

作为适配器的目的地,而不仅仅是一个字符串。

再次感谢。

【问题讨论】:

jmsOutputChannel2().send() 发送消息到 jmsOutputChannel() ?? “2”是我这边报告的错误。事实上,我只是直接在消息通道jmsOutputChannel() 上发送字符串,它通过IntegrationFlow 链接到Jms.outboundAdapter() 【参考方案1】:

将 spring.jms.pub-sub-domain=true 添加到 application.properties

@Bean
public JmsListenerContainerFactory<?> jsaFactory(ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) 
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    // the configurer will use PubSubDomain from application.properties if defined or false if not
    //so setting it on the factory level need to be set after this
    configurer.configure(factory, connectionFactory);
    factory.setPubSubDomain(true);
    return factory;

ActiveMQ.Advisory.Consumer.Queue.myTopic 是名为 myTopic 的队列的咨询主题 看看这里了解咨询 http://activemq.apache.org/advisory-message.html

更新:

如下所示更新您的定义

@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow() 
    return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(jmsTemplate())
            .destination("myTopic")
    ).get();


@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow()         
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
            Jms.container(connectionFactory(),"myTopic")
            .pubSubDomain(true).get()) )
            .channel("jmsInputChannel").get();

或将 Destination 定义为主题,并将 destination("myTopic") 替换为 destination(topic())

@Bean
public Topic topic() 
    return new ActiveMQTopic("myTopic");

【讨论】:

属性已经设置好了,我忘了说。我还尝试在调用配置器后移动factory.setPubSubDomain(true) 行。我有完全相同相同的行为。

以上是关于Spring Integration JMS 创建 ActiveMQ 队列而不是主题的主要内容,如果未能解决你的问题,请参考以下文章

Spring Integration DSL JMS 入站/出站网关

Spring Integration JMS Inbound Gateway 回复通道没有订阅者

在 java 1.7 中将 JMS 出站通道适配器转换为等效的 Spring Integration DSL

Spring Integration DSL 中的路由

Spring Integration的DefaultMessageListenerContainer和JPA中的事务

Spring-integration / ActiveMQ 在单个线程中订阅多个目的地