Activemq - 超过允许的最大客户端连接数

Posted

技术标签:

【中文标题】Activemq - 超过允许的最大客户端连接数【英文标题】:Activemq - Exceeded the maximum number of allowed client connections 【发布时间】:2014-09-23 05:15:30 【问题描述】:

我的 acitvemq 服务器总是在下面打印错误:

2014-07-12 16:14:27,820 | ERROR | Could not accept connection : 
org.apache.activemq.transport.tcp.ExceededMaximumConnectionsException:
Exceeded the maximum number of allowed client connections.
See the 'maximumConnections' property on the TCP transport configuration URI 
in the ActiveMQ configuration file (e.g., activemq.xml) 
| org.apache.activemq.broker.TransportConnector 
| ActiveMQ Transport Server Thread Handler:
 tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600

当我重新启动服务器时,它会好的。但是几天后错误再次出现。 我不明白为什么连接数总是增加到 1000。

我的服务器配置:

<!-- activeMQ -->
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="$jms.brokerURL"></property>
</bean>

<!-- Spring Caching  -->
<bean id="cachingConnectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="jmsConnectionFactory" />
    <property name="sessionCacheSize" value="10" />
</bean>

<!-- Spring JMS Template -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="explicitQosEnabled" value="true" />
    <property name="priority" value="4" />
</bean>

<bean id="scoreQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="SCORE" />
</bean>

<bean id="scoreMessage" class="com.tt.score.mq.server.ScoreMessage"></bean>

<bean id="scoreListener"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="jmsConnectionFactory"></property>
    <property name="destination" ref="scoreQueue"></property>
    <property name="messageListener" ref="scoreMessage"></property>
    <property name="concurrentConsumers" value="10" />
    <property name="maxConcurrentConsumers" value="100" />
    <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE" />
</bean>

我的客户端配置 xml:

<!-- Spring Caching -->
<bean id="cachingConnectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="jmsConnectionFactory" />
    <property name="sessionCacheSize" value="10" />
</bean>

<!-- Spring JMS Template -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="explicitQosEnabled" value="true" />
    <property name="priority" value="4" />
</bean>

<bean id="messageProducer" class="com.tt.score.mq.client.MessageProducer">
    <property name="jmsTemplate" ref="jmsTemplate" />
    <property name="scoreQueue" ref="scoreQueue" />
</bean>

<bean id="scoreQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="SCORE" />
</bean>

其他信息:

acitvemq 服务器:5.8.0 客户端 acitvemq : 5.4.2 春天:3.0.7 spring-jms : 3.0.7

我们使用 transactionManager,因此 DefaultMessageListenerContainer 的缓存级别将设置为 none。

---update add dao config-----------------------------------

<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName"><value>$jdbc.driverClass</value></property>
        <property name="username"><value>$jdbc.user</value></property>
        <property name="url"><value>$jdbc.jdbcUrl</value></property>
        <property name="password">
            <bean class="com.tongbanjie.commons.util.EncryptDBPasswordFactory">
                <property name="password" value="$jdbc.password" />
            </bean>
        </property>
        <property name="maxActive"><value>$jdbc.maxActive</value></property>
        <property name="initialSize"><value>$jdbc.initialSize</value></property>
        <property name="maxWait"><value>60000</value></property>
        <property name="maxIdle"><value>$jdbc.maxIdle</value></property>
        <property name="minIdle"><value>5</value></property>
        <property name="removeAbandoned"><value>true</value></property>
        <property name="removeAbandonedTimeout"><value>180</value></property>
        <property name="timeBetweenEvictionRunsMillis"><value>60000</value></property>
        <property name="minEvictableIdleTimeMillis"><value>1800000</value></property>
        <property name="defaultAutoCommit" value="false" />
        <property name="connectionProperties">
            <value>bigStringTryClob=true;clientEncoding=UTF-8;defaultRowPrefetch=50;serverEncoding=ISO-8859-1</value>
        </property>
    </bean>

    <bean class="org.springframework.aop.framework.autoproxy.DefaultAdvisorAutoProxyCreator" />

    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <tx:annotation-driven transaction-manager="transactionManager" />


    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource">
            <ref bean="dataSource"/>
        </property>
    </bean>

    <!-- myBatis -->
    <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="configLocation" value="classpath:META-INF/mybatis/score-configuration.xml" />
        <property name="mapperLocations" value="classpath*:META-INF/mybatis/mapper/*.xml" />
        <property name="dataSource" ref="dataSource" />
    </bean>

    <bean id="commonSqlSessionDao" abstract="true">
        <property name="sqlSessionFactory">
            <ref bean="sqlSessionFactory" />
        </property>
    </bean>

-----贴出我们现在如何使用模板的代码

封装在一个类中的 jmsTemplate

public class MessageProducer 

    private JmsTemplate   jmsTemplate;
    private ActiveMQQueue scoreQueue;

    public void sendScoreQueue(Map<String, String> userMap) 
        sendMessage(this.scoreQueue, userMap);
    

    private void sendMessage(Destination destination, final Map<String, String> map) 
        this.jmsTemplate.send(destination, new MessageCreator() 

            public Message createMessage(Session session) throws JMSException 
                MapMessage message = session.createMapMessage();
                for (String key : map.keySet()) 
                    message.setStringProperty(key, (String) map.get(key));
                
                return message;
            
        );
    
}

我们使用一个thead 来调用MessageProducer 类的sendScoreQueue 方法。 如下:

 //the code is old and ugly.that is the original position we call the mq.
 ThreadUtils.execute(new Thread(new SendMsgThread(dycc, ScoreMQSendType.SEND_TYPE_SCORE)));

///

public class ThreadUtils 

    protected static ThreadPoolExecutor executor = null;
    public static Properties            Props    = null;

    public static void execute(Thread thread) 
        executor.execute(thread);
    

  static 
        if (executor == null) 
            Integer corePoolSize = 5;
            Integer maximumPoolSize = 10;
            Integer keepAliveTime = 3000;
            executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES,
                                              new LinkedBlockingQueue());
    


public class SendMsgThread implements Runnable 

    private Log                    log      = LogFactory.getLog(SendMsgThread.class);

    private Map<String, String>    map;
    private String                 type;

    private static MessageProducer producer = null;

    public SendMsgThread(Map<String, String> map, String type)
        this.type = type;
        this.map = map;
    

    public void run() 
        try 
            if(type.equals(ScoreMQSendType.SEND_TYPE_SCORE) || type.equals(ScoreMQSendType.SEND_TYPE_REGISTER)) 
                producer.sendMessage(map);
            
         catch (Exception e) 
            this.log.error("sendMsgThread sendScoreQueue error.", e);
        
    

 static 
        if (producer == null) producer = 
(MessageProducer )SpringContextHolder.getBean(MessageProducer .class);
    

【问题讨论】:

您这样说:“我们使用 transactionManager,因此 DefaultMessageListenerContainer 的缓存级别将设置为无”。您的 tx 管理器不是 JmsTransactionManager 并且 DMLC 没有设置其属性 transactionManager。 您是否在应用程序的其他地方使用jmsConnectionFactory?还有,jmsTemplate怎么用,能发一些代码吗? @AndreiStefan 我发布了我们如何使用 jmsTempate 的代码。而且我们只在mq中使用jmsConnectionFactory。 SendMsgThreadMessageSender的源代码是什么,因为我看到的和MessageProducer不一样? @AndreiStefan 抱歉,svn 中继代码使用了来自 SendMsgThread 的 MessageProducer。 MessageSender 是一个新类。我们现在只是使用它来测试。 【参考方案1】:

我有同样的问题重复出现同样的错误,并且通过反复试验获得了解决方案,使用 jms 调用并发消费者 max 到 101,而不是 100,并查看结果是否重复,添加更多值导致代码在调试器上进一步执行,当代码工作时您将达到一个值,而且解决方案似乎正在使用连接池工厂。

试试这个,希望它有效,其余我的实现与你在 bean 文件中的实现相同

【讨论】:

【参考方案2】:

对于这种情况,您应该使用 PooledConnectionFactory 而不是缓存连接工厂。 更多信息可以找到here。它们之间的区别可以找到here

【讨论】:

以上是关于Activemq - 超过允许的最大客户端连接数的主要内容,如果未能解决你的问题,请参考以下文章

activemq 5.10.0 支持的最大 mqtt 连接数是多少

数据库访问超过最大连接数问题

activemq连接过多导致变慢

ActiveMQ——ActiveMQ的Transport

ActiveMQ支持的传输协议

ActiveMQ(08):ActiveMQ支持的传输协议与配置