Spring集成流程中的错误处理实践

Posted

技术标签:

【中文标题】Spring集成流程中的错误处理实践【英文标题】:Error handling practices in spring integration flow 【发布时间】:2016-07-15 02:50:15 【问题描述】:

我有一个涉及异步执行、从网关返回值到控制器、返回值后继续集成流的 Spring 集成流程。

这里是网关:

@MessagingGateway
public interface GW 

    @Gateway(requestChannel = "f.input")
    Task input(Collection<MessengerIncomingRequest> messages);


这是流程:

@Bean
IntegrationFlow jFlow() 
        return IntegrationFlows.from(
        MessageChannels.executor("f.input", executor()))
        .split()
        .channel(MessageChannels.executor(executor()))
        .transform(transformer)
        .channel(routerChannel())
        .get();


@Bean
ThreadPoolTaskExecutor executor() 
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        ...
        return pool;


@Bean
MessageChannel routerChannel() 
        return MessageChannels
        .publishSubscribe("routerChannel", executor())
        .get();


@Bean
IntegrationFlow routerChannelFlow() 
        return IntegrationFlows
        .from(routerChannel())
        .publishSubscribeChannel(s -> s
        .subscribe(f -> f.bridge(null))
        .subscribe(process()))
        .get();


@Bean
IntegrationFlow process() 
        return f ->
        f.route(p -> p.getKind().name(),
        m -> m.suffix("Channel")
        .channelMapping(TaskKind.CREATE.name(), "create")
        ....


@Bean
IntegrationFlow createFlow() 
        return IntegrationFlows.from(
        MessageChannels.direct("createChannel"))
        .handle(routerService)
        .get();

如何为整个流程定义错误处理程序?最佳实践是什么?我知道我可以为网关方法调用放置一个 try/catch 块,但它只会捕获 jFlow 流中出现的异常,用于 channel(routerChannel()) 之前的所有内容。

如何处理流程其余部分的错误?还是整个流程?

更新

我为publishSubscribeChannel 添加了错误处理程序

@Bean
IntegrationFlow routerChannelFlow() 
    return IntegrationFlows
            .from(routerChannel())
            .publishSubscribeChannel(s -> s
                    .subscribe(f -> f.bridge(null))
                    .subscribe(process())
                    .errorHandler(errorHandler))
            .get();

但这似乎没有帮助,因为如果出现异常,我会收到以下错误:

cMessagingTemplate$TemporaryReplyChannel : Reply message received but the receiving thread has already received a reply:ErrorMessage [payload=org.springframework.messaging.MessageHandlingException:

我的错误处理程序没有被调用。

更新

根据 Gary 的回答,我尝试了以下代码:

@Bean
IntegrationFlow jFLow() 
    return IntegrationFlows.from(
            MessageChannels.executor("f.input", executor()))
            .split()
            .channel(MessageChannels.executor(executor()))
            .transform(transformer)
            .channel(routerChannel())
            .get();


@Bean
IntegrationFlow exceptionOrErrorFlow() 
    return IntegrationFlows.from(
            MessageChannels.direct("exceptionChannel"))
            .handle(errorHandler, "handleError")
            .get();


    @Bean
MessageChannel exceptionChannel() 
    return MessageChannels.direct("exceptionChannel")
            .get();


@Bean
IntegrationFlow process() 
        return f ->
        f.enrichHeaders((spec) ->
                    spec.header("errorChannel", "exceptionChannel", true))
        f.route(p -> p.getKind().name(),
        m -> m.suffix("Channel")
        .channelMapping(TaskKind.CREATE.name(), "create")
        ....


@MessagingGateway(errorChannel = "exceptionChannel")

在另一次编辑后,我将exceptionChannel 添加到网关,并将丰富的标题移动到我的流程的第二段(异步)。如果在流程的同步部分抛出异常,控制器仍然会被阻塞。

【问题讨论】:

【参考方案1】:

首先,让我解释一下网关的工作原理——它应该有助于理解下面的解决方案。

请求消息获得一个唯一的临时回复通道,该通道被添加为replyChannel 标头。即使网关有明确的replyChannel,它也只是桥接到请求的replyChannel - 这就是网关将回复与请求相关联的方式。

现在,网关还将请求的errorChannel 标头设置为相同的回复通道。这样,即使流是异步的,异常也可以路由回网关,然后抛出给调用者或路由到网关的错误通道(如果指定)。此路由由MessagePublishingErrorHandler 执行,该MessagePublishingErrorHandler 连接到包装您的执行程序的ErrorHandlingTaskExecutor

因为您正在向网关返回结果然后继续;该网关交互已“花费”,并且不会收到发送到 replyChannel 标头的消息(包括异常)。因此,您看到的日志消息。

因此,一种解决方案是修复发送到独立流的消息上的 errorChannel 标头。使用.enrichHeaders 替换(确保将overwrite 设置为true)由网关设置的errorChannel 标头。这应该在流程中尽快完成,以便将任何异常路由到该通道(然后您可以在那里订阅您的错误处理程序)。

另一种解决方案是连接您自己的错误处理执行程序,在其MessagePublishingErrorHandler 上显式设置defaultErrorChannel 并删除errorChannel 标头。

异步错误路由首先寻找header;如果存在,则将错误消息路由到那里;如果没有标头且 MPEH 没有默认错误通道;消息将被路由到默认的errorChannel,(通常)订阅了LoggingChannelAdapter。默认的errorChannel 是一个发布/订阅频道,因此您可以订阅其他端点。

编辑

您正在更改发布/订阅之前的频道。

获得至少一个对网关的响应很重要;您应该将错误通道单独留在 pub/sub 的一个分支上,并在第二个分支上更新它。这样,第一站的异常将被抛出给调用者(如果您想在网关处采取一些操作,例如路由到您的异常处理程序,您可以将errorChannel 添加到网关)。您必须只更新第二站的标头,以便其异常直接进入您的错误处理程序。

如果您将网关上的errorChannel 设置为您的exceptionChannel,那么两条腿上的异常都会出现。

【讨论】:

加里,感谢您的精彩解释!我想澄清一些事情1。 Use .enrichHeaders to replace (be sure to set overwrite to true) the errorChannel header that was set up by the gateway. - 我应该用什么频道名称替换? 2。 explicitly setting a defaultErrorChannel - 你能提示一下我应该设置什么 bean/class 这个 defaultErrorChannel 吗?到处都找不到。 我想我明白你的意思,我相应地更新了我的答案,仍然存在一些问题,但我想我看到了隧道尽头的曙光。 非常感谢,我也编辑了我的问题。当流程的同步部分抛出异常时,我仍然会阻止控制器。 您不能在第一站“消耗”异常 - 您需要将异常抛回给调用者。您要么需要不同的异常处理程序,要么添加一些逻辑。请参阅this gist(我认为)模拟您的应用程序。 您不得在 cmets 中针对旧答案提出新问题;它不能帮助人们找到问题/答案。提出一个更详细的新问题 - 例如。当您说“队列”时,您到底在谈论什么技术。

以上是关于Spring集成流程中的错误处理实践的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot提供RESTful接口时的错误处理实践

Pentaho 数据集成:错误处理

javaweb+spring 项目集成异常的处理

Spring AMQP 错误处理策略详解

python Python中的错误处理实践

Go中的错误和异常处理最佳实践