一旦连接失败,Spring消费者没有连接到rabbitmq队列

Posted

技术标签:

【中文标题】一旦连接失败,Spring消费者没有连接到rabbitmq队列【英文标题】:Spring consumers not getting reconnected to rabbit mq queue once connection failed 【发布时间】:2021-09-27 17:41:27 【问题描述】:

我有一个生产者,两个消费者设置,使用 rabbitMQ 作为代理和在 spring 中配置的生产者/消费者应用程序(配置如下所述)。当消费者机器出现故障时,我面临重新连接到消费者的问题,消费者应用程序通常会在一段时间内恢复,但代理(队列)和消费者之间的连接没有重新建立,

我在 rabbit-MQ 管理控制台中进行了验证,虽然消费者在一段时间后自动恢复,但我发现队列下没有列出任何消费者。

感谢任何有关如何解决此问题的见解,如果需要任何进一步的详细信息,请告诉我。

连接工厂配置如下

@Bean 
public CachingConnectionFactory rabbitConnectionFactory() throws Exception 

    com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory(); 
    factory.setHost(host);
    factory.setUsername(username); 
    factory.setPassword(password);
    factory.setPort(5671);
    factory.useSslProtocol(); 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);
    return connectionFactory; 


样本容器工厂

   @Bean(name = "stockcontainer") 
    public SimpleRabbitListenerContainerFactory simpleStokcontainer()  throws Exception
    
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(rabbitConnectionFactory());
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(2); 
        factory.setMaxConcurrentConsumers(4);
        factory.setPrefetchCount(20);
        return factory; 
    
    
    
    @Bean(name = "StockUploadSimplecontainer") 
    public SimpleRabbitListenerContainerFactory StockUploadListenerContainerFactory()  throws Exception
    
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(rabbitConnectionFactory());
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(2); 
        factory.setMaxConcurrentConsumers(4);
        factory.setPrefetchCount(15);
        return factory;
    

消费者中的听众之一

@RabbitListener( queues = "$stock_daily.sync.queue", containerFactory = "stockcontainer", autoStartup = "true")
    public void stockDailySync(SftpStockDailySyncAsyncRequest sftpStockDailySyncRequest) 
        
    

异常

2021-07-20 18:05:08.081 信息 15087 --- [SimpleAsyncTaskExecutor-7] o.s.a.r.l.SimpleMessageListenerContainer:重新启动 消费者@1e89e61: 标签=[amq.ctag-jOkLesmTRAMxV1U6P6RTIg=omnirio_supplierbulk_queue], 频道=缓存兔子频道: AMQChannel(amqp://prod-core-mq@...*:5671/,11),conn: Proxy@4de7441e 共享兔子连接:SimpleConnection@302dbb33 [delegate=amqp://prod-core-mq@...*:5671/, localPort= 36542], 确认模式=自动本地队列大小=0 2021-07-20 18:05:08.081 错误 15087 --- [SimpleAsyncTaskExecutor-8] o.s.a.r.l.SimpleMessageListenerContainer:检查/重新声明失败 自动删除队列。

org.springframework.amqp.rabbit.connection.AutoRecoverConnectionNotCurrentlyOpenException: 自动恢复连接当前未打开 在 org.springframework.amqp.rabbit.connection.SimpleConnection.isOpen(SimpleConnection.java:100) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.isOpen(CachingConnectionFactory.java:1240) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:472) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1500(CachingConnectionFactory.java:94) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1161) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1803) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1752) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:338) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1604) [spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:963) [spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]

2021-07-20 18:05:08.806 信息 15087 --- [SimpleAsyncTaskExecutor-6] o.s.a.r.l.SimpleMessageListenerContainer:重新启动 消费者@664b6f7c: 标签=[amq.ctag-HzahvRL3wv6m0E4BKPaROw=omnirio_supplierbulk_queue], 频道=缓存兔子频道: AMQChannel(amqp://prod-core-mq@...*:5671/,4), conn: Proxy@4de7441e 共享兔子连接:SimpleConnection@302dbb33 [delegate=amqp://prod-core-mq@...*:5671/, localPort= 36542], 确认模式=自动本地队列大小=0 2021-07-20 18:05:08.807 错误 15087 --- [SimpleAsyncTaskExecutor-9] o.s.a.r.l.SimpleMessageListenerContainer:检查/重新声明失败 自动删除队列。

org.springframework.amqp.rabbit.connection.AutoRecoverConnectionNotCurrentlyOpenException: 自动恢复连接当前未打开 在 org.springframework.amqp.rabbit.connection.SimpleConnection.isOpen(SimpleConnection.java:100) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.isOpen(CachingConnectionFactory.java:1240) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:472) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1500(CachingConnectionFactory.java:94) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1161) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1803) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1752) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:338) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1604) [spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:963) [spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]


我已经更新了rabbitConnectionFactory()方法,增加了一条语句“factory.setAutomaticRecoveryEnabled(false)”,现在工厂方法如下图,这次我遇到了下面提到的不同的异常(Exception-2)

@Bean 
    public CachingConnectionFactory rabbitConnectionFactory() throws Exception 
    
        com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory(); 
        factory.setHost(host);
        factory.setUsername(username); 
        factory.setPassword(password);
        factory.setPort(5671);
        factory.useSslProtocol(); 
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);
        return connectionFactory; 
    

异常 -2

org.springframework.amqp.AmqpIOException: com.rabbitmq.client.ChannelContinuationTimeoutException:继续 在通道上调用方法 #method(out-of-band=) AMQChannel(amqp://prod-core-mq@:5671/,22) (#22) 超时 在 org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:71) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:68) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:1156) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$200(CachingConnectionFactory.java:1144) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:585) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:568) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:538) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:520) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1500(CachingConnectionFactory.java:94) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1161) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1803) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1752) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:338) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1604) [spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:963) [spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 java.lang.Thread.run(Thread.java:748) [?:1.8.0_292] 引起:com.rabbitmq.client.ChannelContinuationTimeoutException: 继续调用方法 #method(out-of-band=) on 通道 AMQChannel(amqp://prod-core-mq@:5671/,22) (#22) 超时 在 com.rabbitmq.client.impl.AMQChannel.wrapTimeoutException(AMQChannel.java:308) ~[amqp-client-5.1.2.jar!/:5.1.2] 在 com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:290) ~[amqp-client-5.1.2.jar!/:5.1.2] 在 com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138) ~[amqp-client-5.1.2.jar!/:5.1.2] 在 com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:133) ~[amqp-client-5.1.2.jar!/:5.1.2] 在 com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:176) ~[amqp-client-5.1.2.jar!/:5.1.2] 在 com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:542) ~[amqp-client-5.1.2.jar!/:5.1.2] 在 com.rabbitmq.client.impl.recovery.AutorecoveringConnection.createChannel(AutorecoveringConnection.java:108) ~[amqp-client-5.1.2.jar!/:5.1.2] 在 org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:57) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] ... 15 更多原因:java.util.concurrent.TimeoutException 在 com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:77) ~[amqp-client-5.1.2.jar!/:5.1.2] 在 com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:120) ~[amqp-client-5.1.2.jar!/:5.1.2] 在 com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.1.2.jar!/:5.1.2] 在 com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494) ~[amqp-client-5.1.2.jar!/:5.1.2] 在 com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288) ~[amqp-client-5.1.2.jar!/:5.1.2] 在 com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138) ~[amqp-client-5.1.2.jar!/:5.1.2] 在 com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:133) ~[amqp-client-5.1.2.jar!/:5.1.2] 在 com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:176) ~[amqp-client-5.1.2.jar!/:5.1.2] 在 com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:542) ~[amqp-client-5.1.2.jar!/:5.1.2] 在 com.rabbitmq.client.impl.recovery.AutorecoveringConnection.createChannel(AutorecoveringConnection.java:108) ~[amqp-client-5.1.2.jar!/:5.1.2] 在 org.springframework.amqp.r

【问题讨论】:

你需要显示日志。 @GaryRussell 我通过包含日志编辑了我的问题 【参考方案1】:

AutoRecoverConnectionNotCurrentlyOpenException

不要将autoRecoveryEnabled 与 Spring AMQP 一起使用 - 它不兼容(早在将自动恢复添加到 amqp 客户端之前,Spring 就有自己的恢复机制);无论如何它都被有效地禁用了 - Spring 关闭所有自动恢复的连接。

该错误是暂时的;当客户端恢复连接时,Spring会关闭它并创建一个新的连接。

2.0.3.发布

2.0.x 不再受支持;您应该尽快升级到 2.2.18 或 2.3.10。

【讨论】:

按照建议,我已将工厂方法更新为“factory.setAutomaticRecoveryEnabled(false);”声明并重新启动应用程序,但一段时间后,我从消费者那里得到了一个不同的例外,你能否也对此有所了解。我用新的异常 编辑了我的问题 >ChannelContinuationTimeoutException 我不知道是什么原因导致的 - 看起来很奇怪 - 默认超时为 10 分钟!我建议您在代理日志中查找线索和/或询问rabbitmq-users Google 组中的 RabbitMQ 人员可能是什么原因造成的。

以上是关于一旦连接失败,Spring消费者没有连接到rabbitmq队列的主要内容,如果未能解决你的问题,请参考以下文章

arcgis连接到数据库失败是啥原因

如果在 Android 中连接到 BLE 设备失败,如何通知用户?

如何设置远程连接到 postgresql 数据库的 jdbc 连接超时值?

连接网站显示数据库错误:无法连接到数据库:无法连接到MySQL?

从SecureCRT连接到虚拟机失败。提示连接超时。求解决办法。200分。

JDBC 通过端口 1433 连接到主机 localshost 的 TCP/IP 连接失败。错误:“null。