ActiveMQ Failover transport + Spring JmsTemplate constant reconnection

【Published】: 2015-04-17 07:51:11 【Question】:

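I have a problem with the ActiveMQ failover transport. I am using Spring (3.0.5) and ActiveMQ (5.2.0). I want to use an ActiveMQ topic to broadcast some messages. My configuration is as follows: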

jms-context.xml

I create a plain ActiveMQConnectionFactory that uses asynchronous sends and wrap it in a PooledConnectionFactory, so that the JmsTemplate can reuse pooled connections later.

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xsi:schemaLocation="
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
            http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd
">


    <!-- JMS -->

    <amq:connectionFactory id="jmsTopicAsyncConnectionFactory" brokerURL="${jms.url}">
        <property name="useAsyncSend" value="true"/>
    </amq:connectionFactory>

    <bean id="pooledJmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <constructor-arg ref="jmsTopicAsyncConnectionFactory" />
    </bean>

</beans>

JMSConfig.java

In the Java configuration, I define a JmsTemplate instance that uses the pooled connection factory.

import javax.jms.ConnectionFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportResource;
import org.springframework.context.annotation.Scope;
import org.springframework.jms.core.JmsTemplate;

@Configuration
@ImportResource("classpath:jms-context.xml")
public class JmsConfig {

    /**
     * Used for sending messages to broadcast topics.
     * 
     * @param pooledJmsConnectionFactory JMS connection factory.
     * @return JMS template instance.
     */
    @Bean
    @Autowired
    @Scope(value=BeanDefinition.SCOPE_PROTOTYPE)
    public JmsTemplate jmsTemplate(@Qualifier("pooledJmsConnectionFactory") ConnectionFactory pooledJmsConnectionFactory) {
        return new JmsTemplate(pooledJmsConnectionFactory);
    }
}

Broadcaster.java

Then I inject the previously created JmsTemplate into the broadcaster class:

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class ConcreteBroadcaster implements Broadcaster {

    private Logger log = LoggerFactory.getLogger(ConcreteBroadcaster.class);

    @Autowired
    private JmsTemplate template;
    private final Destination destination;

    public ConcreteBroadcaster(Destination destination) {
        this.destination = destination;
    }

    /**
     * Broadcasts event.
     */
    @Override
    public void broadcast(final Event event) {
        template.send(destination, new MessageCreator() {

            @Override
            public Message createMessage(Session session) throws JMSException {
                final Message eventMessage = session.createObjectMessage(event);
                if (log.isDebugEnabled()) {
                    log.debug("Broadcasting event {} through JMS topic {}", event.getClass().getSimpleName(), destination);
                }
                return eventMessage;
            }
        });
    }
}

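Everything works fine when there is only one ActiveMQ broker. When I change my jms.url from:

jms.url: "tcp://10.0.0.1:61616"

to:

jms.url: "failover:(tcp://10.0.0.1:61616,tcp://10.0.0.2:61616)?randomize=false"

and run two ActiveMQ brokers with the default configuration on the given hosts, my application starts and floods me with the following log output: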

16 Feb 2015 16:37:40,491 INFO  ache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://10.0.0.1:61616
16 Feb 2015 16:37:41,499 INFO  ache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://10.0.0.1:61616
16 Feb 2015 16:37:42,508 INFO  ache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://10.0.0.1:61616
16 Feb 2015 16:37:43,514 INFO  ache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://10.0.0.1:61616
16 Feb 2015 16:37:44,522 INFO  ache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://10.0.0.1:61616
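
The spacing between these entries can be checked mechanically rather than by eye. The following is a minimal sketch that assumes the timestamp layout shown in the log lines above (`dd MMM yyyy HH:mm:ss,SSS` in the first 24 characters); `ReconnectIntervals` and `intervalsMillis` are hypothetical names used for illustration, not part of ActiveMQ or Spring.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

public class ReconnectIntervals {

    // Assumed log timestamp layout, e.g. "16 Feb 2015 16:37:40,491" (24 characters).
    private static final DateTimeFormatter TIMESTAMP =
            DateTimeFormatter.ofPattern("dd MMM yyyy HH:mm:ss,SSS", Locale.ENGLISH);
    private static final int TIMESTAMP_LENGTH = 24;

    /** Returns the gaps, in milliseconds, between consecutive "Successfully connected" entries. */
    static List<Long> intervalsMillis(List<String> logLines) {
        List<Long> gaps = new ArrayList<>();
        LocalDateTime previous = null;
        for (String line : logLines) {
            // Only the FailoverTransport success messages mark a (re)connect.
            if (!line.contains("Successfully connected to")) {
                continue;
            }
            LocalDateTime current =
                    LocalDateTime.parse(line.substring(0, TIMESTAMP_LENGTH), TIMESTAMP);
            if (previous != null) {
                gaps.add(Duration.between(previous, current).toMillis());
            }
            previous = current;
        }
        return gaps;
    }

    public static void main(String[] args) {
        String suffix = " INFO  ache.activemq.transport.failover.FailoverTransport"
                + " - Successfully connected to tcp://10.0.0.1:61616";
        List<String> lines = Arrays.asList(
                "16 Feb 2015 16:37:40,491" + suffix,
                "16 Feb 2015 16:37:41,499" + suffix,
                "16 Feb 2015 16:37:42,508" + suffix,
                "16 Feb 2015 16:37:43,514" + suffix,
                "16 Feb 2015 16:37:44,522" + suffix);
        System.out.println(intervalsMillis(lines)); // prints [1008, 1009, 1006, 1008]
    }
}
```

For the five entries above, every gap is just over 1000 ms, which points at something running on a fixed one-second cycle rather than at random network drops.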

So it seems to reconnect every second. With debug logging enabled, it looks like this:

16 Feb 2015 19:46:25,050 DEBUG ache.activemq.transport.failover.FailoverTransport - urlList connectionList:[tcp://10.0.0.1:61616, tcp://10.0.0.2:61616], from: [tcp://10.0.0.1:61616, tcp://10.0.0.2:61616]
16 Feb 2015 19:46:25,051 DEBUG ache.activemq.transport.failover.FailoverTransport - Attempting  0th  connect to: tcp://10.0.0.1:61616
16 Feb 2015 19:46:25,053 DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo  version=9, properties=MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true, magic=[A,c,t,i,v,e,M,Q]
16 Feb 2015 19:46:25,053 DEBUG ache.activemq.transport.failover.FailoverTransport - Connection established
16 Feb 2015 19:46:25,053 INFO  ache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://10.0.0.1:61616
16 Feb 2015 19:46:25,056 DEBUG    org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo  version=9, properties=MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true, magic=[A,c,t,i,v,e,M,Q] and remote: WireFormatInfo  version=5, properties=CacheSize=1024, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true, TightEncodingEnabled=true, StackTraceEnabled=true, magic=[A,c,t,i,v,e,M,Q]
16 Feb 2015 19:46:25,056 DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo  version=5, properties=CacheSize=1024, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true, TightEncodingEnabled=true, StackTraceEnabled=true, magic=[A,c,t,i,v,e,M,Q]
16 Feb 2015 19:46:25,056 DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp:///10.0.0.1:61616@40113 before negotiation: OpenWireFormatversion=9, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807
16 Feb 2015 19:46:25,057 DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp:///10.0.0.1:61616@40113 after negotiation: OpenWireFormatversion=5, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807
16 Feb 2015 19:46:25,064 DEBUG       org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Session Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@46abe3f3[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:26,065 DEBUG        org.apache.activemq.ActiveMQMessageConsumer - remove: ID:dev.vagrant.local-33222-1424101620085-1:6862:1:1, lastDeliveredSequenceId:0
16 Feb 2015 19:46:26,066 DEBUG                org.apache.activemq.ActiveMQSession - ID:dev.vagrant.local-33222-1424101620085-1:6862:1 Transaction Commit :null
16 Feb 2015 19:46:26,068 DEBUG           org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@46abe3f3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1] is shutdown: true and terminated: true took: 0.000 seconds.
16 Feb 2015 19:46:26,069 DEBUG           org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@228b3f4c[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds.
16 Feb 2015 19:46:26,069 DEBUG ache.activemq.transport.failover.FailoverTransport - Stopped tcp://10.0.0.1:61616
16 Feb 2015 19:46:26,070 DEBUG           org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@565b26a9[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
16 Feb 2015 19:46:26,071 DEBUG     org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp:///10.0.0.1:61616@40113
16 Feb 2015 19:46:26,071 DEBUG       org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@74e8dd0c[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:26,072 DEBUG   org.apache.activemq.transport.tcp.TcpTransport$1 - Closed socket Socket[addr=/10.0.0.1,port=61616,localport=40113]
16 Feb 2015 19:46:26,072 DEBUG           org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@74e8dd0c[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:26,073 DEBUG       org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@3a0a1c78[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:26,074 DEBUG ache.activemq.transport.failover.FailoverTransport - Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.
16 Feb 2015 19:46:26,074 DEBUG ache.activemq.transport.failover.FailoverTransport - Started unconnected
16 Feb 2015 19:46:26,074 DEBUG ache.activemq.transport.failover.FailoverTransport - Waking up reconnect task
16 Feb 2015 19:46:26,075 DEBUG ache.activemq.transport.failover.FailoverTransport - urlList connectionList:[tcp://10.0.0.1:61616, tcp://10.0.0.2:61616], from: [tcp://10.0.0.1:61616, tcp://10.0.0.2:61616]
16 Feb 2015 19:46:26,076 DEBUG ache.activemq.transport.failover.FailoverTransport - Attempting  0th  connect to: tcp://10.0.0.1:61616
16 Feb 2015 19:46:26,077 DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo  version=9, properties=MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true, magic=[A,c,t,i,v,e,M,Q]
16 Feb 2015 19:46:26,077 DEBUG ache.activemq.transport.failover.FailoverTransport - Connection established
16 Feb 2015 19:46:26,077 INFO  ache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://10.0.0.1:61616
16 Feb 2015 19:46:26,082 DEBUG    org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo  version=9, properties=MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true, magic=[A,c,t,i,v,e,M,Q] and remote: WireFormatInfo  version=5, properties=CacheSize=1024, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true, TightEncodingEnabled=true, StackTraceEnabled=true, magic=[A,c,t,i,v,e,M,Q]
16 Feb 2015 19:46:26,083 DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo  version=5, properties=CacheSize=1024, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true, TightEncodingEnabled=true, StackTraceEnabled=true, magic=[A,c,t,i,v,e,M,Q]
16 Feb 2015 19:46:26,083 DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp:///10.0.0.1:61616@40114 before negotiation: OpenWireFormatversion=9, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807
16 Feb 2015 19:46:26,083 DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp:///10.0.0.1:61616@40114 after negotiation: OpenWireFormatversion=5, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807
16 Feb 2015 19:46:26,093 DEBUG       org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Session Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@6d0c65b0[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:27,094 DEBUG        org.apache.activemq.ActiveMQMessageConsumer - remove: ID:dev.vagrant.local-33222-1424101620085-1:6863:1:1, lastDeliveredSequenceId:0
16 Feb 2015 19:46:27,096 DEBUG                org.apache.activemq.ActiveMQSession - ID:dev.vagrant.local-33222-1424101620085-1:6863:1 Transaction Commit :null
16 Feb 2015 19:46:27,098 DEBUG           org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@6d0c65b0[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1] is shutdown: true and terminated: true took: 0.000 seconds.
16 Feb 2015 19:46:27,098 DEBUG           org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@76d73f47[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds.
16 Feb 2015 19:46:27,099 DEBUG ache.activemq.transport.failover.FailoverTransport - Stopped tcp://10.0.0.1:61616
16 Feb 2015 19:46:27,100 DEBUG     org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp:///10.0.0.1:61616@40114
16 Feb 2015 19:46:27,100 DEBUG       org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@1ffc55fd[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:27,101 DEBUG           org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@3a0a1c78[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
16 Feb 2015 19:46:27,102 DEBUG       org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@5a12ff9c[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:27,103 DEBUG ache.activemq.transport.failover.FailoverTransport - Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.
16 Feb 2015 19:46:27,103 DEBUG ache.activemq.transport.failover.FailoverTransport - Started unconnected
16 Feb 2015 19:46:27,104 DEBUG ache.activemq.transport.failover.FailoverTransport - Waking up reconnect task
16 Feb 2015 19:46:27,103 DEBUG   org.apache.activemq.transport.tcp.TcpTransport$1 - Closed socket Socket[addr=/10.0.0.1,port=61616,localport=40114]
16 Feb 2015 19:46:27,104 DEBUG           org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@1ffc55fd[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
16 Feb 2015 19:46:27,105 WARN  ache.activemq.transport.failover.FailoverTransport - Transport (tcp://10.0.0.1:61616) failed, reason:  java.io.EOFException, not attempting to automatically reconnect
16 Feb 2015 19:46:27,105 DEBUG             org.apache.activemq.ActiveMQConnection - transport interrupted, dispatchers: 0
16 Feb 2015 19:46:27,105 DEBUG             org.apache.activemq.ActiveMQConnection - notified failover transport (unconnected) of pending interruption processing for: ID:dev.vagrant.local-33222-1424101620085-1:6863
16 Feb 2015 19:46:27,106 DEBUG ache.activemq.transport.failover.FailoverTransport - urlList connectionList:[tcp://10.0.0.1:61616, tcp://10.0.0.2:61616], from: [tcp://10.0.0.1:61616, tcp://10.0.0.2:61616]
16 Feb 2015 19:46:27,107 DEBUG ache.activemq.transport.failover.FailoverTransport - Attempting  0th  connect to: tcp://10.0.0.1:61616
16 Feb 2015 19:46:27,109 DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo  version=9, properties=MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true, magic=[A,c,t,i,v,e,M,Q]
16 Feb 2015 19:46:27,109 DEBUG ache.activemq.transport.failover.FailoverTransport - Connection established
16 Feb 2015 19:46:27,109 INFO  ache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://10.0.0.1:61616
16 Feb 2015 19:46:27,113 DEBUG    org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo  version=9, properties=MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true, magic=[A,c,t,i,v,e,M,Q] and remote: WireFormatInfo  version=5, properties=CacheSize=1024, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true, TightEncodingEnabled=true, StackTraceEnabled=true, magic=[A,c,t,i,v,e,M,Q]
16 Feb 2015 19:46:27,114 DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo  version=5, properties=CacheSize=1024, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true, TightEncodingEnabled=true, StackTraceEnabled=true, magic=[A,c,t,i,v,e,M,Q]
16 Feb 2015 19:46:27,114 DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp:///10.0.0.1:61616@40115 before negotiation: OpenWireFormatversion=9, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807
16 Feb 2015 19:46:27,114 DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp:///10.0.0.1:61616@40115 after negotiation: OpenWireFormatversion=5, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807
16 Feb 2015 19:46:27,121 DEBUG       org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Session Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@1cf53b77[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:28,123 DEBUG        org.apache.activemq.ActiveMQMessageConsumer - remove: ID:dev.vagrant.local-33222-1424101620085-1:6864:1:1, lastDeliveredSequenceId:0
16 Feb 2015 19:46:28,124 DEBUG                org.apache.activemq.ActiveMQSession - ID:dev.vagrant.local-33222-1424101620085-1:6864:1 Transaction Commit :null
16 Feb 2015 19:46:28,126 DEBUG           org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@1cf53b77[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1] is shutdown: true and terminated: true took: 0.000 seconds.
16 Feb 2015 19:46:28,127 DEBUG           org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@39d64d12[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds.
16 Feb 2015 19:46:28,128 DEBUG ache.activemq.transport.failover.FailoverTransport - Stopped tcp://10.0.0.1:61616
16 Feb 2015 19:46:28,128 DEBUG           org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5a12ff9c[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
16 Feb 2015 19:46:28,129 DEBUG     org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp:///10.0.0.1:61616@40115
16 Feb 2015 19:46:28,130 DEBUG       org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@372d510a[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:28,130 DEBUG   org.apache.activemq.transport.tcp.TcpTransport$1 - Closed socket Socket[addr=/10.0.0.1,port=61616,localport=40115]
16 Feb 2015 19:46:28,131 DEBUG           org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@372d510a[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:28,131 DEBUG       org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@3cbab1f9[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:28,131 DEBUG ache.activemq.transport.failover.FailoverTransport - Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.
16 Feb 2015 19:46:28,132 DEBUG ache.activemq.transport.failover.FailoverTransport - Started unconnected
16 Feb 2015 19:46:28,133 DEBUG ache.activemq.transport.failover.FailoverTransport - Waking up reconnect task
16 Feb 2015 19:46:28,133 DEBUG ache.activemq.transport.failover.FailoverTransport - urlList connectionList:[tcp://10.0.0.1:61616, tcp://10.0.0.2:61616], from: [tcp://10.0.0.1:61616, tcp://10.0.0.2:61616]
16 Feb 2015 19:46:28,134 DEBUG ache.activemq.transport.failover.FailoverTransport - Attempting  0th  connect to: tcp://10.0.0.1:61616
16 Feb 2015 19:46:28,136 DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo  version=9, properties=MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true, magic=[A,c,t,i,v,e,M,Q]
16 Feb 2015 19:46:28,136 DEBUG ache.activemq.transport.failover.FailoverTransport - Connection established
16 Feb 2015 19:46:28,136 INFO  ache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://10.0.0.1:61616
16 Feb 2015 19:46:28,142 DEBUG    org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo  version=9, properties=MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true, magic=[A,c,t,i,v,e,M,Q] and remote: WireFormatInfo  version=5, properties=CacheSize=1024, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true, TightEncodingEnabled=true, StackTraceEnabled=true, magic=[A,c,t,i,v,e,M,Q]
16 Feb 2015 19:46:28,144 DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo  version=5, properties=CacheSize=1024, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true, TightEncodingEnabled=true, StackTraceEnabled=true, magic=[A,c,t,i,v,e,M,Q]
16 Feb 2015 19:46:28,144 DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp:///10.0.0.1:61616@40116 before negotiation: OpenWireFormatversion=9, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807
16 Feb 2015 19:46:28,144 DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp:///10.0.0.1:61616@40116 after negotiation: OpenWireFormatversion=5, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807
16 Feb 2015 19:46:28,155 DEBUG       org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Session Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@5e95c519[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:28,394 DEBUG che.activemq.transport.AbstractInactivityMonitor$2 - WriteChecker 10000 ms elapsed since last write check.
16 Feb 2015 19:46:29,156 DEBUG        org.apache.activemq.ActiveMQMessageConsumer - remove: ID:dev.vagrant.local-33222-1424101620085-1:6865:1:1, lastDeliveredSequenceId:0
16 Feb 2015 19:46:29,157 DEBUG                org.apache.activemq.ActiveMQSession - ID:dev.vagrant.local-33222-1424101620085-1:6865:1 Transaction Commit :null
16 Feb 2015 19:46:29,160 DEBUG           org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5e95c519[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1] is shutdown: true and terminated: true took: 0.000 seconds.
16 Feb 2015 19:46:29,160 DEBUG           org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5af5d17a[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds.
16 Feb 2015 19:46:29,160 DEBUG ache.activemq.transport.failover.FailoverTransport - Stopped tcp://10.0.0.1:61616
16 Feb 2015 19:46:29,162 DEBUG     org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp:///10.0.0.1:61616@40116
16 Feb 2015 19:46:29,162 DEBUG       org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@5b20212e[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:29,163 DEBUG           org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@3cbab1f9[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
16 Feb 2015 19:46:29,164 DEBUG       org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@6331ee[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:29,164 DEBUG ache.activemq.transport.failover.FailoverTransport - Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.
16 Feb 2015 19:46:29,165 DEBUG ache.activemq.transport.failover.FailoverTransport - Started unconnected
16 Feb 2015 19:46:29,165 DEBUG ache.activemq.transport.failover.FailoverTransport - Waking up reconnect task
16 Feb 2015 19:46:29,165 DEBUG   org.apache.activemq.transport.tcp.TcpTransport$1 - Closed socket Socket[addr=/10.0.0.1,port=61616,localport=40116]
16 Feb 2015 19:46:29,166 DEBUG           org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5b20212e[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
16 Feb 2015 19:46:29,166 WARN  ache.activemq.transport.failover.FailoverTransport - Transport (tcp://10.0.0.1:61616) failed, reason:  java.io.EOFException, not attempting to automatically reconnect
16 Feb 2015 19:46:29,167 DEBUG             org.apache.activemq.ActiveMQConnection - transport interrupted, dispatchers: 0
16 Feb 2015 19:46:29,167 DEBUG             org.apache.activemq.ActiveMQConnection - notified failover transport (unconnected) of pending interruption processing for: ID:dev.vagrant.local-33222-1424101620085-1:6865
16 Feb 2015 19:46:29,167 DEBUG ache.activemq.transport.failover.FailoverTransport - urlList connectionList:[tcp://10.0.0.1:61616, tcp://10.0.0.2:61616], from: [tcp://10.0.0.1:61616, tcp://10.0.0.2:61616]

I have no idea what is wrong. Please help.

【Comments】:

【Answer 1】:

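It looks like something is closing the connection after one second. Turn on TRACE-level logging; the pooled connection factory emits more log output when it is enabled.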

Alternatively, try Spring's CachingConnectionFactory instead of the PooledConnectionFactory.
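
A possible shape for that swap, as a configuration sketch only: it needs spring-jms and the ActiveMQ client on the classpath, and it has not been verified against the setup in the question. The class name `CachingJmsConfig`, the bean name `cachingJmsConnectionFactory`, the hard-coded broker URL, and the session cache size of 10 are assumptions made for illustration.

```java
import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class CachingJmsConfig {

    // Assumption: the failover URL from the question, hard-coded here for brevity.
    private static final String BROKER_URL =
            "failover:(tcp://10.0.0.1:61616,tcp://10.0.0.2:61616)?randomize=false";

    /**
     * Wraps the ActiveMQ factory in Spring's CachingConnectionFactory, which keeps
     * a single shared connection open and caches sessions on top of it.
     */
    @Bean
    public ConnectionFactory cachingJmsConnectionFactory() {
        ActiveMQConnectionFactory target = new ActiveMQConnectionFactory(BROKER_URL);
        target.setUseAsyncSend(true); // same async-send setting as in jms-context.xml

        CachingConnectionFactory caching = new CachingConnectionFactory(target);
        caching.setSessionCacheSize(10); // illustrative value, tune as needed
        return caching;
    }

    /** JmsTemplate backed by the caching factory instead of the pooled one. */
    @Bean
    public JmsTemplate jmsTemplate() {
        return new JmsTemplate(cachingJmsConnectionFactory());
    }
}
```

Because CachingConnectionFactory holds one long-lived connection, closing a session or connection obtained from it should not tear down the underlying transport. Whether that stops the one-second reconnect cycle depends on which component is actually closing the connection, which the TRACE logs should reveal.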

【Comments】:
