spring boot activemq消费者连接池
Posted
技术标签:
【中文标题】spring boot activemq消费者连接池【英文标题】:spring boot activemq consumer connection pool 【发布时间】:2018-05-22 22:38:37 【问题描述】:Spring Boot ActiveMQ消费者连接池需要配置吗?我在 Spring Boot 应用程序中只有一个消费者(作为微服务),生产者在另一个应用程序中。我对以下内容有点困惑:(摘自http://activemq.apache.org/spring-support.html) 注意:虽然 PooledConnectionFactory 确实允许创建活动消费者的集合,但它不能“汇集”消费者。池化对于连接、会话和生产者来说是有意义的,它们可能是很少使用的资源,创建起来很昂贵,并且可以以最低的成本保持空闲状态。另一方面,消费者通常只是在启动时创建并离开,处理传入的消息。当消费者完成后,最好将其关闭而不是让它闲置并将其返回到池中以供以后重用:这是因为,即使消费者空闲,ActiveMQ 也会继续将消息传递到消费者的预取缓冲区,在消费者再次活跃之前,它们将被阻止。 在同一页面上,我可以看到:您可以使用 activemq-pool org.apache.activemq.pool.PooledConnectionFactory 为您的消费者集合有效地汇集连接和会话,或者您可以使用Spring JMS org.springframework.jms.connection.CachingConnectionFactory 实现同样的效果 我尝试了 CachingConnectionFactory(它可以采用 ActiveMQConnectionFactory),它只有几个设置器来保存 cacheConsumers(布尔)、cacheProducers(布尔),与池连接无关。我知道 1 个连接可以为您提供多个会话,然后每个会话您有多个消费者/生产者。但我的问题是针对消费者,我们如何合并,因为上面的声明是让它保持默认。所以我只通过一种方法做到了这一点:@Bean public JmsListenerContainerFactory myFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer 配置器)
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// This provides all boot's default to this factory, including the message converter
factory.setConcurrency("3-10");
configurer.configure(factory, connectionFactory);
// You could still override some of Boot's default if necessary.
return factory;
</em><br>
Dynamic scaling这个链接也暗示了这一点,但我找不到具体的解决方案。 有没有人遇到过这种情况,请给点建议。感谢您阅读这篇文章,任何帮助都非常感谢。 生产的其他详细信息:
此消费者每秒将收到约 500 条消息。 使用 Spring Boot 1.5.8.RELEASE 版本, ActiveMQ 5.5 是我的 JMS【问题讨论】:
您找到解决方案了吗?我也对上述陈述感到困惑 【参考方案1】:在activemq中有一个名为org.apache.activemq.jms.pool的包,它提供了PooledConsumer。下面是代码。请检查它是否适合您。我知道这不是 spring 方式,但您可以轻松地自定义您的 poll 方法。
PooledConnectionFactory pooledConFactory = null;
PooledConnection pooledConnection = null;
PooledSession pooledSession = null;
PooledMessageConsumer pooledConsumer = null;
Message message = null;
try
// Get the connection object from PooledConnectionFactory
pooledConFactory = ( PooledConnectionFactory ) this.jmsTemplateMap.getConnectionFactory();
pooledConnection = ( PooledConnection ) pooledConFactory.createConnection();
pooledConnection.start();
// Create the PooledSession from pooledConnection object
pooledSession = ( PooledSession ) pooledConnection.createSession( false, 1 );
// Create the PooledMessageConsumer from session with given ack mode and destination
pooledConsumer = ( PooledMessageConsumer ) pooledSession.
createConsumer( this.jmsTemplateMap.getDefaultDestination(), <messageFilter if any>);
while ( true )
message = pooledConsumer.receiveNoWait();
if ( message != null)
break;
catch ( JMSException ex )
LOGGER.error("JMS Exception occured, closing the session", ex );
return message;
【讨论】:
以上是关于spring boot activemq消费者连接池的主要内容,如果未能解决你的问题,请参考以下文章