如何处理 Flux.fromStream 中的异常抛出

Posted

技术标签:

【中文标题】如何处理 Flux.fromStream 中的异常抛出【英文标题】:How to handle exception throw in Flux.fromStream 【发布时间】:2021-12-22 14:16:20 【问题描述】:
    public Mono<ResponseEntity<Data>> getData(@RequestParam List<String> tagIds)
   
   Flux<S3Object> s3ObjectFlux = Flux.fromStream(tagIds.stream())
           .parallel()
           .runOn(Schedulers.boundedElastic())
           .flatMap(id -> fetchResources(id)) //S3Exception is thrown here 
           .flatMap(idS3Object -> Mono.just(s3Object))
           .doOnError((throwable) -> log.error(throwable))
           .ordered((u1, u2) -> u2.hashCode() - u1.hashCode());

   Mono<Data> data = s3ObjectFlux.collectList()
           .map(s3Objects -> new Data(s3Objects));  

我正在遍历 tagIds 并在此处获取 s3 对象,如果该对象不存在或引发任何异常,我想记录并忽略它并继续下一步。但是在这种情况下,如果在迭代时从 fetchResource 方法中抛出 S3Exception,则会将错误作为 500 抛出给用户,而不是我想要空列表。

我没有看到其他选项,例如 onErrorMap 或 onErrorReturn

【问题讨论】:

我看到.flatMap(idS3Object -&gt; Mono.just(s3Object)) 不是必需的,如果省略,错误也会得到正确处理。我可以知道你为什么需要这一步吗? 【参考方案1】:

你会想要使用类似onErrorContinue的东西。

另外,这个.ordered((u1, u2) -&gt; u2.hashCode() - u1.hashCode()) 并没有什么意义,您可以使用sequential 之类的东西来合并“rails”。

另外,我不太明白这个:.flatMap(idS3Object -&gt; Mono.just(s3Object))。首先,它不能编译,可能你有错字,你想要一些东西flatMap(idS3Object -&gt; Mono.just(s3Object))。在这种形式下,从一个对象创建一个Mono 并对其执行flatMap 完全没有意义。

把所有这些放在一起,我假设你想要这样的东西:

Flux<S3Object> s3ObjectFlux = Flux.fromStream(tagIds.stream())
        .parallel()
        .runOn(Schedulers.boundedElastic())
        .flatMap(this::fetchResources)
        .sequential()
        .onErrorContinue((error, object) -> 
            log.error(error);
        );

【讨论】:

谢谢@Ervin Szilagyi,但我不想只记录错误(我可以用doOnError来做,我想抛出不同的错误或只返回mono.empty

以上是关于如何处理 Flux.fromStream 中的异常抛出的主要内容,如果未能解决你的问题,请参考以下文章

我们如何处理 bigquery 过程中的异常?

如何处理 lambda 中的已检查异常? [复制]

如何处理 NEAR 跨合约调用中的异常?

如何处理 lambda 表达式中的异常 [重复]

如何处理更改密码中的异常?

如何处理异步函数 UWP App GetFileFromPathAsync(path) 中的异常;