如果通量为空,如何发出过滤掉的错误

Posted

技术标签:

【中文标题】如果通量为空,如何发出过滤掉的错误【英文标题】:How to emit filtered out error if flux is empty 【发布时间】:2022-01-20 13:34:41 【问题描述】:

我有同步代码,我想用 reactor 进行非阻塞。

我想并行调用不同的 URI,调用可以返回响应、错误或什么都没有。

有3种情况:

一个请求返回一个响应,我返回它而不等待其他请求完成。 如果其他请求更早返回错误,我会删除错误 至少有一个请求返回错误,其他请求没有返回响应,我返回错误 所有请求均未返回(无响应、无错误),我没有返回任何内容

我已经以同步的方式做到了:

    AtomicReference<WebClientResponseException> responseException = new AtomicReference<>();
    
    String responseBody = Flux.fromIterable(uriList)
            .flatMap(url -> repo.sendRequest(uri)) 
            // sendRequest  returns a Mono that either emit a response, an error or nothing
            .onErrorContinue(WebClientResponseException.class, (error, element) -> 
                var webclientError = (WebClientResponseException) error;
                responseException.set(webclientError);
            )
            .blockFirst();
    
    return Pair.of(responseBody, responseException.get());

我想移除阻塞调用并返回一个 Mono

据我了解,我有点保持发生错误的状态,而我不能有反应器的状态。

我如何跟踪发生的错误但不发出它们,因为我想查看其他请求稍后是否发出结果?

这个版本好用吗?

AtomicReference<WebClientResponseException> responseException = new AtomicReference<>();
    
    return Flux.fromIterable(uriList)
            .flatMap(url -> repo.sendRequest(uri)) 
            // sendRequest  returns a Mono that either emit a response, an error or nothing
            .onErrorContinue(WebClientResponseException.class, (error, element) -> 
                var webclientError = (WebClientResponseException) error;
                responseException.set(webclientError);
            )
            .next()
            .switchIfEmpty(Mono.defer(() -> Mono.error(responseException.get())));

AtomicReference 会像闭包一样被关闭吗?

【问题讨论】:

【参考方案1】:

我认为flatMapDelayError 可能会实现您想要的,请参见以下示例:

int concurrency = 10;
int prefetch = 1;

Flux.just(
        Mono.error(new IOException("error")).delaySubscription(Duration.ofSeconds(2)),
        Mono.just("fast").delaySubscription(Duration.ofSeconds(4)),
        Mono.just("slow").delaySubscription(Duration.ofSeconds(6)))
    .flatMapDelayError(
        request -> request,
        concurrency,
        prefetch)
    .next()
    .doOnNext(result -> System.out.println("Result: " + result))

在此示例中,error 首先完成,但 -DelayError 运算符持有它,然后 fast 完成并作为结果发出。最后slow 永远不会完成,因为.next() 取消了剩余的请求,因为我们有一个结果。

【讨论】:

谢谢,这似乎可以解决问题。我不知道这个运营商。所以文档不是很清楚,但如果发出多个错误,它似乎会发出CompositeException。如果我想检索原始错误之一,这是否正确:.onErrorMap(RuntimeException.class, e -&gt; Exceptions.unwrapMultiple(e).get(0)) 还有什么理由您选择 10 和 1 作为并发和预取而不是默认值? 是的,如果有多个错误,那么它们将被捆绑到 CompositeException 中,打开它来检索它们应该没问题。关于concurrency,我以10为例,但是应该根据需要并行处理多少请求来调整。关于prefetch,如果我理解正确的话,在这种情况下没关系,因为内部元素是Mono,所以无论如何只会有1个元素。

以上是关于如果通量为空,如何发出过滤掉的错误的主要内容,如果未能解决你的问题,请参考以下文章

如果字典元素为空,如何过滤字典数组?

如果输入为空,则发出警报[重复]

如果不为空,SQL 将在 Where 中包含条件

仅当搜索栏文本不为空时,如何运行过滤器功能?

如果过滤器变量为空,GraphQL 禁用过滤

仅当 Mono 为空时如何执行操作,如果不为空则抛出错误