Spring AMQP RPC消费者并抛出异常

Posted

技术标签:

【中文标题】Spring AMQP RPC消费者并抛出异常【英文标题】:Spring AMQP RPC consumer and throw exception 【发布时间】:2019-04-16 03:52:47 【问题描述】:

我有一个 RPC 模式的消费者(RabbitListner),我想知道是否可以抛出可以被发布者处理的异常。

为了更清楚我的解释,案例如下:

发布者以 RPC 方式发送消息 消费者收到消息,检查消息的有效性,如果消息由于缺少参数而无法计数,那么我想抛出异常。异常可以是特定的业务异常或特定的 AmqpException,但我希望发布者可以处理此异常,如果它没有超时。

我尝试使用 AmqpRejectAndDontRequeueException,但我的发布者没有收到异常,而只是一个空的响应。

是否可以这样做,或者这样实施可能不是一个好习惯?

编辑 1:

在@GaryRussel 回复之后是我的问题的解决方案:

    对于 RabbitListner,我创建了一个错误处理程序:

     @Configuration
     public class RabbitErrorHandler implements         RabbitListenerErrorHandler 
     @Override public Object handleError(Message message, org.springframework.messaging.Message<?> message1, ListenerExecutionFailedException e) 
    throw e;
    
    

    将 bean 定义到配置文件中:

    @配置 公共类 RabbitConfig 扩展 RabbitConfiguration

    @Bean
    public RabbitTemplate getRabbitTemplate() 
        Message.addWhiteListPatterns(RabbitConstants.CLASSES_TO_SEND_OVER_RABBITMQ);
    return new RabbitTemplate(this.connectionFactory());
    

    /**
    * Define the RabbitErrorHandle
    * @return Initialize RabbitErrorHandle bean
    */
    @Bean
    public RabbitErrorHandler rabbitErrorHandler() 
       return new RabbitErrorHandler();
    
    
    

    使用参数创建 @RabbitListner,其中 rabbitErrorHandler 是我之前定义的 bean:

    @Override
    @RabbitListener(queues = "$rabbit.queue"
        , errorHandler = "rabbitErrorHandler"
        , returnExceptions = "true")
     public ReturnObject receiveMessage(Message message) 
    

    对于 RabbitTemplate 我设置了这个属性:

       rabbitTemplate.setMessageConverter(new RemoteInvocationAwareMessageConverterAdapter());
    

当消息受到消费者威胁,但它发送了错误时,我获得了一个 RemoteInvocationResult,其中包含 e.getCause().getCause() 中的原始异常。

【问题讨论】:

【参考方案1】:

查看@RabbitListener 上的returnExceptions 属性(自2.0 起)。 Docs here.

returnExceptions属性,当true会导致异常返回给发送者。异常包装在 RemoteInvocationResult 对象中。

在发送方,有一个可用的RemoteInvocationAwareMessageConverterAdapter,如果配置到RabbitTemplate 中,将重新抛出服务器端异常,包裹在AmqpRemoteException 中。服务器异常的堆栈跟踪将通过合并服务器和客户端堆栈跟踪来合成。

重要

这种机制一般只适用于使用Java序列化的默认SimpleMessageConverter;异常通常不是“杰克逊友好的”,因此不能序列化为 JSON。如果您使用的是 JSON,请考虑在抛出异常时使用 errorHandler 来返回其他一些对 Jackson 友好的 Error 对象。

【讨论】:

感谢您的回答。也感谢与 theMayer 的讨论。最后,我决定使用错误处理程序按照您的建议进行操作。最初我已经按照 Mayer 的建议完成了,但是有异常会更安全,并且在我没有捕获所有内容的情况下,处理程序将确保不会丢失异常。可能我会把我的一些代码放在我的问题中,因为我发现它不是很容易(我是 Spring AMQP 的新手)所以我花时间找到一种方法,也许它不是正确的.谢谢 嗨,现在我明白了你的短语对于杰克逊不友好的例外是什么意思。在我使用带有 SimpleMessageConverter 的 rabbit 模板之前,它序列化了我的 java 对象,但现在我想将它用作 JSON。因此,如果我想使用 Jackson 转换器,我可以使用新的 RemoteInvocationAwareMessageConverterAdapter(new Jackson2JsonMessageConverter())。您能给我一些建议吗? 不,没那么简单;正如我上面所说的;您将需要在服务器端进行更改以捕获异常并返回一些特殊的错误对象;然后,您必须在客户端解释该错误对象以将其作为异常抛出。 谢谢。我已经实施了你的建议。我从我的 RabbitErrorHandler 创建了一个在错误情况下返回的自定义对象。【参考方案2】:

您必须将消息作为错误返回,消费应用程序可以选择将其视为异常。但是,我认为正常的异常处理流程不适用于消息传递。您的发布应用程序(RPC 服务的使用者)需要知道可能出现的问题并进行编程以处理这些可能性。

【讨论】:

如果你使用java序列化,你可以配置异常传播到客户端,在那里它们可以被重新抛出。看我的回答。 @GaryRussell - 这是一个很好的答案。我仍然认为“异常”是为不能正常工作的代码保留的,而不是服务故障,其边界通常定义得更好,应该被编程为处理所有非标称条件作为一流的问题代码。 这是应社区的要求添加的,但我同意最好设置一些协议以向发送者返回有意义的错误,避免这种紧密耦合。 感谢您的回复。最后我决定像@GaryRussell 提议的那样做。你的回答也很有趣,告诉我即使这样也不是一个坏习惯。【参考方案3】:

对我有用的是:

在“服务”方面:

服务

@RabbitListener(id = "test1", containerFactory ="BEAN CONTAINER FACTORY", 
queues = "TEST QUEUE", returnExceptions = "true")
DataList getData() 
  // this exception will be transformed by rabbit error handler to a RemoteInvocationResult
 throw new IllegalStateException("mon expecion");
 //return dataHelper.loadAllData();

在“请求”方面:

服务

public void fetchData() throws AmqpRemoteException 
  var response = (DataList) amqpTemplate.convertSendAndReceive("TEST EXCHANGE", "ROUTING NAME", new Object());

  Optional.ofNullable(response)
         .ifPresentOrElse(this::setDataContent, this::handleNoData);

配置

@Bean
AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) 
   var rabbitTemplate = new RabbitTemplate(connectionFactory);
   rabbitTemplate.setMessageConverter(messageConverter);
   return rabbitTemplate;

@Bean
MessageConverter jsonMessageConverter() 
   ObjectMapper objectMapper = new ObjectMapper();
   objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
   objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
   objectMapper.registerModule(new JavaTimeModule());

   var jsonConverter = new Jackson2JsonMessageConverter(objectMapper);

   DefaultClassMapper classMapper = new DefaultClassMapper();
   Map<String, Class<?>> idClassMapping = Map.of(
      DataList.class.getName(), DataList.class,
      RemoteInvocationResult.class.getName(), RemoteInvocationResult.class
   );
   classMapper.setIdClassMapping(idClassMapping);

   jsonConverter.setClassMapper(classMapper);

   // json converter with returned exception awareness
   // this will transform RemoteInvocationResult into a AmqpRemoteException 
   return new RemoteInvocationAwareMessageConverterAdapter(jsonConverter);

【讨论】:

以上是关于Spring AMQP RPC消费者并抛出异常的主要内容,如果未能解决你的问题,请参考以下文章

异常处理——捕获并抛出

怎样自定义一个异常(报错) 并抛出

消息队列客户端开发向导二(基于 Spring 的 amqp 实现)

消息队列客户端开发向导二(基于 Spring 的 amqp 实现)

如何限制对任何路由的直接访问并抛出 404 异常,即使存在路由?

JPA:DELETE WHERE 不删除子级并抛出异常