RabbitMQ - Apache Camel 读取消息如何处理失败的消息

Posted

技术标签:

【中文标题】RabbitMQ - Apache Camel 读取消息如何处理失败的消息【英文标题】:RabbitMQ - Apache Camel Reading Messages what to do with failed messages 【发布时间】:2016-01-16 09:27:07 【问题描述】:

我有以下 php 应用程序。这会将用户注册发布到消息队列。 Java 应用程序从该队列中读取并导入它。希望下面的图表能够描述它。我只处理 Java 方面的事情。 json 消息已经存在于队列中。

路由(Java 消费端)。

@Component
public class SignUpRouting 

  errorHandler(deadLetterChannel("rabbitmq://signUpDeadLetter.exchange?username=etc..").useOriginalMessage());

  from("rabbitmq://phpSignUp.exchange?username=etc....")
            .routeId("signUpRoute")
            .processRef("signUpProcessor")
            .end();
  //.... 

处理器..

@Component
public class SignupProcessor implements Processor 

    private ObjectMapper mapper = new ObjectMapper();

    @Override
    public void process(Exchange exchange) throws Exception 

        String json = exchange.getIn().getBody(String.class);
        SignUpDto dto = mapper.readValue(json, SignUpDto.class);

        SignUp signUp = new SignUp();
        signUp.setWhatever(dto.getWhatever());
        //etc....

        // save record
        signUpDao.save(signUp);
    

我的问题是这个..当处理器无法导入消息时我该怎么办。

比如说,有一个 DAO 异常。数据字段可能过长或导入格式不正确。我不想丢失消息。我想查看错误并重试导入。但我不想每 30 秒重试一次消息。

我在想我需要创建另一个队列.. 死信队列并无限期地每 6 小时重试一次消息?.. 然后我会查看日志以查看错误并上传修复程序,然后消息会再加工?

我将如何实现它?还是我走错了路?

编辑我已经尝试设置 deadLetterExchange 以查看是否会朝着正确的方向发展...但是它出错并说队列不能为非空

 rabbitmq://phpSignUp.exchange?username=etc...&deadLetterExchange=signUpDeadLetter.exchange

【问题讨论】:

如果您正在使用另一个队列,为什么不将确切的失败消息与异常的堆栈跟踪一起存储,然后处理来自该队列的数据 我不确定我是否理解。你能举个例子吗? 如果您有足够的支持团队,我会将消息发送到另一个队列或写入数据库表,然后发送电子邮件通知支持人员。创建另一个界面,允许支持人员修改消息文本并将其重新注入注册处理器。无论您做什么,都需要人工干预。相应地设计。 PHP应用程序应该有很好的验证,这样他就很少发生了。 【参考方案1】:

这里是一个使用死信头的例子:

        <from uri="rabbitmq://localhost/youexchange?queue=yourq1&amp;
            exchangeType=topic&amp;
            routingKey=user.reg.*&amp;
            deadLetterExchange=dead.msgs&amp;
            deadLetterExchangeType=topic&amp;
            deadLetterRoutingKey=dead.letters&amp;
            deadLetterQueue=dead.letters&amp;
            autoAck=false&amp;
            autoDelete=false"/>

          <!--We can use onException to make camel to retry, and after that, dead letter queue are the fallback-->
        <onException useOriginalMessage="true">
            <exception>java.lang.Exception</exception>
            <redeliveryPolicy asyncDelayedRedelivery="true" maximumRedeliveries="3" redeliveryDelay="5000"/>
        </onException>

我们需要关闭autoAck并设置deadLetterQueue,那么如果抛出异常,消息将进入死信队列。 使用onException,我们可以控制骆驼将消息放入死信队列之前的重试。

【讨论】:

几个问题。为什么我们需要开启autoAck?什么是 deadLetterExchangeType=topic? 如果 autoAck 开启,camel 将在接收消息时发送 basic.ack。骆驼将不可能在异常时发送 basic.refuse 或 basic.nack,因此死信属性将无用。 而 deadLetterExchangeXXX 属性用于将死信路由到指定的交换和队列。在演示设置中,我使用了主题,但您可以使用其他交换类型。【参考方案2】:

你可以使用onException来捕获异常,如果有异常,消息会被路由到死信交换,这里是Spring DSL中的例子:

<onException useOriginalMessage="true">
            <exception>java.sql.SQLException</exception>
            <redeliveryPolicy asyncDelayedRedelivery="true" maximumRedeliveries="1" redeliveryDelay="1000"/>

            <inOnly uri="rabbitmq://localhost/dead.msgs?exchangeType=fanout&amp;
                    autoDelete=false&amp;
                    bridgeEndpoint=true"/>
</onException>

【讨论】:

这是一个很好的答案和可行的解决方案 +1。然而,在我看来,它没有使用 rabbitmq 特定的死信标头和配置。如所见camel.apache.org/rabbitmq.html 为什么需要设置bridgeEndpoint=true。那是为了什么? 如果不添加该属性,目标队列将不会收到任何消息。

以上是关于RabbitMQ - Apache Camel 读取消息如何处理失败的消息的主要内容,如果未能解决你的问题,请参考以下文章

Rabbitmq 骆驼弹簧靴自动配置

架构设计:系统间通信(36)——Apache Camel快速入门(上)

架构设计:系统间通信(36)——Apache Camel快速入门(上)

Apache Camel:轮询消费者

Camel 和rabbitmq 集成处理

Camel - 内容丰富器:enrich() 与 pollEnrich()