骆驼路由消息到replyTo和错误队列

Posted

技术标签:

【中文标题】骆驼路由消息到replyTo和错误队列【英文标题】:Camel route message to replyTo and error queue 【发布时间】:2018-09-02 00:47:08 【问题描述】:

我是骆驼的新手。 我尝试实现以下行为。 我收到带有已填充 JMSReplyTo 标头的消息。 当抛出任何异常时,我会捕获、格式化并记录它。 然后我想将它发送到错误队列并通过 JMSReplyTo 发送回客户端。 这是我的路线配置。

onException(Exception.class)
        .transform().simple("Message:\n$body\nHeaders:\n$headers\nException:\n$exception.stacktrace")
        .log(LoggingLevel.ERROR, "errors","$body")
        .to("activemq:queue:GENERAL.ERRORS")
        .end();
    from("activemq:queue:changeProfitCenter.input")
        .choice()
            .when(header("JMSType").isEqualTo("xml"))
                .to("direct:xmlChangeProfitCenter")
            .when(header("JMSType").isEqualTo("json"))
                .to("direct:jsonChangeProfitCenter")
            .otherwise()
                .transform(simple("Incorrect message type JMSType = '$header.JMSType'"))
        .end()
    .end();


    from("direct:jsonChangeProfitCenter")
        .unmarshal().json(JsonLibrary.Jackson, Request.class)
        .log(LoggingLevel.INFO, "unmarshal json : $body")
        .bean(testService, "changeProfitCenter")
        .log(LoggingLevel.INFO, "service response : $body")
        .end();

    from("direct:xmlChangeProfitCenter")
        .unmarshal().jaxb("model")
        .log(LoggingLevel.INFO, "unmarshal xml : $body")
        .bean(testService, "changeProfitCenter")
        .log(LoggingLevel.INFO, "service response : $body")
        .end();

当我发送不正确的消息并捕获 ParseException 时,我看到了这种行为。消息转换、记录、发送到 GENERAL.ERRORS,20 秒后消息从队列中消失并移至 DLQ,我可以在我的应用程序中看到此堆栈跟踪。

12:10:51.637 [Camel (camel-1) thread #6 - JmsReplyManagerOnTimeout[GENERAL.ERRORS]] WARN  o.a.c.c.j.r.TemporaryQueueReplyManager - Timeout occurred after 20000 millis waiting for reply message with correlationID [Camel-ID-1521796109842-0-4] on destination temp-queue://ID:1521796109515-1:1:1. Setting ExchangeTimedOutException on (MessageId: ID:38611-1521795301468-8:2:1:1:2 on ExchangeId: ID-1521796109842-0-3) and continue routing.
12:10:51.683 [Camel (camel-1) thread #6 - JmsReplyManagerOnTimeout[GENERAL.ERRORS]] ERROR o.a.c.p.FatalFallbackErrorHandler - Exception occurred while trying to handle previously thrown exception on exchangeId: ID-1521796109842-0-3 using: [Pipeline[[Channel[Transform(Simple: Message:
$body
Headers:
$headers
Exception:
$exception.stacktrace)], Channel[Log(errors)[body]], Channel[sendTo(activemq://queue:GENERAL.ERRORS)]]]]. The previous and the new exception will be logged in the following.
12:10:51.683 [Camel (camel-1) thread #6 - JmsReplyManagerOnTimeout[GENERAL.ERRORS]] ERROR o.a.c.p.FatalFallbackErrorHandler - \--> Previous exception on exchangeId: ID-1521796109842-0-3
java.io.IOException: javax.xml.bind.UnmarshalException
 - with linked exception:
12:10:51.698 [Camel (camel-1) thread #6 - JmsReplyManagerOnTimeout[GENERAL.ERRORS]] ERROR o.a.c.p.FatalFallbackErrorHandler - \--> New exception on exchangeId: ID-1521796109842-0-3
org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis due reply message with correlationID: Camel-ID--1521796109842-0-4 not received on destination: temp-queue://ID:54045-1521796109515-1:1:1. Exchange[ID-1521796109842-0-3]
    at org.apache.camel.component.jms.reply.ReplyManagerSupport.processReply(ReplyManagerSupport.java:170)
    at org.apache.camel.component.jms.reply.TemporaryQueueReplyHandler.onTimeout(TemporaryQueueReplyHandler.java:62)
    at org.apache.camel.component.jms.reply.CorrelationTimeoutMap$1.run(CorrelationTimeoutMap.java:60)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
    at java.base/java.lang.Thread.run(Thread.java:844)
12:10:51.698 [Camel (camel-1) thread #1 - JmsConsumer[changeProfitCenter.input]] WARN  o.a.c.c.jms.EndpointMessageListener - Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis due reply message with correlationID: Camel-ID-1521796109842-0-4 not received on destination: temp-queue://ID:54045-1521796109515-1:1:1. Exchange[ID-1521796109842-0-3]]
org.apache.camel.RuntimeCamelException: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis due reply message with correlationID: Camel-ID-1521796109842-0-4 not received on destination: temp-queue://ID:54045-1521796109515-1:1:1. Exchange[ID-1521796109842-0-3]
    at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1831)
    at org.apache.camel.component.jms.EndpointMessageListener$EndpointMessageListenerAsyncCallback.done(EndpointMessageListener.java:195)
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:116)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:719)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:649)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:317)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1166)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1158)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1055)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
    at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis due reply message with correlationID: Camel-ID-1521796109842-0-4 not received on destination: temp-queue://ID:54045-1521796109515-1:1:1. Exchange[ID-1521796109842-0-3]

我有一些问题。 1)为什么消息没有处理给客户端。如果我删除.to("activemq:queue:GENERAL.ERRORS"),它将交付给客户。但我想将它发送到客户端和错误队列。 2)为什么当 JMSExpiration=0 时我有 ExchangeTimedOutException。

有人可以帮我提供发送异常以回复和错误队列吗? 我的最终目标是以一种格式在错误队列中发送消息,并以不同格式回复队列。 我知道多播,但它提供向多个队列发送相同的消息。这不是我想要的

【问题讨论】:

【参考方案1】:

您可以创建一个 errorHandler 并将其发送到您的队列,一旦我使用 RabbitMQ 完成它,是这样的:

/**
     * Dead Letter Channel, it will try delivery the message three times each 60 seconds
     */
    errorHandler(
            deadLetterChannel("rabbitmq:RABBITMQ_ADDRESS/RABBITMQ_EXCHANGE?routingKey=RABBITMQ_QUEUE_DLQ_ROUTING_KEY&username=RABBITMQ_USERNAME&password=RABBITMQ_PSWD&autoDelete=false&queue=RABBITMQ_QUEUE_DLQ")
                    .logExhaustedMessageHistory(true)
                    .maximumRedeliveries(3)
                    .redeliveryDelay(60000)
                    .onPrepareFailure(new FailExecution())
                    .onRedelivery(new RetryExecution()));
from("direct:youtRoute").to("log:foo");

如果对您有帮助,这是我的队列项目。

https://github.com/vtripeno/integration_camel_apis

【讨论】:

以上是关于骆驼路由消息到replyTo和错误队列的主要内容,如果未能解决你的问题,请参考以下文章

apache骆驼路由队列问题

骆驼路由输入端点的 JBoss 嵌入式 MQ 的 jndi 查找问题

当目的地不可用时如何停止使用骆驼路线中的消息

RabbitMQ,死信交换-> 无法将消息路由到默认交换?

Routing(路由模式)

将消息从 HTTP 端点发送到 JMS