当 Spring Cloud Stream 反应式使用者遇到异常时,为啥我会收到 onComplete 信号?
Posted
技术标签:
【中文标题】当 Spring Cloud Stream 反应式使用者遇到异常时,为啥我会收到 onComplete 信号?【英文标题】:Why am I getting onComplete signal when an exception is encountered in Spring Cloud Stream reactive consumer?当 Spring Cloud Stream 反应式使用者遇到异常时,为什么我会收到 onComplete 信号? 【发布时间】:2021-01-17 15:27:18 【问题描述】:我将 Spring Reactor 与 Spring Cloud Stream (GCP Pub/Sub Binder) 结合使用,并遇到了错误处理问题。我可以通过一个非常简单的示例重现该问题:
@Bean
public Function<Flux<String>, Mono<Void>> consumer()
return flux -> flux
.doOnNext(msg -> log.info("New message received: ", msg))
.map(msg ->
if (true)
throw new RuntimeException("exception encountered!");
return msg;
)
.doOnError(throwable -> log.error("Failed to consume message", throwable))
.then();
我期望的行为是看到“无法使用消息”打印,但是,这似乎不是发生的情况。当向链添加.log()
调用时,我看到onNext
/onComplete
信号,我希望看到onError
信号。
我的实际代码如下所示:
@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService)
return flux -> flux
.doOnNext(msg -> log.info("New message received: ", msg))
.flatMap(myService::processMessage) // exception happens deep in here
.doOnError(throwable -> log.error("Failed to consume message", throwable))
.then();
我注意到在我的服务类的深处,我试图对我的 Reactor 发布者进行错误处理。但是,onError
信号在使用 Spring Cloud Stream 时不会出现。如果我在单元测试中简单地调用我的服务 myService.processMessage(msg)
并模拟异常,我的反应链将正确传播错误信号。
当我连接到 Spring Cloud Stream 时,这似乎是一个问题。我想知道 Spring Cloud Function/Stream 是否正在执行任何全局错误包装?
在我的非平凡代码中,我确实注意到此错误消息可能与我没有收到错误信号的原因有关?
ERROR --- onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: ...
为了进一步混淆,如果我将 Spring Cloud Stream 绑定切换到非反应式实现,我可以在反应链中获得 onError
信号:
@Bean
public Consumer<CustomMessage> consumer(MyService myService)
return customMessage -> Mono.just(customMessage)
.doOnNext(msg -> log.info("New message received: ", msg))
.flatMap(myService::processMessage) // exception happens deep in here
.doOnError(throwable -> log.error("Failed to consume message", throwable)) // prints successfully this time
.subscribe();
【问题讨论】:
【参考方案1】:所以这是我从自己的调查中收集到的,也许这可能对其他人有所帮助。预先警告,我可能没有使用正确的“Spring Reactor 语言”,但这就是我最终解决它的方式......
在Hoxton.SR5
中,一个管理通量订阅的onErrorContinue
was included on the reactive binding。 onErrorContinue
的问题在于,它通过在失败的运算符(如果支持)上应用 BiConsumer 函数来影响 上游 运算符。
这意味着当我们的 map
/flatMap
操作符发生错误时,onErrorContinue
BiConsumer 将启动并将下游信号修改为 onComplete()
(Mono<T>
) 或 request(...)
(如果它从Flux<T>
请求了一个新元素)。这导致我们的doOnError(...)
运算符没有执行,因为没有onError()
信号。
最终,SCS 团队决定remove this error handling wrapper。 Hoxton.SR6
不再有这个onErrorContinue
。但是,这意味着传播到 SCS 绑定的异常将导致 Flux 订阅被切断。由于没有订阅者,后续消息将无处可路由。
此错误处理已传递给客户端,我们将onErrorResume
运算符添加到内部发布者 以有效地丢弃错误信号。当myService::processMessage
发布者中遇到错误时,onErrorResume
会将发布者切换到作为参数传入的后备发布者,并从操作员链中的该点恢复。在我们的例子中,这个后备发布者只是返回 Mono.empty()
,这允许我们丢弃错误信号,同时仍然允许内部错误处理机制运行,同时也不影响外部源发布者。
onErrorResume
示例/说明
上面的技术可以用一个非常简单的例子来说明。
Flux.just(1, 2, 3)
.flatMap(i -> i == 2
? Mono.error(new RuntimeException("error")
: Mono.just(i))
.onErrorResume(t -> Flux.just(4, 5, 6))
.doOnNext(i -> log.info("Element: ", i))
.subscribe();
上面的Flux<Integer>
会输出如下:
Element: 1
Element: 4
Element: 5
Element: 6
由于在元素 2
处遇到错误,onErrorResume
回退启动,新发布者变为Flux.just(4, 5, 6)
,有效地恢复回退。在我们的例子中,我们不想影响源发布者(即Flux.just(1, 2, 3)
)。我们只想删除错误的元素 (2
) 并继续下一个元素 (3
)。
我们不能简单地将Flux.just(4, 5, 6)
更改为Flux.empty()
或Mono.empty()
:
Flux.just(1, 2, 3)
.flatMap(i -> i == 2
? Mono.error(new RuntimeException("error")
: Mono.just(i))
.onErrorResume(t -> Mono.empty())
.doOnNext(i -> log.info("Element: ", i))
.subscribe();
这将导致以下输出:
Element: 1
这是因为onErrorResume
已将上游发布者替换为后备发布者(即Mono.empty()
)并从那时起恢复。
为了实现我们想要的输出:
Element: 1
Element: 3
我们必须将onErrorResume
运算符放在flatMap
的内部发布者上:
public Mono<Integer> func(int i)
return i = 2 ? Mono.error(new RuntimeException("error")) : Mono.just(i);
Flux.just(1, 2, 3)
.flatMap(i -> func(i)
onErrorResume(t -> Mono.empty()))
.doOnNext(i -> log.info("Element: ", i))
.subscribe();
现在,onErrorResume
只影响func(i)
返回的内部发布者。如果 func(i)
中的操作员发生错误,onErrorResume
将回退到 Mono.empty()
有效地完成 Mono<T>
而不会炸毁。这仍然允许在回退运行之前应用错误处理运算符(例如 doOnError
)within func(i)
。这是因为,与onErrorContinue
不同的是,它不会影响上游操作符,也不会改变错误位置的下一个信号。
最终解决方案
在我的问题中重用 code-sn-p,我已将 Spring Cloud 版本升级到 Hoxton.SR6
并将代码更改为如下内容:
@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService)
return flux -> flux
.doOnNext(msg -> log.info("New message received: ", msg))
.flatMap(msg -> myService.processMessage(msg)
.onErrorResume(throwable -> Mono.empty())
)
.then();
请注意,onErrorResume
位于内部发布者(flatMap
内部)。
【讨论】:
这真的很烦人,我试图找到解决方案并最终得到与你相同的结果,但在这种情况下,你不能有重试和 DLQ 的情况,你需要对每个 flatMap 重复这些步骤你有你的管道 @navid_gh 是的,我同意。幸运的是,对于我的特定用例,我们已经有一个EmitterProcessor
连接到 Spring Cloud Stream Supplier<T>
。所以我们最终只是创建了一个带有retryCount
字段的自定义消息并手动进行重试。显然并不理想,但我们似乎必须对这种反应式方法做出很多权衡。
但是如何解决 spring sleuth 的问题,EmitterProcessor 无法很好地跟踪,每次调用它都会创建一个新的跟踪器 ID,并且不使用链中现有的跟踪器 ID跨度>
【参考方案2】:
我认为问题存在于以下代码中:
.map(msg -> new RuntimeException("exception encountered!"))
地图行中的 lambda 返回异常,而不是抛出异常。
【讨论】:
抱歉,这是我的错字,我已经更新了我的问题。它也应该包括throw
。问题依然存在: .map(msg -> throw new RuntimeException("exception遇到!"))以上是关于当 Spring Cloud Stream 反应式使用者遇到异常时,为啥我会收到 onComplete 信号?的主要内容,如果未能解决你的问题,请参考以下文章