如何使用spring boot jms收听话题

Posted

技术标签:

【中文标题】如何使用spring boot jms收听话题【英文标题】:How to listen to topic using spring boot jms 【发布时间】:2017-03-01 20:54:29 【问题描述】:

我正在尝试使用以下 sn-p 收听主题。但是默认情况下它会监听队列。在这种情况下没有 xml 配置。我完全依赖注释。此外,我完全依赖 Spring boot 提供的 AutoConfiguration。我不确定如何在 JmsListener 中将目标类型设置为主题。请 Spring JMS 专家帮忙。

    @Component
    public class MyTopicListener 

        @JmsListener(destination = "$trans.alert.topic")
        public void receiveMessage(TransactionAlert alert) 
            logger.info("AlertSubscriberEmail :: Sending Email => <" + alert + ">");
        
    

【问题讨论】:

【参考方案1】:

标记为正确的答案几乎是正确的。它仍然无法工作,因为:

factory.setPubSubDomain(true) 

必须在之后:

configurer.configure(factory, connectionFactory);

否则,设置为 true 的 pubSubDomain 标志会在配置默认值时丢失,并且该工厂实例仍可用于队列而不是主题。

【讨论】:

谢谢你。这让我发疯了几个小时。【参考方案2】:

我刚刚从https://github.com/spring-guides/gs-messaging-jms/获取了完整的 Spring boot 示例

在这里,它是为发送和接收来自队列的消息而创建的。要将其更改为 topic ,您必须在 Factory 实例中设置 Pub-Sub 属性。

import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

import javax.jms.ConnectionFactory;

@SpringBootApplication
@EnableJms
public class JmsSampleApplication 

public void registerBeans(ConfigurableApplicationContext context )
    BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(JmsTemplate.class);
    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();

    builder.addPropertyValue("connectionFactory", cachingConnectionFactory);      // set property value
    DefaultListableBeanFactory factory = (DefaultListableBeanFactory) context.getAutowireCapableBeanFactory();
    factory.registerBeanDefinition("jmsTemplateName", builder.getBeanDefinition());


@Bean
public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory,
                                                DefaultJmsListenerContainerFactoryConfigurer configurer) 
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setPubSubDomain(true);
    // This provides all boot's default to this factory, including the message converter
    configurer.configure(factory, connectionFactory);
    // You could still override some of Boot's default if necessary.
    return factory;


@Bean
public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory,
                                                           DefaultJmsListenerContainerFactoryConfigurer configurer) 
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    //factory.setPubSubDomain(true);
    // This provides all boot's default to this factory, including the message converter
    configurer.configure(factory, connectionFactory);
    return factory;


@Bean // Serialize message content to json using TextMessage
public MessageConverter jacksonJmsMessageConverter() 
    MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    converter.setTargetType(MessageType.TEXT);
    converter.setTypeIdPropertyName("_type");
    return converter;

public static void main(String[] args) 
    ConfigurableApplicationContext context = SpringApplication.run(JmsSampleApplication.class, args);

    JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);

    // Send a message with a POJO - the template reuse the message converter
    System.out.println("Sending an email message.");
    jmsTemplate.convertAndSend("mailbox.topic", new Email("info@example.com", "Hello"));
    jmsTemplate.convertAndSend("mailbox.queue", new Email("info@example.com", "Hello"));

    

听者

package org.springboot.jms;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * Created by RGOVIND on 10/20/2016.
 */
@Component
public class HelloTopicListener 

    @JmsListener(destination = "mailbox.topic", containerFactory = "topicListenerFactory")
    public void receiveTopicMessage(Email email) 
        System.out.println("Received <" + email + ">");
    

    @JmsListener(destination = "mailbox.queue", containerFactory = "queueListenerFactory")
    public void receiveQueueMessage(Email email) 
        System.out.println("Received <" + email + ">");
    

完成后,您就可以订阅所选主题了。

当然,这有多种方法,您可以为不同的 jmsTemplates 制作一个 bean 的映射,每个都可以在您需要时根据队列或主题使用它们。模板和 bean 可以用你喜欢的方法来实例化,这在SO Question 中讨论过。希望对你有帮助

【讨论】:

当我尝试使用 JmsListenerContainerFactory bean 或在 application.properties 中设置此属性 spring.jms.pub-sub-domain=true 时,所有我的队列发送者和侦听器的其他代码开始表现得好像他们正在读写主题而不是队列。是否有一种解决方案可以使队列和主题的侦听器共存。 您始终可以选择拥有多个工厂。在代码的情况下,您可以选择同时拥有队列和主题。问题是当您选择接收时,您将不得不选择工厂。在这种情况下,您可以拥有 2 个工厂,从主题中获取以及从队列中获取。也编辑了我的答案 我尝试使用多个工厂,即使这样创建的主题只是表现得像一个队列,即在创建多个侦听器时消息在它们之间分布。相反,每个订阅者都应该得到所有的消息 上面的例子不起作用,这就是为什么你说它仍然像一个队列而不是一个主题。原因是因为“factory.setPubSubDomain(true);”必须在“configurer.configure(factory, connectionFactory);”之后完成。【参考方案3】:

在 Spring Boot 的 Application.properties 中,尝试设置以下属性:

spring.jms.pub-sub-domain=true

然后,将此属性用于您用于收听主题的容器工厂。

【讨论】:

实际上 Boot 自动配置的默认容器工厂会自动使用该标志。 我怀疑使用这个属性会让所有可用的听众都在听主题。我正在寻找一种方法,如何在同一项目本身中为队列和主题设置单独的侦听器。 您不能使用相同的 connectionFactory 来监听队列和主题。您的侦听器将派生自 connectionFactory,它将被配置为侦听点对点(队列)或发布-订阅(主题)。因此,有两个 connectionFactories,一个用于 Topic 和 Queue,然后根据需要使用它们。 pub-sub-domain 的默认设置设置为 false,这意味着默认情况下它将侦听 Queue。 设置该属性会覆盖所有工厂创建的 pubsubdomain 标志,因此所有侦听器都表现为队列或主题,具体取决于标志集的值

以上是关于如何使用spring boot jms收听话题的主要内容,如果未能解决你的问题,请参考以下文章

如何访问@JmsListener使用的Spring Boot中的活动JMS连接/会话

如何在 Spring Boot 中收听流式 API?

当两个应用程序都使用嵌入式activemq时,如何将Jms消息从一个spring-boot应用程序发送到另一个应用程序

如何使用 spring 集成 dsl 从 JMS 队列中解组 XML

带有 tibco jms 监听器的 Spring Boot

JMS消息监听执行失败,没有设置ErrorHandler