Spring AMQP v1.4.2 - 网络故障时的兔子重新连接问题

Posted

技术标签:

【中文标题】Spring AMQP v1.4.2 - 网络故障时的兔子重新连接问题【英文标题】:Spring AMQP v1.4.2 - Rabbit reconnection issue on network failure 【发布时间】:2015-03-14 15:32:29 【问题描述】:

我在 Spring AMQP v1.4.2 中测试以下场景,网络中断后无法重新连接:

    启动 spring 应用程序,使用 rabbit:listener-container 和 rabbit:connection-factory 异步消费消息(详细配置如下)。 日志显示应用程序正在成功接收消息。 通过丢弃rabbit服务器上的入站网络流量使RabbitMQ对应用程序不可见:sudo iptables -A INPUT -p tcp --destination-port 5672 -j DROP 等待至少 3 分钟(网络连接超时)。 修复连接:sudo iptables -D INPUT -p tcp --destination-port 5672 -j DROP 等待一段时间(甚至尝试了一个多小时),但没有发生重新连接。 重新启动应用程序,它又开始接收消息,这意味着网络恢复正常。

我还使用 VM 网络适配器断开连接而不是 iptables drop 测试了相同的场景,并且发生了同样的事情,即没有自动重新连接。有趣的是,当我尝试使用 iptables REJECT 而不是 DROP 时,它按预期工作,并且应用程序在我删除拒绝规则后立即重新启动,但我认为拒绝更像是服务器故障而不是网络故障.

根据reference document:

如果 MessageListener 由于业务异常而失败,则异常由消息侦听器容器处理,然后它会返回侦听另一条消息。如果失败是由断开的连接(不是业务异常)引起的,那么正在为侦听器收集消息的消费者必须被取消并重新启动。 SimpleMessageListenerContainer 无缝处理这个问题,它会留下一个日志说监听器正在重新启动。 事实上,它会无休止地循环尝试重新启动消费者,只有当消费者表现得很糟糕时才会这样做放弃。一个副作用是,如果代理在容器启动时关闭,它将继续尝试直到可以建立连接。

这是我在断开连接大约一分钟后得到的日志:

    2015-01-16 14:00:42,433 WARN  [SimpleAsyncTaskExecutor-5] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:565) ~[amqp-client-3.4.2.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: java.io.EOFException: null
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) ~[na:1.7.0_55]
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534) ~[amqp-client-3.4.2.jar:na]
    ... 1 common frames omitted

我在重新连接几秒钟后收到此日志消息:

2015-01-16 14:18:14,551 WARN  [SimpleAsyncTaskExecutor-2] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection timed out

更新:很奇怪,当我在 org.springframework.amqp 包上启用 DEBUG 日志记录时,重新连接成功,我无法再重现该问题!

在未启用调试日志记录的情况下,我尝试调试 spring AMQP 代码。我观察到,在删除 iptables drop 后不久,SimpleMessageListenerContainer.doStop() 方法被调用,它又调用了 shutdown() 并取消了所有通道。当我在 doStop() 上设置断点时,我也收到了这条日志消息,这似乎与原因有关:

2015-01-20 15:28:44,200 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:44,243 WARN  [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:44,243 WARN  [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Queue declaration failed; retries left=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) [spring-rabbit-1.4.2.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
2015-01-20 15:28:49,245 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:49,283 WARN  [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:49,300 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:429) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    ... 2 common frames omitted
2015-01-20 15:28:49,301 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Stopping container from aborted consumer

更新 2:requested-heartbeat 设置为 30 秒后,按照答案中的建议,重新连接大部分时间都有效,并成功重新定义了专用临时队列,绑定到扇出交换,但它仍然偶尔无法重新连接。

在极少数失败的情况下,我在测试期间监控了 RabbitMQ 管理控制台,观察到新连接已建立(旧连接因超时而被删除后),但重新连接后未重新定义独占临时队列。客户端也没有收到任何消息。现在真的很难可靠地重现该问题,因为它发生的频率较低。我在下面提供了完整的配置,现在包含队列声明。

更新 3: 即使在用自动删除命名队列替换独占临时队列后,偶尔也会发生相同的行为;即重新连接后不会重新定义自动删除命名队列,并且在重新启动应用程序之前不会收到任何消息。

如果有人能在这方面帮助我,我将不胜感激。

这是我所依赖的spring AMQP配置:

<!-- Create a temporary exclusive queue to subscribe to the control exchange -->
<rabbit:queue id="control-queue"/>

<!-- Bind the temporary queue to the control exchange -->
<rabbit:fanout-exchange name="control">
    <rabbit:bindings>
        <rabbit:binding queue="control-queue"/>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<!-- Subscribe to the temporary queue -->
<rabbit:listener-container connection-factory="connection-factory"
                           acknowledge="none"
                           concurrency="1"
                           prefetch="1">
    <rabbit:listener queues="control-queue" ref="controlQueueConsumer"/>

</rabbit:listener-container>

<rabbit:connection-factory id="connection-factory"
                           username="$rabbit.username"
                           password="$rabbit.password"
                           host="$rabbit.host"
                           virtual-host="$rabbit.virtualhost"
                           publisher-confirms="true" 
                           channel-cache-size="100"
                           requested-heartbeat="30" />

<rabbit:admin id="admin" connection-factory="connection-factory"/>

<rabbit:queue id="qu0-id" name="qu0">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="dead-letter"/>
    </rabbit:queue-arguments>
</rabbit:queue>

<rabbit:topic-exchange id="default-exchange" name="default-ex" declared-by="admin">
    <rabbit:bindings>
        <rabbit:binding queue="qu0" pattern="p.0"/>
    </rabbit:bindings>
</rabbit:topic-exchange>

<rabbit:listener-container connection-factory="connection-factory"
                           acknowledge="manual"
                           concurrency="4"
                           prefetch="30">
    <rabbit:listener queues="qu0" ref="queueConsumerComponent"/>
</rabbit:listener-container>

【问题讨论】:

你不是说最早的 Spring AMQP 版本没有这个问题吗? 您介意在DEBUG 级别共享org.springframework.amqp.rabbit.listener 类别的日志以查看有关此事的更多信息吗?顺便说一句,我刚刚尝试过在 Windows 上使用 tcpTrace 进行类似(或不是?)仿真,并在日志中看到类似的 Caused by: java.io.EOFException: null at java.io.DataInputStream.readUnsignedByte。但是当我重新启动trace 时,连接就会恢复。我的 AMQP 客户端是 3.4.2 - Spring AMQP 的传递依赖。 不特定于 Spring AMQP,但如果您想要重新连接和恢复队列等资源的能力,您可以尝试使用 Lyra。 【参考方案1】:

setRequestedHeartBeat 设置为ConnectionFactory,将setMissingQueuesFatal(false) 设置为SimpleMessageListenerContainer,以便无限期地重试连接。默认情况下,SimpleMessageListenerContainer setMissingQueuesFatal 设置为 true,并且只会进行 3 次重试。

  @Bean
  public ConnectionFactory connectionFactory() 
    final CachingConnectionFactory connectionFactory = new CachingConnectionFactory(getHost(), getPort());
    connectionFactory.setUsername(getUsername());
    connectionFactory.setPassword(getPassword());
    connectionFactory.setVirtualHost(getVirtualHost());
    connectionFactory.setRequestedHeartBeat(30);
    return connectionFactory;
  

  @Bean
  public SimpleMessageListenerContainer listenerContainerCopernicusErrorQueue() 
    final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueueNames(myQueue().getName());
    container.setMessageListener(messageListenerAdapterQueue());
    container.setDefaultRequeueRejected(false);
    container.setMissingQueuesFatal(false);
    return container;
  

【讨论】:

【参考方案2】:

我们在生产环境中也面临这个问题,可能是因为 Rabbit 节点作为虚拟机在不同的 ESX 机架等上运行。我们发现的解决方法是让我们的客户端应用程序不断尝试重新连接,如果它得到与集群断开连接。 以下是我们应用的设置并且效果很好:

<util:properties id="spring.amqp.global.properties">
  <prop key="smlc.missing.queues.fatal">false</prop>
</util:properties>

当声明队列因致命错误(代理不可用等)而失败时,此属性会更改 Spring AMQP 的全局行为。默认情况下,容器仅尝试 3 次(请参阅显示“retries left=0”的日志消息)。

参考:http://docs.spring.io/spring-amqp/reference/htmlsingle/#containerAttributes

此外,我们添加了 recovery-interval 以便容器从非致命错误中恢复。但是,当全局行为也是重试致命错误(如丢失队列)时,也会使用相同的配置。

<rabbit:listener-container recovery-interval="15000" connection-factory="consumerConnectionFactory">
....
</rabbit:listener-container>

【讨论】:

【参考方案3】:

我刚刚按照描述运行了您的测试(Linux 上的兔子使用iptables 丢弃数据包)。

重新建立连接时没有日志(也许我们应该)。

我建议您打开调试日志以查看重新连接。

编辑:

来自 rabbitmq 文档:

独家 独占队列只能由当前连接访问,并在该连接关闭时被删除。不允许其他连接被动声明排他队列。

你的例外:

reply-code=405,reply-text=RESOURCE_LOCKED - 无法获得对 vhost '/' 中锁定队列 'e4288669-2422-40e6-a2ee-b99542509273' 的独占访问权限,class-id=50,method-

所以问题是代理仍然认为另一个连接存在。

    不要使用独占队列(这样的队列无论如何都会丢失消息)。或者, 设置较低的requestedHeartbeat,以便代理更快地检测到丢失的连接。

【讨论】:

谢谢加里。我尝试了调试日志记录并使用更多信息更新了问题。似乎在重新连接后不久,队列重新声明失败并关闭了 SimpleMessageListenerContainer。 我编辑了答案;以后,请显示所有配置,包括您的队列。 很抱歉,重新连接失败仍然会发生,但这次是偶尔发生。我用详细信息和完整配置更新了问题。 正如我之前所暗示的,如果您想在所有情况下处理此特定网络故障,请不要使用独占队列。如果您不关心丢失的消息,只需使用自动删除队列(但不是独占),或将requestedHeartbeat 减小到更小的值。或者使用命名队列,在这种情况下你不会遇到这个问题。 在我的情况下,问题确实是:RabbitMQ 认为,旧连接仍然存在,并且不允许 Spring 的 AMQP 容器重新声明独占匿名队列。 Spring AMQP 容器仅尝试重新声明队列 4 次,然后就放弃了。在此之后:我的应用程序仍在运行,网络连接正常,但没有收到任何消息。感谢上帝,SimpleRabbitListenerContainerFactory 上有 missingQueuesFatal 选项,这使得 Spring 的 AMQP 容器尝试无限地重新声明队列(不仅仅是 4 次)!或者我们可以将requestedHeartbeat 设置为小于retryCount * interval

以上是关于Spring AMQP v1.4.2 - 网络故障时的兔子重新连接问题的主要内容,如果未能解决你的问题,请参考以下文章

Java网络商城项目 SpringBoot+SpringCloud+Vue 网络商城(SSM前后端分离项目)十七(安装RabbitMQ(Centos6)(入门使用教程))以及Spring AMQP的)

Java网络商城项目 SpringBoot+SpringCloud+Vue 网络商城(SSM前后端分离项目)十七(安装RabbitMQ(Centos6)(入门使用教程))以及Spring AMQP的)

Spring AMQP杂记之Spring实现简述

Spring AMQP杂记之AMQP基本概念

spring-cloud-sleuth 与 spring-amqp 集成

spring-cloud-sleuth与spring-amqp集成