rxjs5合并和错误处理

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rxjs5合并和错误处理相关的知识,希望对你有一定的参考价值。

我想组合/合并多个Observable,并在完成每个Observable时执行finally函数。 merge运算符似乎并行执行每个订阅,这是我需要的,但是如果它们中的任何一个抛出错误,则执行停止。

RxJS版本4有一个运算符mergeDelayError,它应该保持所有订阅执行直到所有订阅都完成,但是这个运算符在版本5中没有实现。

我应该回复到另一个运营商吗?

var source1 = Rx.Observable.of(1,2,3).delay(3000);
var source2 = Rx.Observable.throw(new Error('woops'));
var source3 = Rx.Observable.of(4,5,6).delay(1000);

// Combine the 3 sources into 1 

var source = Rx.Observable
  .merge(source1, source2, source3)
  .finally(() => {

    // finally is executed before all 
    // subscriptions are completed.

    console.log('finally');

  }); 

var subscription = source.subscribe(
  x => console.log('next:', x),
  e => console.log('error:', e),
  () => console.log('completed'));

JSBin

答案

我认为你可以使用catch()来模拟相同的行为。你只需要将它附加到每个源Observable:

const sources = [source1, source2, source3].map(obs => 
  obs.catch(() => Observable.empty())
);

Rx.Observable
  .merge(sources)
  .finally(...)
  ...
另一答案

如果您不想吞下错误,但希望将它们延迟到最后,您可以:

const mergeDelayErrors = [];
const sources = [source1, source2, source3].map(obs => obs.catch((error) => {
  mergeDelayErrors.push(error);
  return Rx.Observable.empty();
}));

return Rx.Observable
  .merge(...sources)
  .toArray()
  .flatMap(allEmissions => {
    let spreadObs = Rx.Observable.of(...allEmissions);
    if (mergeDelayErrors.length) {
      spreadObs = spreadObs.concat(Rx.Observable.throw(mergeDelayErrors));
    }
    return spreadObs;
  })

您可能只想抛出第一个错误,或者创建一个CompositeError。我不确定当抛出多个错误时mergeDelayErrors最初是如何表现的。

不幸的是,因为这个实现必须等到所有observable都在发出错误之前完成,所以它也会等到所有的observable都完成后再发出next。这可能不是mergeDelayError的原始行为,它应该作为流发出,而不是在最后发送它们。

另一答案

我们可以通过收集错误并在最后发出错误来避免阻塞流。

function mergeDelayError(...sources) {
  const errors = [];
  const catching = sources.map(obs => obs.catch(e => {
    errors.push(e);
    return Rx.Observable.empty();
  }));
  return Rx.Observable
    .merge(...catching)
    .concat(Rx.Observable.defer(
      () => errors.length === 0 ? Rx.Observable.empty() : Rx.Observable.throw(errors)));
}


const source1 = Rx.Observable.of(1,2,3);
const source2 = Rx.Observable.throw(new Error('woops'));
const source3 = Rx.Observable.of(4,5,6);

mergeDelayError(source1, source2, source3).subscribe(
  x => console.log('next:', x),
  e => console.log('error:', e),
  () => console.log('completed'));

以上是关于rxjs5合并和错误处理的主要内容,如果未能解决你的问题,请参考以下文章

4.3 合并重复的条件执行片段

使用 rxjs5 获取 BehaviorSubject 当前值的简单方法

rxjs5.X系列 —— transform api 笔记

Python代码阅读(第19篇):合并多个字典

学习小片段——springboot 错误处理

错误代码:错误域 = NSCocoaErrorDomain 代码 = 3840“JSON 文本没有以数组或对象和允许未设置片段的选项开头。”