如果通量为空,如何发出过滤掉的错误
Posted
技术标签:
【中文标题】如果通量为空,如何发出过滤掉的错误【英文标题】:How to emit filtered out error if flux is empty 【发布时间】:2022-01-20 13:34:41 【问题描述】:我有同步代码,我想用 reactor 进行非阻塞。
我想并行调用不同的 URI,调用可以返回响应、错误或什么都没有。
有3种情况:
一个请求返回一个响应,我返回它而不等待其他请求完成。 如果其他请求更早返回错误,我会删除错误 至少有一个请求返回错误,其他请求没有返回响应,我返回错误 所有请求均未返回(无响应、无错误),我没有返回任何内容我已经以同步的方式做到了:
AtomicReference<WebClientResponseException> responseException = new AtomicReference<>();
String responseBody = Flux.fromIterable(uriList)
.flatMap(url -> repo.sendRequest(uri))
// sendRequest returns a Mono that either emit a response, an error or nothing
.onErrorContinue(WebClientResponseException.class, (error, element) ->
var webclientError = (WebClientResponseException) error;
responseException.set(webclientError);
)
.blockFirst();
return Pair.of(responseBody, responseException.get());
我想移除阻塞调用并返回一个 Mono
据我了解,我有点保持发生错误的状态,而我不能有反应器的状态。
我如何跟踪发生的错误但不发出它们,因为我想查看其他请求稍后是否发出结果?
这个版本好用吗?
AtomicReference<WebClientResponseException> responseException = new AtomicReference<>();
return Flux.fromIterable(uriList)
.flatMap(url -> repo.sendRequest(uri))
// sendRequest returns a Mono that either emit a response, an error or nothing
.onErrorContinue(WebClientResponseException.class, (error, element) ->
var webclientError = (WebClientResponseException) error;
responseException.set(webclientError);
)
.next()
.switchIfEmpty(Mono.defer(() -> Mono.error(responseException.get())));
AtomicReference 会像闭包一样被关闭吗?
【问题讨论】:
【参考方案1】:我认为flatMapDelayError
可能会实现您想要的,请参见以下示例:
int concurrency = 10;
int prefetch = 1;
Flux.just(
Mono.error(new IOException("error")).delaySubscription(Duration.ofSeconds(2)),
Mono.just("fast").delaySubscription(Duration.ofSeconds(4)),
Mono.just("slow").delaySubscription(Duration.ofSeconds(6)))
.flatMapDelayError(
request -> request,
concurrency,
prefetch)
.next()
.doOnNext(result -> System.out.println("Result: " + result))
在此示例中,error
首先完成,但 -DelayError
运算符持有它,然后 fast
完成并作为结果发出。最后slow
永远不会完成,因为.next()
取消了剩余的请求,因为我们有一个结果。
【讨论】:
谢谢,这似乎可以解决问题。我不知道这个运营商。所以文档不是很清楚,但如果发出多个错误,它似乎会发出CompositeException
。如果我想检索原始错误之一,这是否正确:.onErrorMap(RuntimeException.class, e -> Exceptions.unwrapMultiple(e).get(0))
还有什么理由您选择 10 和 1 作为并发和预取而不是默认值?
是的,如果有多个错误,那么它们将被捆绑到 CompositeException
中,打开它来检索它们应该没问题。关于concurrency
,我以10
为例,但是应该根据需要并行处理多少请求来调整。关于prefetch
,如果我理解正确的话,在这种情况下没关系,因为内部元素是Mono
,所以无论如何只会有1个元素。以上是关于如果通量为空,如何发出过滤掉的错误的主要内容,如果未能解决你的问题,请参考以下文章