RabbitMQ - Apache Camel 读取消息如何处理失败的消息
Posted
技术标签:
【中文标题】RabbitMQ - Apache Camel 读取消息如何处理失败的消息【英文标题】:RabbitMQ - Apache Camel Reading Messages what to do with failed messages 【发布时间】:2016-01-16 09:27:07 【问题描述】:我有以下 php 应用程序。这会将用户注册发布到消息队列。 Java 应用程序从该队列中读取并导入它。希望下面的图表能够描述它。我只处理 Java 方面的事情。 json 消息已经存在于队列中。
路由(Java 消费端)。
@Component
public class SignUpRouting
errorHandler(deadLetterChannel("rabbitmq://signUpDeadLetter.exchange?username=etc..").useOriginalMessage());
from("rabbitmq://phpSignUp.exchange?username=etc....")
.routeId("signUpRoute")
.processRef("signUpProcessor")
.end();
//....
处理器..
@Component
public class SignupProcessor implements Processor
private ObjectMapper mapper = new ObjectMapper();
@Override
public void process(Exchange exchange) throws Exception
String json = exchange.getIn().getBody(String.class);
SignUpDto dto = mapper.readValue(json, SignUpDto.class);
SignUp signUp = new SignUp();
signUp.setWhatever(dto.getWhatever());
//etc....
// save record
signUpDao.save(signUp);
我的问题是这个..当处理器无法导入消息时我该怎么办。
比如说,有一个 DAO 异常。数据字段可能过长或导入格式不正确。我不想丢失消息。我想查看错误并重试导入。但我不想每 30 秒重试一次消息。
我在想我需要创建另一个队列.. 死信队列并无限期地每 6 小时重试一次消息?.. 然后我会查看日志以查看错误并上传修复程序,然后消息会再加工?
我将如何实现它?还是我走错了路?
编辑我已经尝试设置 deadLetterExchange 以查看是否会朝着正确的方向发展...但是它出错并说队列不能为非空
rabbitmq://phpSignUp.exchange?username=etc...&deadLetterExchange=signUpDeadLetter.exchange
【问题讨论】:
如果您正在使用另一个队列,为什么不将确切的失败消息与异常的堆栈跟踪一起存储,然后处理来自该队列的数据 我不确定我是否理解。你能举个例子吗? 如果您有足够的支持团队,我会将消息发送到另一个队列或写入数据库表,然后发送电子邮件通知支持人员。创建另一个界面,允许支持人员修改消息文本并将其重新注入注册处理器。无论您做什么,都需要人工干预。相应地设计。 PHP应用程序应该有很好的验证,这样他就很少发生了。 【参考方案1】:这里是一个使用死信头的例子:
<from uri="rabbitmq://localhost/youexchange?queue=yourq1&
exchangeType=topic&
routingKey=user.reg.*&
deadLetterExchange=dead.msgs&
deadLetterExchangeType=topic&
deadLetterRoutingKey=dead.letters&
deadLetterQueue=dead.letters&
autoAck=false&
autoDelete=false"/>
<!--We can use onException to make camel to retry, and after that, dead letter queue are the fallback-->
<onException useOriginalMessage="true">
<exception>java.lang.Exception</exception>
<redeliveryPolicy asyncDelayedRedelivery="true" maximumRedeliveries="3" redeliveryDelay="5000"/>
</onException>
我们需要关闭autoAck并设置deadLetterQueue,那么如果抛出异常,消息将进入死信队列。 使用onException,我们可以控制骆驼将消息放入死信队列之前的重试。
【讨论】:
几个问题。为什么我们需要开启autoAck?什么是 deadLetterExchangeType=topic? 如果 autoAck 开启,camel 将在接收消息时发送 basic.ack。骆驼将不可能在异常时发送 basic.refuse 或 basic.nack,因此死信属性将无用。 而 deadLetterExchangeXXX 属性用于将死信路由到指定的交换和队列。在演示设置中,我使用了主题,但您可以使用其他交换类型。【参考方案2】:你可以使用onException来捕获异常,如果有异常,消息会被路由到死信交换,这里是Spring DSL中的例子:
<onException useOriginalMessage="true"> <exception>java.sql.SQLException</exception> <redeliveryPolicy asyncDelayedRedelivery="true" maximumRedeliveries="1" redeliveryDelay="1000"/> <inOnly uri="rabbitmq://localhost/dead.msgs?exchangeType=fanout& autoDelete=false& bridgeEndpoint=true"/> </onException>
【讨论】:
这是一个很好的答案和可行的解决方案 +1。然而,在我看来,它没有使用 rabbitmq 特定的死信标头和配置。如所见camel.apache.org/rabbitmq.html 为什么需要设置bridgeEndpoint=true。那是为了什么? 如果不添加该属性,目标队列将不会收到任何消息。以上是关于RabbitMQ - Apache Camel 读取消息如何处理失败的消息的主要内容,如果未能解决你的问题,请参考以下文章
架构设计:系统间通信(36)——Apache Camel快速入门(上)