Apache camel 错误处理如何与多播和事务一起使用

Posted

技术标签:

【中文标题】Apache camel 错误处理如何与多播和事务一起使用【英文标题】:How Apache camel error handling works with multicast and transactions 【发布时间】:2021-02-16 02:37:00 【问题描述】:

我得到了带有骆驼(3.5.0)应用程序的spring boot(2.3.2.RELEASE)和两条测试路线:

这个想法是检查当从多播调用的子路由中发生异常时会发生什么,以及它如何处理事务。

@Component
public class MyRoute1 extends RouteBuilder 

@Override
public void configure() 

    onException(Exception.class)
        .useOriginalBody()
        .log("Error handler parent. Body is: $body")
        .process(new Processor() 
            @Override
            public void process(Exchange exchange) throws Exception 
                Exception cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
                System.out.println(cause.getMessage());
                cause.printStackTrace(System.out);
            
        )
        .handled(true);

    from("jms:queue:EventsQueue")
        .routeId("route1")
        .onCompletion().log("On complete parent").end()
        .transacted()
        .multicast(AggregationStrategies.useOriginal(), false)
            .to("direct:route2")
            .log("Second step")
        .end()
        .log("Third step");



@Component
public class MyRoute2 extends RouteBuilder 

@Override
public void configure() 

    onException(IOException.class)
        .log("Error handling child");

    from("direct:route2")
        .routeId("route2")
        .onCompletion().log("On complete child").end()
        .log("First step")
        .throwException(new IOException("Very bad exception"));




当我向jms:queue:EventsQueue 发送消息时,我预计交换将失败,并引发我的异常并遵循自定义日志消息的顺序:

First step
Error handling child
On complete child
On complete parent

不应触发其他日志消息。

但我有以下行为:java.lang.NullPointerException 被多播聚合策略抛出并被父 onException 块捕获。所以自定义日志的顺序是:

First step
Error handling child
On complete child
Error handler parent
null
java.lang.NullPointerException
    at org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy.aggregate(UseOriginalAggregationStrategy.java:62)
...
On complete parent

有趣的部分来了:如果我在父路由行为更改中删除 .transacted() 方法。 java.lang.NullPointerException 异常仍然是通过聚合策略抛出的,但它并没有在我的父母 onException 块中捕获。

骆驼日志.transacted()

2020-11-03 14:38:35.800  INFO 11744 --- [           main] a.test.errors.MySpringBootApplication    : Started MySpringBootApplication in 3.44 seconds (JVM running for 3.916)
2020-11-03 14:38:46.389  INFO 11744 --- [[EventsPsQueue]] route2                                   : First step
2020-11-03 14:38:46.394  INFO 11744 --- [[EventsPsQueue]] route2                                   : Error handling child
2020-11-03 14:38:46.398 ERROR 11744 --- [[EventsPsQueue]] o.a.c.p.e.DefaultErrorHandler            : Failed delivery for (MessageId: queue_EventsPsQueue_ID_wsc-111-71a-44245-1603872340334-4_9_1_1_3 on ExchangeId: ID-wsc-111-71a-1604403526383-0-2). Exhausted after delivery attempt: 1 caught: java.io.IOException: Very bad exception. Processed by failure processor: FatalFallbackErrorHandler[Channel[log5]]

Message History (complete message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[route1            ] [route1            ] [from[jms://queue:EventsPsQueue]                                               ] [        25]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------


java.io.IOException: Very bad exception
    at activemq.test.errors.chapter1.MyRoute2.configure(MyRoute2.java:20) ~[classes/:na]
    at org.apache.camel.builder.RouteBuilder.checkInitialized(RouteBuilder.java:483) ~[camel-core-engine-3.5.0.jar:3.5.0]
    at org.apache.camel.builder.RouteBuilder.configureRoutes(RouteBuilder.java:430) ~[camel-core-engine-3.5.0.jar:3.5.0]
    at org.apache.camel.builder.RouteBuilder.addRoutesToCamelContext(RouteBuilder.java:405) ~[camel-core-engine-3.5.0.jar:3.5.0]
    at org.apache.camel.impl.engine.AbstractCamelContext.addRoutes(AbstractCamelContext.java:1185) ~[camel-base-3.5.0.jar:3.5.0]
    at org.apache.camel.main.RoutesConfigurer.configureRoutes(RoutesConfigurer.java:93) ~[camel-main-3.5.0.jar:3.5.0]
    at org.apache.camel.spring.boot.CamelSpringBootApplicationListener.onApplicationEvent(CamelSpringBootApplicationListener.java:101) ~[camel-spring-boot-3.5.0.jar:3.5.0]
    at org.apache.camel.spring.boot.CamelSpringBootApplicationListener.onApplicationEvent(CamelSpringBootApplicationListener.java:57) ~[camel-spring-boot-3.5.0.jar:3.5.0]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:404) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:361) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:898) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143) ~[spring-boot-2.3.2.RELEASE.jar:2.3.2.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) ~[spring-boot-2.3.2.RELEASE.jar:2.3.2.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) ~[spring-boot-2.3.2.RELEASE.jar:2.3.2.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.3.2.RELEASE.jar:2.3.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.3.2.RELEASE.jar:2.3.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) ~[spring-boot-2.3.2.RELEASE.jar:2.3.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.3.2.RELEASE.jar:2.3.2.RELEASE]
    at activemq.test.errors.MySpringBootApplication.main(MySpringBootApplication.java:13) ~[classes/:na]

2020-11-03 14:38:46.400  WARN 11744 --- [[EventsPsQueue]] o.a.c.s.spi.TransactionErrorHandler      : Transaction rollback (0x347b370c) redelivered(false) for (MessageId: queue_EventsPsQueue_ID_wsc-111-71a-44245-1603872340334-4_9_1_1_3 on ExchangeId: ID-wsc-111-71a-1604403526383-0-2) caught: java.io.IOException: Very bad exception
2020-11-03 14:38:46.401  INFO 11744 --- [[EventsPsQueue]] route2                                   : On completion child
2020-11-03 14:38:46.401  INFO 11744 --- [[EventsPsQueue]] route1                                   : Error handler parent. Body is: TEST MESSAGE
null
java.lang.NullPointerException
    at org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy.aggregate(UseOriginalAggregationStrategy.java:62)
    at org.apache.camel.AggregationStrategy.aggregate(AggregationStrategy.java:86)
    at org.apache.camel.processor.MulticastProcessor.doAggregateInternal(MulticastProcessor.java:628)
    at org.apache.camel.processor.MulticastProcessor.doAggregateSync(MulticastProcessor.java:609)
    at org.apache.camel.processor.MulticastProcessor.doAggregate(MulticastProcessor.java:594)
    at org.apache.camel.processor.MulticastProcessor$MulticastTask.aggregate(MulticastProcessor.java:413)
    at org.apache.camel.processor.MulticastProcessor$MulticastTask.lambda$null$0(MulticastProcessor.java:393)
    at org.apache.camel.AsyncCallback.run(AsyncCallback.java:44)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.schedule(DefaultReactiveExecutor.java:55)
    at org.apache.camel.processor.CamelInternalProcessor$AsyncAfterTask.done(CamelInternalProcessor.java:186)
    at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:129)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:287)
    at org.apache.camel.processor.MulticastProcessor$MulticastTask.lambda$run$1(MulticastProcessor.java:367)
    at org.apache.camel.util.concurrent.AsyncCompletionService$Task.run(AsyncCompletionService.java:150)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleSync(DefaultReactiveExecutor.java:65)
    at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:273)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:723)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:632)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleSync(DefaultReactiveExecutor.java:65)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:181)
    at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:164)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:261)
    at org.apache.camel.processor.Pipeline$PipelineTask.run(Pipeline.java:90)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleSync(DefaultReactiveExecutor.java:65)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:145)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:723)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:632)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleSync(DefaultReactiveExecutor.java:65)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:181)
    at org.apache.camel.spring.spi.TransactionErrorHandler.access$201(TransactionErrorHandler.java:45)
    at org.apache.camel.spring.spi.TransactionErrorHandler$2.process(TransactionErrorHandler.java:238)
    at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
    at org.apache.camel.spring.spi.TransactionErrorHandler.processByErrorHandler(TransactionErrorHandler.java:235)
    at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:110)
    at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:123)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:723)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:632)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleSync(DefaultReactiveExecutor.java:65)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:181)
    at org.apache.camel.spring.spi.TransactionErrorHandler.access$201(TransactionErrorHandler.java:45)
    at org.apache.camel.spring.spi.TransactionErrorHandler$2.process(TransactionErrorHandler.java:238)
    at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
    at org.apache.camel.spring.spi.TransactionErrorHandler.processByErrorHandler(TransactionErrorHandler.java:235)
    at org.apache.camel.spring.spi.TransactionErrorHandler$1.doInTransactionWithoutResult(TransactionErrorHandler.java:198)
    at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:36)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
    at org.apache.camel.spring.spi.TransactionErrorHandler.doInTransactionTemplate(TransactionErrorHandler.java:191)
    at org.apache.camel.spring.spi.TransactionErrorHandler.processInTransaction(TransactionErrorHandler.java:146)
    at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:114)
    at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:123)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:287)
    at org.apache.camel.processor.Pipeline$PipelineTask.run(Pipeline.java:90)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:147)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:287)
    at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
    at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41)
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:130)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
2020-11-03 14:38:46.407  INFO 11744 --- [[EventsPsQueue]] route1                                   : On complete parent
2020-11-03 14:38:46.411  WARN 11744 --- [[EventsPsQueue]] c.c.j.DefaultJmsMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'EventsPsQueue' - trying to recover. Cause: Transaction rolled back because it has been marked as rollback-only

没有事务支持的骆驼日志:

2020-11-03 14:40:27.748  INFO 11981 --- [           main] a.test.errors.MySpringBootApplication    : Started MySpringBootApplication in 3.234 seconds (JVM running for 3.643)
2020-11-03 14:40:36.828  INFO 11981 --- [[EventsPsQueue]] route2                                   : First step
2020-11-03 14:40:36.832  INFO 11981 --- [[EventsPsQueue]] route2                                   : Error handling child
2020-11-03 14:40:36.837 ERROR 11981 --- [[EventsPsQueue]] o.a.c.p.e.DefaultErrorHandler            : Failed delivery for (MessageId: queue_EventsPsQueue_ID_wsc-111-71a-44245-1603872340334-4_9_1_1_4 on ExchangeId: ID-wsc-111-71a-1604403636824-0-2). Exhausted after delivery attempt: 1 caught: java.io.IOException: Very bad exception. Processed by failure processor: FatalFallbackErrorHandler[Channel[log5]]

Message History (complete message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[route1            ] [route1            ] [from[jms://queue:EventsPsQueue]                                               ] [        24]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------


java.io.IOException: Very bad exception
    at activemq.test.errors.chapter1.MyRoute2.configure(MyRoute2.java:20) ~[classes/:na]
    at org.apache.camel.builder.RouteBuilder.checkInitialized(RouteBuilder.java:483) ~[camel-core-engine-3.5.0.jar:3.5.0]
    at org.apache.camel.builder.RouteBuilder.configureRoutes(RouteBuilder.java:430) ~[camel-core-engine-3.5.0.jar:3.5.0]
    at org.apache.camel.builder.RouteBuilder.addRoutesToCamelContext(RouteBuilder.java:405) ~[camel-core-engine-3.5.0.jar:3.5.0]
    at org.apache.camel.impl.engine.AbstractCamelContext.addRoutes(AbstractCamelContext.java:1185) ~[camel-base-3.5.0.jar:3.5.0]
    at org.apache.camel.main.RoutesConfigurer.configureRoutes(RoutesConfigurer.java:93) ~[camel-main-3.5.0.jar:3.5.0]
    at org.apache.camel.spring.boot.CamelSpringBootApplicationListener.onApplicationEvent(CamelSpringBootApplicationListener.java:101) ~[camel-spring-boot-3.5.0.jar:3.5.0]
    at org.apache.camel.spring.boot.CamelSpringBootApplicationListener.onApplicationEvent(CamelSpringBootApplicationListener.java:57) ~[camel-spring-boot-3.5.0.jar:3.5.0]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:404) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:361) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:898) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143) ~[spring-boot-2.3.2.RELEASE.jar:2.3.2.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) ~[spring-boot-2.3.2.RELEASE.jar:2.3.2.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) ~[spring-boot-2.3.2.RELEASE.jar:2.3.2.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.3.2.RELEASE.jar:2.3.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.3.2.RELEASE.jar:2.3.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) ~[spring-boot-2.3.2.RELEASE.jar:2.3.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.3.2.RELEASE.jar:2.3.2.RELEASE]
    at activemq.test.errors.MySpringBootApplication.main(MySpringBootApplication.java:13) ~[classes/:na]

2020-11-03 14:40:36.838  INFO 11981 --- [[EventsPsQueue]] route2                                   : On completion child
2020-11-03 14:40:36.839  INFO 11981 --- [[EventsPsQueue]] route1                                   : On complete parent
2020-11-03 14:40:36.848  WARN 11981 --- [[EventsPsQueue]] o.a.c.c.jms.EndpointMessageListener      : Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - java.lang.NullPointerException]

org.apache.camel.RuntimeCamelException: java.lang.NullPointerException
    at org.apache.camel.RuntimeCamelException.wrapRuntimeCamelException(RuntimeCamelException.java:51) ~[camel-api-3.5.0.jar:3.5.0]
    at org.apache.camel.component.jms.EndpointMessageListener$EndpointMessageListenerAsyncCallback.done(EndpointMessageListener.java:213) ~[camel-jms-3.5.0.jar:3.5.0]
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:134) ~[camel-jms-3.5.0.jar:3.5.0]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736) ~[spring-jms-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696) ~[spring-jms-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) ~[spring-jms-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318) ~[spring-jms-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245) ~[spring-jms-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189) ~[spring-jms-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179) ~[spring-jms-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076) ~[spring-jms-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: java.lang.NullPointerException: null
    at org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy.aggregate(UseOriginalAggregationStrategy.java:62) ~[camel-base-3.5.0.jar:3.5.0]
    at org.apache.camel.AggregationStrategy.aggregate(AggregationStrategy.java:86) ~[camel-api-3.5.0.jar:3.5.0]
    at org.apache.camel.processor.MulticastProcessor.doAggregateInternal(MulticastProcessor.java:628) ~[camel-base-3.5.0.jar:3.5.0]
    at org.apache.camel.processor.MulticastProcessor.doAggregateSync(MulticastProcessor.java:609) ~[camel-base-3.5.0.jar:3.5.0]
    at org.apache.camel.processor.MulticastProcessor.doAggregate(MulticastProcessor.java:594) ~[camel-base-3.5.0.jar:3.5.0]
    at org.apache.camel.processor.MulticastProcessor$MulticastTask.aggregate(MulticastProcessor.java:413) ~[camel-base-3.5.0.jar:3.5.0]
    at org.apache.camel.processor.MulticastProcessor$MulticastTask.lambda$null$0(MulticastProcessor.java:393) ~[camel-base-3.5.0.jar:3.5.0]
    at org.apache.camel.AsyncCallback.run(AsyncCallback.java:44) ~[camel-api-3.5.0.jar:3.5.0]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148) ~[camel-base-3.5.0.jar:3.5.0]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60) ~[camel-base-3.5.0.jar:3.5.0]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:147) ~[camel-base-3.5.0.jar:3.5.0]
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:287) ~[camel-base-3.5.0.jar:3.5.0]
    at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83) ~[camel-base-3.5.0.jar:3.5.0]
    at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41) ~[camel-support-3.5.0.jar:3.5.0]
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:130) ~[camel-jms-3.5.0.jar:3.5.0]
    ... 11 common frames omitted

问题是:

如果子路由中发生其他异常,为什么在多播期间我有java.lang.NullPointerException。 为什么在事务支持和没有事务支持的情况下我会有不同的错误处理行为。

我会很高兴得到任何帮助或建议!

【问题讨论】:

哇赞... 1 小时内获得 8 个赞... 是的,这是一个关键时刻,对团队来说非常重要。 所以团队中没有人设置断点并进行调试以查看实际情况?这将是一个相当容易的步骤。您是否查看了有关多播的所有选项的文档?尤其是关于例外的那一项? 可以使用“stopOnException”多播选项避免父路由中的异常。问题的想法是更好地理解错误处理过程:为什么错误处理在事务处理和公共路由上有所不同,在我看来,它应该是相同的。 【参考方案1】:

对于 onException,您可以将另外两件事设置为 truefalse,它们是 handledcontinued。我认为这些向您解释了异常如何传播到父路由。

如果handled为真,那么抛出的异常会被处理,Camel不会在原路由中继续路由,而是跳出

https://camel.apache.org/manual/latest/exception-clause.html#ExceptionClause-WhatistheDifferenceBetweenHandledandContinued

在 Camel 2.3 中,我们引入了一个新选项 continue,它允许您在原始路由中处理和继续路由,就好像没有发生异常一样。

https://camel.apache.org/manual/latest/exception-clause.html#ExceptionClause-HandleandContinueExceptions

关于transacted() 我不知道。我正在阅读它,如果我找到答案会将您链接到那个。

【讨论】:

【参考方案2】:

这是预期的行为。

让我先引用 Camel 官方文档。

非事务性和事务性错误处理之间的区别。

Camel 中的错误处理大致可以分为两种不同的类型:

非事务性 事务性

非事务性是最常见的类型,开箱即用并由 Camel 自己处理。 事务类型由 J2EE 应用服务器等后备系统处理。

然后,这就是 Camel 默认处理错误的方式。

在 Camel 2.0 及以后的版本中,全局 DefaultErrorHandler 默认设置为错误处理程序。配置为:

没有重新投递 无死信队列

如果交换失败,则会引发异常并将其传播回包装在 RuntimeCamelException 中的原始调用方。

最后,这就是多播 EIP 改变这种行为的方式。

默认情况下,多播 EIP 将继续处理整个交换,即使其中一个多播消息在路由期间会抛出异常。

当在您的非事务性路由中引发异常时,只有DefaultErrorHandler 会记录错误,但路由会继续执行,因为我们在多播 EIP 中。路由完成,没有异常传播回来。

当您的事务路由中抛出异常时,TransactionErrorHandler 也会启动并停止路由执行以触发后备系统中的回滚。此时,异常由 Camel 包装并传播回来,您的 onException 逻辑在此被调用。

【讨论】:

以上是关于Apache camel 错误处理如何与多播和事务一起使用的主要内容,如果未能解决你的问题,请参考以下文章

Apache Camel中的窃听和多播有啥区别

Apache Camel:我是否需要使我的FTP-Consumer路由事务处理?

多播和广播推送通知

ZeroMQ/NanoMsg 发布/订阅与多播

(chap4 IP协议) 多播和子网掩码

网络通信中组播和多播的联系,区别分别是啥?