当 Spring Cloud Stream 反应式使用者遇到异常时,为啥我会收到 onComplete 信号?

Posted

技术标签:

【中文标题】当 Spring Cloud Stream 反应式使用者遇到异常时,为啥我会收到 onComplete 信号?【英文标题】:Why am I getting onComplete signal when an exception is encountered in Spring Cloud Stream reactive consumer?当 Spring Cloud Stream 反应式使用者遇到异常时,为什么我会收到 onComplete 信号? 【发布时间】:2021-01-17 15:27:18 【问题描述】:

我将 Spring Reactor 与 Spring Cloud Stream (GCP Pub/Sub Binder) 结合使用,并遇到了错误处理问题。我可以通过一个非常简单的示例重现该问题:

@Bean
public Function<Flux<String>, Mono<Void>> consumer() 
    return flux -> flux
        .doOnNext(msg -> log.info("New message received: ", msg))
        .map(msg -> 
            if (true)  
                throw new RuntimeException("exception encountered!");
            
            return msg;
        )
        .doOnError(throwable -> log.error("Failed to consume message", throwable))
        .then();

我期望的行为是看到“无法使用消息”打印,但是,这似乎不是发生的情况。当向链添加.log() 调用时,我看到onNext/onComplete 信号,我希望看到onError 信号。

我的实际代码如下所示:

@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) 
    return flux -> flux
        .doOnNext(msg -> log.info("New message received: ", msg))
        .flatMap(myService::processMessage) // exception happens deep in here
        .doOnError(throwable -> log.error("Failed to consume message", throwable))
        .then();

我注意到在我的服务类的深处,我试图对我的 Reactor 发布者进行错误处理。但是,onError 信号在使用 Spring Cloud Stream 时不会出现。如果我在单元测试中简单地调用我的服务 myService.processMessage(msg) 并模拟异常,我的反应链将正确传播错误信号。

当我连接到 Spring Cloud Stream 时,这似乎是一个问题。我想知道 Spring Cloud Function/Stream 是否正在执行任何全局错误包装?

在我的非平凡代码中,我确实注意到此错误消息可能与我没有收到错误信号的原因有关?

ERROR --- onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: ...

为了进一步混淆,如果我将 Spring Cloud Stream 绑定切换到非反应式实现,我可以在反应链中获得 onError 信号:

@Bean
public Consumer<CustomMessage> consumer(MyService myService) 
    return customMessage -> Mono.just(customMessage)
        .doOnNext(msg -> log.info("New message received: ", msg))
        .flatMap(myService::processMessage) // exception happens deep in here
        .doOnError(throwable -> log.error("Failed to consume message", throwable)) // prints successfully this time
        .subscribe();

【问题讨论】:

【参考方案1】:

所以这是我从自己的调查中收集到的,也许这可能对其他人有所帮助。预先警告,我可能没有使用正确的“Spring Reactor 语言”,但这就是我最终解决它的方式......

Hoxton.SR5 中,一个管理通量订阅的onErrorContinue was included on the reactive binding。 onErrorContinue 的问题在于,它通过在失败的运算符(如果支持)上应用 BiConsumer 函数来影响 上游 运算符。

这意味着当我们的 map/flatMap 操作符发生错误时,onErrorContinue BiConsumer 将启动并将下游信号修改为 onComplete() (Mono&lt;T&gt;) 或 request(...)(如果它从Flux&lt;T&gt; 请求了一个新元素)。这导致我们的doOnError(...) 运算符没有执行,因为没有onError() 信号。

最终,SCS 团队决定remove this error handling wrapper。 Hoxton.SR6 不再有这个onErrorContinue。但是,这意味着传播到 SCS 绑定的异常将导致 Flux 订阅被切断。由于没有订阅者,后续消息将无处可路由。

此错误处理已传递给客户端,我们将onErrorResume 运算符添加到内部发布者 以有效地丢弃错误信号。当myService::processMessage 发布者中遇到错误时,onErrorResume 会将发布者切换到作为参数传入的后备发布者,并从操作员链中的该点恢复。在我们的例子中,这个后备发布者只是返回 Mono.empty(),这允许我们丢弃错误信号,同时仍然允许内部错误处理机制运行,同时也不影响外部源发布者。

onErrorResume 示例/说明

上面的技术可以用一个非常简单的例子来说明。

Flux.just(1, 2, 3)
    .flatMap(i -> i == 2
        ? Mono.error(new RuntimeException("error")
        : Mono.just(i))
    .onErrorResume(t -> Flux.just(4, 5, 6))
    .doOnNext(i -> log.info("Element: ", i))
    .subscribe();

上面的Flux&lt;Integer&gt;会输出如下:

Element: 1
Element: 4
Element: 5
Element: 6

由于在元素 2 处遇到错误,onErrorResume 回退启动,新发布者变为Flux.just(4, 5, 6),有效地恢复回退。在我们的例子中,我们不想影响源发布者(即Flux.just(1, 2, 3))。我们只想删除错误的元素 (2) 并继续下一个元素 (3)。

我们不能简单地将Flux.just(4, 5, 6) 更改为Flux.empty()Mono.empty()

Flux.just(1, 2, 3)
    .flatMap(i -> i == 2
        ? Mono.error(new RuntimeException("error")
        : Mono.just(i))
    .onErrorResume(t -> Mono.empty())
    .doOnNext(i -> log.info("Element: ", i))
    .subscribe();

这将导致以下输出:

Element: 1

这是因为onErrorResume 已将上游发布者替换为后备发布者(即Mono.empty())并从那时起恢复

为了实现我们想要的输出:

Element: 1
Element: 3

我们必须将onErrorResume 运算符放在flatMap 的内部发布者上:

public Mono<Integer> func(int i) 
    return i = 2 ? Mono.error(new RuntimeException("error")) : Mono.just(i);


Flux.just(1, 2, 3)
    .flatMap(i -> func(i)
        onErrorResume(t -> Mono.empty()))
    .doOnNext(i -> log.info("Element: ", i))
    .subscribe();

现在,onErrorResume 只影响func(i) 返回的内部发布者。如果 func(i) 中的操作员发生错误,onErrorResume 将回退到 Mono.empty() 有效地完成 Mono&lt;T&gt; 而不会炸毁。这仍然允许在回退运行之前应用错误处理运算符(例如 doOnErrorwithin func(i)。这是因为,与onErrorContinue不同的是,它不会影响上游操作符,也不会改变错误位置的下一个信号。

最终解决方案

在我的问题中重用 code-sn-p,我已将 Spring Cloud 版本升级到 Hoxton.SR6 并将代码更改为如下内容:

@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) 
    return flux -> flux
        .doOnNext(msg -> log.info("New message received: ", msg))
        .flatMap(msg -> myService.processMessage(msg)
            .onErrorResume(throwable -> Mono.empty())
        )
        .then();

请注意,onErrorResume 位于内部发布者(flatMap 内部)。

【讨论】:

这真的很烦人,我试图找到解决方案并最终得到与你相同的结果,但在这种情况下,你不能有重试和 DLQ 的情况,你需要对每个 flatMap 重复这些步骤你有你的管道 @navid_gh 是的,我同意。幸运的是,对于我的特定用例,我们已经有一个 EmitterProcessor 连接到 Spring Cloud Stream Supplier&lt;T&gt;。所以我们最终只是创建了一个带有retryCount 字段的自定义消息并手动进行重试。显然并不理想,但我们似乎必须对这种反应式方法做出很多权衡。 但是如何解决 spring sleuth 的问题,EmitterProcessor 无法很好地跟踪,每次调用它都会创建一个新的跟踪器 ID,并且不使用链中现有的跟踪器 ID跨度> 【参考方案2】:

我认为问题存在于以下代码中:

    .map(msg -> new RuntimeException("exception encountered!"))

地图行中的 lambda 返回异常,而不是抛出异常。

【讨论】:

抱歉,这是我的错字,我已经更新了我的问题。它也应该包括throw。问题依然存在: .map(msg -> throw new RuntimeException("exception遇到!"))

以上是关于当 Spring Cloud Stream 反应式使用者遇到异常时,为啥我会收到 onComplete 信号?的主要内容,如果未能解决你的问题,请参考以下文章

当没有运行 kafka 代理时,如何禁用 Spring Cloud Stream 以用于开发目的?

Spring Cloud Stream

Spring Cloud Stream的分区和分组

Spring cloud stream消息分区

Spring Cloud Stream教程消费群体

微服务之路spring cloud stream