处理 Reactor 中的平行通量

Posted

技术标签:

【中文标题】处理 Reactor 中的平行通量【英文标题】:Dealing with parallel flux in Reactor 【发布时间】:2019-12-23 09:02:25 【问题描述】:

我从可迭代对象中创建了平行通量。在每个迭代中,我都必须打一个休息电话。但是在执行时,即使任何请求失败,所有剩余的请求也会失败。无论成功与否,我都希望执行所有请求。

我目前正在使用 Flux.fromIterable 并使用 runOn 运算符

Flux.fromIterable(actions)
.parallel()
.runOn(Schedulars.elastic())
.flatMap(request -> someRemoteCall)     
.sequential()
.subscribe();

我希望执行 iterable 中的所有请求,无论失败或成功。但截至目前,有些被执行,有些失败。

【问题讨论】:

已添加顺序作为命中和试验的一部分。 这是我在日志中看到的。 "date":"2019-08-17T08:41:48.043+00:00","loglevel":"ERROR","logger_name":"reactor.core.publisher.Operators","thread_name":"reactor- http-client-epoll-11","message":"操作者调用默认 onErrorDropped","stack_trace":"org.springframework.web.reactive.function.client.WebClientResponseException: ClientResponse 有错误的状态码: 404 Not Found\n \tat 【参考方案1】:

我通常使用三种可能的方式来实现这一点:

使用flatMap() 的3 个参数版本,第二个是mapperOnError - 例如。 .flatMap(request -> someRemoteCall(), x->Mono.empty(), null); 使用onErrorResume(x -> Mono.empty()) 作为单独的调用来忽略任何错误; 使用.onErrorResume(MyException.class, x -> Mono.empty())) 忽略特定类型的错误。

第二个是我默认使用的,因为我觉得最清楚。

【讨论】:

【参考方案2】:

由于.parallel().runOn(...) 的使用,您不能使用onErrorContinue,如下所示:

.parallel()
.runOn(...)
.flatMap(request -> someRemoteCall)
.onErrorContinue(...)

但你也许可以像这样使用它:

.parallel().runOn(...)
.flatMap(request -> someRemoteCall
        .onErrorContinue((t, o) -> log.error("Skipped error: ", t.getMessage()))
)

假设someRemoteCallMonoFlux 不是 本身在.parallel().runOn(...) 上运行rails

但是当你没有someRemoteCall 时,你可以使用下面的技巧(参见NOT_MONO_AND_NOT_FLUX)来忽略.parallel().runOn(...) 上运行的不安全处理rails

Optional<List<String>> foundImageNames = 
    Flux.fromStream(this.fileStoreService.walk(path))
        .parallel(cpus, cpus)
        .runOn(Schedulers.newBoundedElastic(cpus, Integer.MAX_VALUE, "import"), 1)

        .flatMap(NOT_MONO_AND_NOT_FLUX -> Mono
            .just(NOT_MONO_AND_NOT_FLUX)
            .map(path -> sneak(() -> unsafeLocalSimpleProcessingReturningString(path)))
            .onErrorContinue(FileNotFoundException.class,
                (t, o) -> log.error("File missing:\n", t.getMessage()))
        )

        .collectSortedList(Comparator.naturalOrder())
        .blockOptional();

【讨论】:

【参考方案3】:

我仍在学习 WebFlux 和 Reactor,但在 flatMap(REST 调用)之后直接尝试 onErrorContinue 之一以删除(并可能记录)错误。

【讨论】:

是的,有几个 onError* 方法。这取决于用例/您要如何处理错误。 它不会在flatMap 之后立即起作用,因为parallel() 创建了一个没有onErrorContinueParallelFlux。它会在sequential() 创建一个Flux 之后立即工作。【参考方案4】:

Reactor 中存在延迟错误运算符。您可以按如下方式编写代码:

Flux.fromIterable(actions)
    .flatMapDelayError(request -> someRemoteCall(request).subscribeOn(Schedulers.elastic()), 256, 32)
    .doOnNext(System.out::println)
    .subscribe();

请注意,如果任何内部发布者发出错误,这仍然会使您的通量失败,但是,它会等待所有内部发布者完成后再执行此操作。

这些运算符还需要指定并发和预取参数。在示例中,我将它们设置为在常规 flatMap 调用中使用的默认值。

【讨论】:

这与parallel().runOn(...)并行处理方面非常不同。

以上是关于处理 Reactor 中的平行通量的主要内容,如果未能解决你的问题,请参考以下文章

Project Reactor:仅在未发出第一项时通量超时

Project Reactor 3 中的 publishOn 与 subscribeOn

muduo源代码分析--Reactor模式在muduo中的使用

Reactor和Proactor对比

Reactor事件模型在Redis中的应用

反应式编程Reactor中的多线程