Spring-Integration Webflux 异常处理

Posted

技术标签:

【中文标题】Spring-Integration Webflux 异常处理【英文标题】:Spring-Integration Webflux exception handling 【发布时间】:2019-03-11 15:16:39 【问题描述】:

如果在 spring-integration webflux 流中发生异常,异常本身(带有堆栈跟踪)通过 MessagePublishingErrorHandler 作为有效负载发送回调用者,它使用来自“errorChannel”标头的错误通道,而不是默认的错误通道.

如何设置类似于WebExceptionHandler 的错误处理程序?我想生成一个 Http 状态代码,可能还有一个 DefaultErrorAttributes 对象作为响应。

仅仅定义一个从errorChannel 开始的流是行不通的,错误消息不会在那里结束。我试图定义我自己的fluxErrorChannel,但它似乎也没有用作错误通道,错误最终不会出现在我的errorFlow中:

@Bean
public IntegrationFlow fooRestFlow() 
    return IntegrationFlows.from(
            WebFlux.inboundGateway("/foo")
                    .requestMapping(r -> r.methods(HttpMethod.POST))
                    .requestPayloadType(Map.class)
                    .errorChannel(fluxErrorChannel()))
            .channel(bazFlow().getInputChannel())
            .get();


@Bean
public MessageChannel fluxErrorChannel() 
    return MessageChannels.flux().get();


@Bean
public IntegrationFlow errorFlow() 
    return IntegrationFlows.from(fluxErrorChannel())
            .transform(source -> source)
            .enrichHeaders(h -> h.header(HttpHeaders.STATUS_CODE, HttpStatus.BAD_GATEWAY))
            .get();


@Bean
public IntegrationFlow bazFlow() 
    return f -> f.split(Map.class, map -> map.get("items"))
            .channel(MessageChannels.flux())
            .<String>handle((p, h) -> throw new RuntimeException())
            .aggregate();

更新

MessagingGatewaySupport.doSendAndReceiveMessageReactive 中,我在 WebFlux.inboundGateway 上定义的错误通道从未用于设置错误通道,而错误通道始终是正在创建的 replyChannel here:

FutureReplyChannel replyChannel = new FutureReplyChannel();

Message<?> requestMessage = MutableMessageBuilder.fromMessage(message)
    .setReplyChannel(replyChannel)
    .setHeader(this.messagingTemplate.getSendTimeoutHeader(), null)
    .setHeader(this.messagingTemplate.getReceiveTimeoutHeader(), null)
    .setErrorChannel(replyChannel)
    .build();

错误通道最终被重置为Mono.fromFuture 中的originalErrorChannelHandler,但在我的情况下,该错误通道是ˋnullˋ。此外,永远不会调用 onErrorResume lambda:

return Mono.fromFuture(replyChannel.messageFuture)
    .doOnSubscribe(s -> 
        if (!error && this.countsEnabled) 
            this.messageCount.incrementAndGet();
        
    )
    .<Message<?>>map(replyMessage ->
        MessageBuilder.fromMessage(replyMessage)
            .setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader)
            .setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader)
            .build())
    .onErrorResume(t -> error ? Mono.error(t) : handleSendError(requestMessage, t));

这是如何工作的?

【问题讨论】:

相关部分参考:docs.spring.io/spring-integration/reference/html/… 在参考docs.spring.io/spring-integration/docs/current/reference/html/… WebFlux 错误的错误处理部分没有提到 - 它们与同步/异步不同吗? 【参考方案1】:

这是一个错误;错误处理程序为异常创建的ErrorMessage 被发送到errorChannel 标头(必须是replyChannel,以便网关获得结果)。然后网关应该调用错误流(如果存在)并返回结果。

https://jira.spring.io/browse/INT-4541

【讨论】:

太棒了 :o) 感谢您对此进行调查。 5.0.9 应该会在周一发布。

以上是关于Spring-Integration Webflux 异常处理的主要内容,如果未能解决你的问题,请参考以下文章

Spring-integration / ActiveMQ 在单个线程中订阅多个目的地

测试 Spring-Integration 与订阅者通道

在 spring-integration 中使用有效负载类型路由器通过列表通用有效负载路由消息

Spring-Integration Webflux 异常处理

使用spring-integration快速实现mysql分布锁

使用spring-integration快速实现mysql分布锁