添加动态数量的监听器(Spring JMS)

Posted

技术标签:

【中文标题】添加动态数量的监听器(Spring JMS)【英文标题】:Adding Dynamic Number of Listeners(Spring JMS) 【发布时间】:2016-03-07 21:41:21 【问题描述】:

我需要添加多个侦听器,如application.properties 文件中所述。如下图,

InTopics=Sample.QUT4,Sample.T05,Sample.T01,Sample.JT7

注意:这个数字可能更多也可能更少。

我正在考虑将它们放入一个数组中,

@Value("$InTopics")
private String[] inTopics;

但我不知道如何从数组中创建多个侦听器。

目前,对于我正在做的一个主题,

@Configuration
@EnableJms
public class JmsConfiguration 

@Value("$BrokerURL")
private String brokerURL;

@Value("$BrokerUserName")
private String brokerUserName;

@Value("$BrokerPassword")
private String brokerPassword;

@Bean
TopicConnectionFactory connectionFactory() throws JMSException 
    TopicConnectionFactory connectionFactory = new TopicConnectionFactory(brokerURL, brokerUserName, brokerPassword);
    return connectionFactory;


@Bean
JmsListenerContainerFactory<?> jmsContainerFactory(TopicConnectionFactory connectionFactory) throws JMSException 
    SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setPubSubDomain(Boolean.TRUE);
    return factory;
 


还有我的听众,

@JmsListener(destination = "$SingleTopicName", containerFactory = "jmsContainerFactory")
public void receiveMessage(Message msg) 
   //Do Some Stuff

有什么方法可以实现吗?

【问题讨论】:

【参考方案1】:

您不能使用带注释的 @JmsListeners 来做到这一点,但您可以通过编程方式注册每个侦听器 by extending JmsListenerConfigurer as described in the reference documentation。

编辑

由于您将属性作为数组注入...

@Value("$InTopics")
private String[] inTopics;

Spring 将拆分列表并根据列表中的队列数创建一个数组。

然后您可以遍历 JmsListenerConfigurer.configureJmsListeners() 中的数组并为数组中的每个元素创建一个端点 - 您无需提前知道数组有多大。

for (String inTopic : inTopics) 
    ...

【讨论】:

谢谢。是的,如果有固定数量的听众,我可以做到。但是当监听器大小的数量不固定时怎么办呢?可以是 5、4、8 等(取决于提到的数量,用, 分隔)。任何提示或技巧? :) 我编辑了答案来解释你不需要提前知道数组有多少元素。【参考方案2】:

这是动态定义监听器数量的自定义代码。

JmsConfiguration jmsConfiguration;

private List<String> queueList;

@Bean
public DefaultJmsListenerContainerFactory mqJmsListenerContainerFactory() throws JMSException 
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(jmsConfiguration.jmsConnectionFactory());
    factory.setDestinationResolver(new DynamicDestinationResolver());
    factory.setSessionTransacted(true);
    factory.setConcurrency("5");
    return factory;


@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) 

    queueList.forEach(queue -> 
        SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
        endpoint.setId(queue);
        endpoint.setDestination(queue);
        try 
            endpoint.setMessageListener(message -> 
                try 
                    logger.info("Receieved ID:  Destination ", message.getJMSMessageID(), message.getJMSDestination());
                
                catch (JMSException e) 
                    logger.info("Exception while reading message - " + e);
                
            );
            registrar.setContainerFactory(mqJmsListenerContainerFactory());
        
        catch (JMSException e) 
            logger.info("Exception - " + e);
        
        registrar.registerEndpoint(endpoint);
    );


【讨论】:

【参考方案3】:

我不知道这是存在的,我不得不手动写下所有这些代码。因此,另一种方法是在您的 bean 中实现 BeanFactoryPostProcessor 并手动添加 jms 侦听器的所有必需组件。

    jndiTemplate jndiQueueConnectionFactory(取决于步骤 1 中的 jndiTemplate) queueConnectionFactory(取决于步骤 2 中的 jndiQueueConnectionFactory) jndiDestinationResolver(使用来自 stem 1 的 jndiTemplate) messageListenerContiner(使用以上所有创建的项目)

所以,如您所见,我不仅将 jms 监听器相乘,而且还动态地生成多个监听器容器。 Ofc,这是我的要求。并且可能会因要求而异。

要记住的一件事是,在您操作BeanFactoryPostProcessor 时没有资源(如加载的属性等)。您必须手动加载属性。我通过 afterPropertiesSet 方法来自 InitializingBean

【讨论】:

以上是关于添加动态数量的监听器(Spring JMS)的主要内容,如果未能解决你的问题,请参考以下文章

Spring整合JMS——消息监听器

使用 Spring 启动和停止 JMS 侦听器

带有 tibco jms 监听器的 Spring Boot

JMS消息监听执行失败,没有设置ErrorHandler

Spring JMS 监听器中的事务管理

Spring + JMS + ActiveMQ实现简单的消息队列(监听器异步实现)