JMS队列中没有负载时消费者数量不减少

Posted

技术标签:

【中文标题】JMS队列中没有负载时消费者数量不减少【英文标题】:Number of consumer not shrinking when there is not load in JMS queue 【发布时间】:2020-10-15 20:29:31 【问题描述】:

我正在编写一个 JMS 消费者,使用 spring-jms 来消费来自 ActiveMQ Artemis 代理的消息。我正在调查 3 个属性的行为:concurrentConsumersmaxConcurrentConsumersidleConsumerLimit

预期行为是当队列中的负载增加时,消费者应该扩大到maxConcurrentConsumers,而当负载减少时,消费者应该缩小到idleConsumerLimit

但是spring-jms 和 ActiveMQ Artemis 不会发生这种情况。当队列中没有消息时,消费者计数不会下降,而是保留所有maxConcurrentConsumers

我正在使用 spring DMLC 容器

    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destinationName" value="TestingInQueue" />
        <property name="messageListener" ref="messageListener" />
        <property name="concurrentConsumers" value="1"/>
        <property name="maxConcurrentConsumers" value="20"/>
        <property name="idleConsumerLimit" value="5"/>
    </bean>

完整的上下文 XML:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
        <property name="environment">
            <props>
                <prop key="java.naming.factory.initial">org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory</prop>
                <prop key="java.naming.provider.url">tcp://localhost:61616</prop>
                <prop key="java.naming.security.principal">admin</prop>
                <prop key="java.naming.security.credentials">admin</prop>
            </props>
        </property>
    </bean>

    <bean id="connectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
        <property name="jndiTemplate" ref="jndiTemplate"/>
        <property name="jndiName" value="ConnectionFactory"/>
    </bean>

    <bean id="destinationQueue" class="org.apache.activemq.artemis.jms.client.ActiveMQQueue">
        <constructor-arg index="0" value="TestingInQueue" />
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="defaultDestination" ref="destinationQueue" />
    </bean>

    <bean id="messageListener" class="com.practise.SampleListener">
        <property name="jmsTemplate" ref="jmsTemplate" />
        <property name="queue" ref="destinationQueue" />
    </bean>

    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destinationName" value="TestingInQueue" />
        <property name="messageListener" ref="messageListener" />
        <property name="concurrentConsumers" value="1"/>
        <property name="maxConcurrentConsumers" value="20"/>
        <property name="idleConsumerLimit" value="5"/>
    </bean>
</beans>

【问题讨论】:

【参考方案1】:

注意:

maxConcurrentConsumers:负载下的最大消费者 concurrentConsumers :如果你闲置一段时间,你最终会到达这里。 (到达这里所需的时间取决于接下来的 3 个参数)

以下 3 项是为了避免消费者在 maxConcurrentConsumersconcurrentConsumers 之间无任何延迟地大幅上下缩放。

idleConsumerLimit maxMessagesPerTask idleTaskExecutionLimit

示例

如果您从重负载开始,您将拥有 20 个 (maxConcurrentConsumers) 消费者。然后,如果您使队列为空,它将缩小到 5 (idleConsumerLimit)。然后它将等待达到idleTaskExecutionLimit 尝试,然后缩小到 1 (concurrentConsumers)。 idleTaskExecutionLimit 是导致空提取的提取尝试次数,其中每次提取需要 receiveTimeout(默认 1000 毫秒)时间单位。

问题:

maxMessagesPerTask 的默认值为-1,如果maxMessagesPerTask 不大于零,idleConsumerLimit 将不生效

解决方案:

指定idleConsumerLimit 时,也要指定maxMessagesPerTask

在此处为 maxMessagesPerTask 指定 10 到 100 条消息以平衡相当长寿命和相当短寿命的任务

参考

https://access.redhat.com/solutions/1412643 DefaultMessageListenerContainer

【讨论】:

感谢您的回答。设置 maxMessagesPerTask 值有效。设置此值后,消费者计数下降到 concurrentConsumers 计数,而不是 idleConsumerLimit。这个配置中idleConsumerLimit的作用是什么? 即使maxMessagesPerTaskidleConsumerLimit 未生效,但消费者数量能够缩减至concurrentConsumers 数量。 我将 maxMessagesPerTask 设置为 10 进行测试,然后消费者计数降至 1,即 concurrentConsumer 计数。如果我用空队列启动应用程序,则创建 1 个消费者。我希望在缩减消费者数量时必须为 5,即 idleConsumerLimit 是的,它被忽略了 idleTaskExecutionLimit 属性在一段时间后消费者逐渐缩小到1 后使队列为空。我的问题是从idleTaskExecutionLimit idleConsumerLimit(5) 将等待多长时间?你能帮忙吗

以上是关于JMS队列中没有负载时消费者数量不减少的主要内容,如果未能解决你的问题,请参考以下文章

发送到JMS队列的消息将仅由单个消费者使用?

Camel 和 JMS 以正确的顺序从高级队列中消费消息

一文详细解析kafka重平衡机制

怒肝15天终于将Kafka的重平衡一举拿下

RocketMQ消费者消息队列负载均衡

后端 activemq队列模式下的多个消费者问题