如何处理 Flux.fromStream 中的异常抛出
Posted
技术标签:
【中文标题】如何处理 Flux.fromStream 中的异常抛出【英文标题】:How to handle exception throw in Flux.fromStream 【发布时间】:2021-12-22 14:16:20 【问题描述】: public Mono<ResponseEntity<Data>> getData(@RequestParam List<String> tagIds)
Flux<S3Object> s3ObjectFlux = Flux.fromStream(tagIds.stream())
.parallel()
.runOn(Schedulers.boundedElastic())
.flatMap(id -> fetchResources(id)) //S3Exception is thrown here
.flatMap(idS3Object -> Mono.just(s3Object))
.doOnError((throwable) -> log.error(throwable))
.ordered((u1, u2) -> u2.hashCode() - u1.hashCode());
Mono<Data> data = s3ObjectFlux.collectList()
.map(s3Objects -> new Data(s3Objects));
我正在遍历 tagIds 并在此处获取 s3 对象,如果该对象不存在或引发任何异常,我想记录并忽略它并继续下一步。但是在这种情况下,如果在迭代时从 fetchResource 方法中抛出 S3Exception,则会将错误作为 500 抛出给用户,而不是我想要空列表。
我没有看到其他选项,例如 onErrorMap 或 onErrorReturn
【问题讨论】:
我看到.flatMap(idS3Object -> Mono.just(s3Object))
不是必需的,如果省略,错误也会得到正确处理。我可以知道你为什么需要这一步吗?
【参考方案1】:
你会想要使用类似onErrorContinue
的东西。
另外,这个.ordered((u1, u2) -> u2.hashCode() - u1.hashCode())
并没有什么意义,您可以使用sequential
之类的东西来合并“rails”。
另外,我不太明白这个:.flatMap(idS3Object -> Mono.just(s3Object))
。首先,它不能编译,可能你有错字,你想要一些东西flatMap(idS3Object -> Mono.just(s3Object))
。在这种形式下,从一个对象创建一个Mono
并对其执行flatMap
完全没有意义。
把所有这些放在一起,我假设你想要这样的东西:
Flux<S3Object> s3ObjectFlux = Flux.fromStream(tagIds.stream())
.parallel()
.runOn(Schedulers.boundedElastic())
.flatMap(this::fetchResources)
.sequential()
.onErrorContinue((error, object) ->
log.error(error);
);
【讨论】:
谢谢@Ervin Szilagyi,但我不想只记录错误(我可以用doOnError来做,我想抛出不同的错误或只返回mono.empty以上是关于如何处理 Flux.fromStream 中的异常抛出的主要内容,如果未能解决你的问题,请参考以下文章