DefaultMessageListenerContainer 不缩放

Posted

技术标签:

【中文标题】DefaultMessageListenerContainer 不缩放【英文标题】:DefaultMessageListenerContainer not scaling 【发布时间】:2012-09-16 23:35:29 【问题描述】:

我有一个 DefaultMessageListenerContainer,它(在我看来)没有扩大。 Container 被定义为侦听一个队列,其中有 100 条消息。

我希望 Container 可以达到任何长度,消息将尽可能快地被消耗(通过观察 maxConcurrentConsumers 配置)。所以我假设有 7 个并发消费者。 (从容器启动时的 2 个并发消费者开始)一些日志信息:

activeConsumerCount: 5
concurrentConsumers: 2
scheduledConsumerCount: 5
idleConsumerLimit: 1
idleTaskExecLimit: 1
maxConcurrentConsumers: 7

我的 Spring-config(其中的一部分):

<bean id="abstractMessageListenerContainer" class="my.package.structure.LoggingListenerContainer" abstract="true">
    <property name="connectionFactory" ref="jmscfCee" />
    <property name="maxConcurrentConsumers" value="7"/>
    <property name="receiveTimeout" value="100000" />
    <property name="concurrentConsumers" value="2" />
</bean>

<bean class="my.package.structure.LoggingListenerContainer" parent="abstractMessageListenerContainer">
    <property name="destinationName" value="MY.QUEUE" />
    <property name="messageListener" ref="myMessageListener" />
</bean>

<bean id="myMessageListener" class="my.package.structure.ListenerClass"></bean>

我的日志容器

public class LoggingListenerContainer extends DefaultMessageListenerContainer

private static final Logger logger = Logger
        .getLogger(LoggingListenerContainer.class);
@Override
protected void doInvokeListener(MessageListener listener, Message message)
        throws JMSException 

    logger.info("activeConsumerCount: " + this.getActiveConsumerCount());
    logger.info("concurrentConsumers: " +  this.getConcurrentConsumers());
    logger.info("scheduledConsumerCount: " + this.getScheduledConsumerCount());
    logger.info("idleConsumerLimit: " + this.getIdleConsumerLimit());
    logger.info("idleTaskExecLimit: " + this.getIdleTaskExecutionLimit());
    logger.info("maxConcurrentConsumers: " + this.getMaxConcurrentConsumers());
    super.doInvokeListener(listener, message);

我的听众班:

public class ListenerClass implements MessageListener 


    public void onMessage(Message msg) 
           //Do some business function
    


有人可以纠正我的配置或给我一些关于我的配置的提示或解释容器的方法吗? (如果我误解了什么)

我正在使用 ActiveMQ 进行本地测试(使用 WebSphere MQ 进行生产)——如果它与可伸缩性主题相关。

编辑:

<bean id="jmscfCee" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL">
            <value>$jmscfCee.hostName</value>
        </property>
</bean>

<bean id="jmscfCeeCachingConnectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory ">
    <constructor-arg ref="jmscfCee" />
    <property name="sessionCacheSize" value="10" />
</bean>

【问题讨论】:

您使用的是哪个版本的 spring-jms?您是否尝试在容器中设置自定义 taskExecutor?默认情况下,DefaultMessageListenerContainer 似乎使用 SimpleAsyncTaskExecutor,它应该为每个任务生成新线程(从 3.1.2 开始),但我想知道旧版本是否会做一些不同的事情。另外,当队列仍然包含许多消息时,您是否正在检查此日志输出?如果idleTaskExecutionLimit 较低,则容器可能会在不再需要时终止它产生的线程 - 在此处尝试更高的值。 我的 spring-jms 版本是 3.1.1.RELEASE。试图设置一个自定义任务执行器 - 没有任何影响。如果队列上仍有大约 2k 或 3k 条消息,则行为也是相同的。将 idleTaskExecutionLimit 设置为 10 也没有改变任何东西。 【参考方案1】:

这取决于。几年前我在 ActiveMQ 上遇到过类似的问题,它的默认行为是针对大量(数千条)小消息而优化的。默认情况下,每个消费者将分批预取 1000 条消息,因此,如果您有少量消息,您可能会发现它们都最终在一个消费者的预取缓冲区中,而其他消费者则处于空闲状态。

您可以在连接 URI 或 Spring 配置中使用 prefetch policy 调整此行为(如果这是您构建连接工厂的方式)。

<amq:connectionFactory id="connectionFactory" brokerURL="vm://localhost">
  <property name="prefetchPolicy">
    <amq:prefetchPolicy all="1" />
  </property>
</amq:connectionFactory>

我当时使用的 ActiveMQ 版本不支持 0 的预取限制(即不预取,每次都去代理)但文档表明现在允许这样做。

【讨论】:

如果 OP 使用 Spring 的 DefaultMessageListenerContainer,那么它使用纯 JMS API 并且不使用任何特定于 ActiveMQ 的任何东西 - 对吗?我不认为这个建议会适用。 没错,但最终您需要在某处配置队列名称和会话工厂,这将是您可以添加特定于代理的扩展的地方。对于 ActiveMQ,您可以使用队列名称(如 MY.QUEUE?consumer.prefetchSize=1)设置每个目标的预取策略 prefetchSize 没有改变任何行为。将其添加到我的 brokerurl 并仔细检查了我的 connectionfactory 中的属性。也发布我的connectionfactory-configuration(在我的编辑中),也许这是瓶颈......

以上是关于DefaultMessageListenerContainer 不缩放的主要内容,如果未能解决你的问题,请参考以下文章