ActiveMQ:使用队列(具有并发消费者)和主题的正确配置

Posted

技术标签:

【中文标题】ActiveMQ:使用队列(具有并发消费者)和主题的正确配置【英文标题】:ActiveMQ: Correct config with both Queues (with concurrent consumers) and Topics 【发布时间】:2012-04-10 12:25:16 【问题描述】:

我们有一个 ActiveMQ / Camel 配置,以前一直使用专门的消息队列,具有并发消费者。

但是,我们现在引入消息主题,并发现 - 由于并发消费者 - 在主题中接收到的消息被多次消费。

这个场景的正确配置是什么?

即,我们希望队列上接收的消息有多个并发消费者,但主题上接收的消息只定义一个消费者。

这是当前配置:

<amq:connectionFactory id="amqConnectionFactory"
    useAsyncSend="true" brokerURL="$$ptl.Servername.jms.cluster.uri"
    userName="$jms.username" password="$jms.password" sendTimeout="1000"
    optimizeAcknowledge="true" disableTimeStampsByDefault="true">
</amq:connectionFactory>

<bean id="cachingConnectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
    <property name="cacheConsumers" value="true"></property>
    <property name="cacheProducers" value="true"></property>
    <property name="reconnectOnException" value="true"></property>
    <property name="sessionCacheSize" value="$jms.sessioncachesize"></property>
</bean>

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="transacted" value="false" />
    <property name="concurrentConsumers" value="$jms.concurrentConsumer" />
    <property name="maxConcurrentConsumers" value="$jms.max.concurrentConsumer" />
    <property name="preserveMessageQos" value="true" />
    <property name="timeToLive" value="$jms.timeToLive" />
</bean>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig" />
</bean>

【问题讨论】:

【参考方案1】:

您可以将任何主题消费者的 concurrentConsumers/maxConcurrentConsumers 显式设置为“1”。

from("activemq:topic:myTopic?concurrentConsumers=1&maxConcurrentConsumers=1")...  

或者,将 JmsConfiguration concurrent/maxConcurrentConsumers 属性设置为“1”,然后根据需要显式启用队列的并发消费。

from("activemq:queue:myQueue?maxConcurrentConsumers=5")...  

另外,您可以使用Virtual Topics 来执行主题消息的并发消费,而不会出现重复(强烈推荐超过传统主题)

【讨论】:

【参考方案2】:

我最终使用的解决方案是创建一个单独的 jmsConfig/activeMQ 配置块。

总体配置如下:

 <!-- This is appropriate for consuming Queues, but not topics.  For topics, use
jmsTopicConfig / activemqTopics -->
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="transacted" value="false" />
    <property name="concurrentConsumers" value="$jms.concurrentConsumer" />
    <property name="maxConcurrentConsumers" value="$jms.max.concurrentConsumer" />
    <property name="preserveMessageQos" value="true" />
    <property name="timeToLive" value="$jms.timeToLive" />
</bean>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig" />
</bean>

<!-- This config limits to a single concurrent consumer.  This config is appropriate for
consuming Topics, not Queues. -->
<bean id="jmsTopicConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="transacted" value="false" />
    <property name="concurrentConsumers" value="1" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="preserveMessageQos" value="true" />
    <property name="timeToLive" value="$jms.timeToLive" />
</bean>

<bean id="activemqTopics" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsTopicConfig" />
</bean>

然后,在骆驼管道中,从activemqTopics bean 中消费主题,如下所示:

<camel:route id="myTopicResponder">
    <camel:from uri="activemqTopics:topic:stockQuotes?concurrentConsumers=1" />
    <camel:to uri="bean:stockQuoteResponder?method=saveStockQuote"/>
</camel:route>

【讨论】:

以上是关于ActiveMQ:使用队列(具有并发消费者)和主题的正确配置的主要内容,如果未能解决你的问题,请参考以下文章

ActiveMQ使用

ActiveMQ之队列和主题发布订阅实例

ActiveMQ队列主题模式区别

Spring Integration JMS 创建 ActiveMQ 队列而不是主题

ActiveMQ的两种消息模式,主题队列

2020-07-28 activeMq 两种模式的测试