执行 Flux.map() 时如何处理错误
Posted
技术标签:
【中文标题】执行 Flux.map() 时如何处理错误【英文标题】:How to handle error while executing Flux.map() 【发布时间】:2016-07-14 05:31:21 【问题描述】:我试图弄清楚在 Flux 中映射元素时如何处理错误。
例如,我将一个 CSV 字符串解析为我的一个业务 POJO:
myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock));
其中一些行可能包含错误,所以我在日志中得到的是:
reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001))
reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999))
reactor.core.publisher.FluxLog: onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo)
reactor.core.publisher.FluxLog: java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo
我在 API 中阅读了一些错误处理方法,但大多数是指返回“错误值”或使用后备 Flux,例如:
Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff);
但是,将它与我的myflux
一起使用意味着再次处理整个助焊剂。
那么,有没有办法在处理特定元素时处理错误(即忽略它们/记录它们)并继续处理其余的通量?
使用@akarnokd 解决方法更新
public Flux<StockQuotation> getQuotes(List<String> tickers)
Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)
// Get each set of quotes in a separate thread
.flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
// Convert each list of raw quotes string in a new Flux<String>
.flatMap(list -> Flux.fromIterable(list))
// Convert the string to POJOs
.flatMap(x ->
try
return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));
catch (IllegalArgumentException ex)
System.out.println("Error decoding stock quotation: " + x);
return Flux.empty();
);
return processingFlux;
不过,这很有魅力,因为您可以看到代码没有以前那么优雅了。 Flux API 没有任何方法可以执行此代码的操作吗?
retry(...)
retryWhen(...)
onErrorResumeWith(...)
onErrorReturn(...)
【问题讨论】:
您可以使用自定义异常,该异常可以将失败的元素作为变量包含在其中。然后在onError方法中,可以通过自定义异常中的getter方法获取失败的元素。 【参考方案1】:...
// Convert the string to POJOs
.flatMap(x ->
Flux.just(converter.convertHistoricalCSVToStockQuotation(x))
.doOnError(IllegalArgumentException.class,
e -> System.out.println("Error decoding stock quotation: " + x))
//.onErrorStop()
.onErrorResume(IllegalArgumentException.class, e -> Flux.empty())
)
...
【讨论】:
使用just
意味着计算不会在Mono/Flux“内部”完成,所以错误不会被它处理。【参考方案2】:
您可以使用 onErrorContinue。 它允许通过删除故障元素并继续后续元素来从错误中恢复。
【讨论】:
【参考方案3】:如果你想使用 Reactor 3 的方法来处理异常,你可以使用Mono.fromCallable
。
flatMap(x ->
Mono.fromCallable(() -> converter.convertHistoricalCSVToStockQuotation(x))
.flux()
.flatMap(Flux::fromIterable)
.onErrorResume(Flux::empty)
)
不幸的是没有Flux.fromCallable
,所以假设callable返回一个列表,你必须手动将其转换为Flux。
【讨论】:
【参考方案4】:在当前版本的 Reactor 3 中,添加了很多方法。所以我们可以这样做:
Flux.onErrorResume(error ->
System.out.println("Error decoding stock quotation: " + e);
return Flux.empty();
);
查看更多关于如何处理错误的信息here
【讨论】:
【参考方案5】:您需要flatMap
,如果处理失败,您可以返回一个空序列:
myflux.flatMap(v ->
try
return Flux.just(converter.convertHistoricalCSVToStockQuotation(stock));
catch (IllegalArgumentException ex)
return Flux.empty();
);
【讨论】:
效果很好(会接受这个答案),但我想知道这是否可以通过 API 完成。如果没有,我将打开一个功能请求。谢谢! 这是执行此类行为的事实上的标准 API。错误是终止事件,您必须将它们转换为 lambdas 中的其他内容以避免终止。 好的。我提议创建一种新方法来处理个别故障(也许将这些故障发布为“死信”通量?)。也许这会有所帮助...以上是关于执行 Flux.map() 时如何处理错误的主要内容,如果未能解决你的问题,请参考以下文章
在 Sequelize 中使用 .create(...) 方法时如何处理错误