当 RabbitMQ 交换不存在时如何处理错误(并且消息通过消息传递网关接口发送)

Posted

技术标签:

【中文标题】当 RabbitMQ 交换不存在时如何处理错误(并且消息通过消息传递网关接口发送)【英文标题】:How to handle errors when RabbitMQ exchange doesn't exist (and messages are sent through a messaging gateway interface) 【发布时间】:2020-05-26 13:42:58 【问题描述】:

我想知道在以下情况下处理错误的规范方法是什么(代码是一个最小的工作示例):

消息通过消息网关发送,该网关定义了defaultRequestChannel@Gateway 方法:
@MessagingGateway(name = MY_GATEWAY, defaultRequestChannel = INPUT_CHANNEL)
public interface MyGateway

  @Gateway
  public void sendMessage(String message);
从通道读取消息并通过 AMQP 出站适配器发送:
@Bean
public IntegrationFlow apiMutuaInputFlow()

  return IntegrationFlows
    .from(INPUT_CHANNEL)
    .handle(Amqp.outboundAdapter(rabbitConfig.myTemplate()))
    .get();

RabbitMQ 配置是骨架:
@Configuration
public class RabbitMqConfiguration

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Bean
    public RabbitTemplate myTemplate()
    
        RabbitTemplate r = new RabbitTemplate(rabbitConnectionFactory);
        r.setExchange(INPUT_QUEUE_NAME);
        r.setConnectionFactory(rabbitConnectionFactory);
        return r;
    

我通常包含一个 bean 来定义我所依赖的 RabbitMQ 配置(交换、队列和绑定),它实际上工作正常。但是在测试失败场景时,我发现了一种我不知道如何正确处理使用 Spring Integration 的情况。步骤是:

删除配置 RabbitMQ 的 bean 针对未配置的 vanilla RabbitMQ 实例运行流程。

我的期望是:

无法传递消息,因为找不到交换。 要么我找到某种方法从调用者线程上的消息传递网关获取异常。 要么我找到一些方法来拦截这个错误。

我发现了什么:

无法传递消息,因为找不到交换器,并且确实每次调用 @Gateway 方法时都会记录此错误消息。
2020-02-11 08:18:40.746 ERROR 42778 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'my.exchange' in vhost '/', class-id=60, method-id=40)
网关没有失败,我也没有找到配置它的方法(例如:将throws 子句添加到接口方法,配置事务通道,设置wait-for-confirmconfirm-timeout)。 我还没有找到其他方法来捕获 CachingConectionFactory 错误(例如:配置事务通道)。 我还没有找到在另一个通道(在网关的 errorChannel 上指定)或 Spring Integration 的默认 errorChannel 上捕获错误消息的方法。

我了解此类故障可能不会通过消息传递网关向上游传播,其工作是将调用者与消息传递 API 隔离开来,但我绝对希望此类错误是可拦截的。

你能指出我正确的方向吗?

谢谢。

【问题讨论】:

【参考方案1】:

RabbitMQ 天生就是异步的,这也是它表现如此出色的原因之一。

但是,您可以通过启用确认和返回并设置此选项来阻止调用者:

/**
 * Set to true if you want to block the calling thread until a publisher confirm has
 * been received. Requires a template configured for returns. If a confirm is not
 * received within the confirm timeout or a negative acknowledgment or returned
 * message is received, an exception will be thrown. Does not apply to the gateway
 * since it blocks awaiting the reply.
 * @param waitForConfirm true to block until the confirmation or timeout is received.
 * @since 5.2
 * @see #setConfirmTimeout(long)
 * @see #setMultiSend(boolean)
 */
public void setWaitForConfirm(boolean waitForConfirm) 
    this.waitForConfirm = waitForConfirm;

(使用 DSL .waitForConfirm(true))。

这也需要确认相关表达式。这是其中一个测试用例的示例

    @Bean
    public IntegrationFlow flow(RabbitTemplate template) 
        return f -> f.handle(Amqp.outboundAdapter(template)
                .exchangeName("")
                .routingKeyFunction(msg -> msg.getHeaders().get("rk", String.class))
                .confirmCorrelationFunction(msg -> msg)
                .waitForConfirm(true));
    

    @Bean
    public CachingConnectionFactory cf() 
        CachingConnectionFactory ccf = new CachingConnectionFactory(
                RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
        ccf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        ccf.setPublisherReturns(true);
        return ccf;
    

    @Bean
    public RabbitTemplate template(ConnectionFactory cf) 
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cf);
        rabbitTemplate.setMandatory(true);               // for returns
        rabbitTemplate.setReceiveTimeout(10_000);
        return rabbitTemplate;
    

请记住,这会大大减慢速度(类似于使用事务),因此您可能需要重新考虑是否要在每次发送时都执行此操作(除非性能不是问题)。

【讨论】:

感谢@gary-russel,您的回答为我指明了正确的方向。 AFAICT,发布者确认看起来像是使用 RabbitMQ 在发布者方面提供数据安全的首选方式。这比仅仅将频道标记为已交易要复杂得多,但即使是粗略的初步测量也表明发布者确认比交易更轻松。 确实如此,但最好的性能是发送一堆消息然后等待确认,而不是一次一个;但是对于您的用例,如果您想“立即”获得异常,您别无选择。

以上是关于当 RabbitMQ 交换不存在时如何处理错误(并且消息通过消息传递网关接口发送)的主要内容,如果未能解决你的问题,请参考以下文章

编译数据库项目时如何处理临时表

使用 Spring ChannelAwareMessageListener 时如何处理 RabbitMQ 消费者取消通知

获取对象时如何处理“匹配查询不存在”

当“try .. except IOError”未捕获时如何处理 FileNotFoundError?

从 JSON + Swift 或 ObjC 检索数据时如何处理 CoreData 中的关系

[当PHP代码失败但发送200时如何处理AJAX?