处理 Reactor 中的平行通量
Posted
技术标签:
【中文标题】处理 Reactor 中的平行通量【英文标题】:Dealing with parallel flux in Reactor 【发布时间】:2019-12-23 09:02:25 【问题描述】:我从可迭代对象中创建了平行通量。在每个迭代中,我都必须打一个休息电话。但是在执行时,即使任何请求失败,所有剩余的请求也会失败。无论成功与否,我都希望执行所有请求。
我目前正在使用 Flux.fromIterable 并使用 runOn 运算符
Flux.fromIterable(actions)
.parallel()
.runOn(Schedulars.elastic())
.flatMap(request -> someRemoteCall)
.sequential()
.subscribe();
我希望执行 iterable 中的所有请求,无论失败或成功。但截至目前,有些被执行,有些失败。
【问题讨论】:
已添加顺序作为命中和试验的一部分。 这是我在日志中看到的。 "date":"2019-08-17T08:41:48.043+00:00","loglevel":"ERROR","logger_name":"reactor.core.publisher.Operators","thread_name":"reactor- http-client-epoll-11","message":"操作者调用默认 onErrorDropped","stack_trace":"org.springframework.web.reactive.function.client.WebClientResponseException: ClientResponse 有错误的状态码: 404 Not Found\n \tat 【参考方案1】:我通常使用三种可能的方式来实现这一点:
使用flatMap()
的3 个参数版本,第二个是mapperOnError
- 例如。 .flatMap(request -> someRemoteCall(), x->Mono.empty(), null)
;
使用onErrorResume(x -> Mono.empty())
作为单独的调用来忽略任何错误;
使用.onErrorResume(MyException.class, x -> Mono.empty()))
忽略特定类型的错误。
第二个是我默认使用的,因为我觉得最清楚。
【讨论】:
【参考方案2】:由于.parallel().runOn(...)
的使用,您不能使用onErrorContinue
,如下所示:
.parallel()
.runOn(...)
.flatMap(request -> someRemoteCall)
.onErrorContinue(...)
但你也许可以像这样使用它:
.parallel().runOn(...)
.flatMap(request -> someRemoteCall
.onErrorContinue((t, o) -> log.error("Skipped error: ", t.getMessage()))
)
假设someRemoteCall
是Mono
或Flux
不是 本身在.parallel().runOn(...)
上运行rails。
但是当你没有someRemoteCall
时,你可以使用下面的技巧(参见NOT_MONO_AND_NOT_FLUX
)来忽略.parallel().runOn(...)
上运行的不安全处理rails:
Optional<List<String>> foundImageNames =
Flux.fromStream(this.fileStoreService.walk(path))
.parallel(cpus, cpus)
.runOn(Schedulers.newBoundedElastic(cpus, Integer.MAX_VALUE, "import"), 1)
.flatMap(NOT_MONO_AND_NOT_FLUX -> Mono
.just(NOT_MONO_AND_NOT_FLUX)
.map(path -> sneak(() -> unsafeLocalSimpleProcessingReturningString(path)))
.onErrorContinue(FileNotFoundException.class,
(t, o) -> log.error("File missing:\n", t.getMessage()))
)
.collectSortedList(Comparator.naturalOrder())
.blockOptional();
【讨论】:
【参考方案3】:我仍在学习 WebFlux 和 Reactor,但在 flatMap
(REST 调用)之后直接尝试 onErrorContinue
之一以删除(并可能记录)错误。
【讨论】:
是的,有几个 onError* 方法。这取决于用例/您要如何处理错误。 它不会在flatMap
之后立即起作用,因为parallel()
创建了一个没有onErrorContinue
的ParallelFlux
。它会在sequential()
创建一个Flux
之后立即工作。【参考方案4】:
Reactor 中存在延迟错误运算符。您可以按如下方式编写代码:
Flux.fromIterable(actions)
.flatMapDelayError(request -> someRemoteCall(request).subscribeOn(Schedulers.elastic()), 256, 32)
.doOnNext(System.out::println)
.subscribe();
请注意,如果任何内部发布者发出错误,这仍然会使您的通量失败,但是,它会等待所有内部发布者完成后再执行此操作。
这些运算符还需要指定并发和预取参数。在示例中,我将它们设置为在常规 flatMap 调用中使用的默认值。
【讨论】:
这与parallel().runOn(...)
在并行处理方面非常不同。以上是关于处理 Reactor 中的平行通量的主要内容,如果未能解决你的问题,请参考以下文章
Project Reactor 3 中的 publishOn 与 subscribeOn