ActiveMQ:使用队列(具有并发消费者)和主题的正确配置
Posted
技术标签:
【中文标题】ActiveMQ:使用队列(具有并发消费者)和主题的正确配置【英文标题】:ActiveMQ: Correct config with both Queues (with concurrent consumers) and Topics 【发布时间】:2012-04-10 12:25:16 【问题描述】:我们有一个 ActiveMQ / Camel 配置,以前一直使用专门的消息队列,具有并发消费者。
但是,我们现在引入消息主题,并发现 - 由于并发消费者 - 在主题中接收到的消息被多次消费。
这个场景的正确配置是什么?
即,我们希望队列上接收的消息有多个并发消费者,但主题上接收的消息只定义一个消费者。
这是当前配置:
<amq:connectionFactory id="amqConnectionFactory"
useAsyncSend="true" brokerURL="$$ptl.Servername.jms.cluster.uri"
userName="$jms.username" password="$jms.password" sendTimeout="1000"
optimizeAcknowledge="true" disableTimeStampsByDefault="true">
</amq:connectionFactory>
<bean id="cachingConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<property name="cacheConsumers" value="true"></property>
<property name="cacheProducers" value="true"></property>
<property name="reconnectOnException" value="true"></property>
<property name="sessionCacheSize" value="$jms.sessioncachesize"></property>
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="cachingConnectionFactory" />
<property name="transacted" value="false" />
<property name="concurrentConsumers" value="$jms.concurrentConsumer" />
<property name="maxConcurrentConsumers" value="$jms.max.concurrentConsumer" />
<property name="preserveMessageQos" value="true" />
<property name="timeToLive" value="$jms.timeToLive" />
</bean>
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="configuration" ref="jmsConfig" />
</bean>
【问题讨论】:
【参考方案1】:您可以将任何主题消费者的 concurrentConsumers/maxConcurrentConsumers 显式设置为“1”。
from("activemq:topic:myTopic?concurrentConsumers=1&maxConcurrentConsumers=1")...
或者,将 JmsConfiguration concurrent/maxConcurrentConsumers 属性设置为“1”,然后根据需要显式启用队列的并发消费。
from("activemq:queue:myQueue?maxConcurrentConsumers=5")...
另外,您可以使用Virtual Topics 来执行主题消息的并发消费,而不会出现重复(强烈推荐超过传统主题)
【讨论】:
【参考方案2】:我最终使用的解决方案是创建一个单独的 jmsConfig/activeMQ 配置块。
总体配置如下:
<!-- This is appropriate for consuming Queues, but not topics. For topics, use
jmsTopicConfig / activemqTopics -->
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="cachingConnectionFactory" />
<property name="transacted" value="false" />
<property name="concurrentConsumers" value="$jms.concurrentConsumer" />
<property name="maxConcurrentConsumers" value="$jms.max.concurrentConsumer" />
<property name="preserveMessageQos" value="true" />
<property name="timeToLive" value="$jms.timeToLive" />
</bean>
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="configuration" ref="jmsConfig" />
</bean>
<!-- This config limits to a single concurrent consumer. This config is appropriate for
consuming Topics, not Queues. -->
<bean id="jmsTopicConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="cachingConnectionFactory" />
<property name="transacted" value="false" />
<property name="concurrentConsumers" value="1" />
<property name="maxConcurrentConsumers" value="1" />
<property name="preserveMessageQos" value="true" />
<property name="timeToLive" value="$jms.timeToLive" />
</bean>
<bean id="activemqTopics" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="configuration" ref="jmsTopicConfig" />
</bean>
然后,在骆驼管道中,从activemqTopics
bean 中消费主题,如下所示:
<camel:route id="myTopicResponder">
<camel:from uri="activemqTopics:topic:stockQuotes?concurrentConsumers=1" />
<camel:to uri="bean:stockQuoteResponder?method=saveStockQuote"/>
</camel:route>
【讨论】:
以上是关于ActiveMQ:使用队列(具有并发消费者)和主题的正确配置的主要内容,如果未能解决你的问题,请参考以下文章