Rxjs:Observable.combineLatest vs Observable.forkJoin

Posted

技术标签:

【中文标题】Rxjs:Observable.combineLatest vs Observable.forkJoin【英文标题】:Rxjs: Observable.combineLatest vs Observable.forkJoin 【发布时间】:2017-06-07 10:29:07 【问题描述】:

我想知道Observable.combineLatestObservable.forkJoin 之间有什么区别?

据我所知,唯一的区别是 forkJoin 期望 Observables 完成,而 combineLatest 返回最新值。

【问题讨论】:

注意:在 rxjs6+ 中,这些现在只是创建 observable 的 combineLatest()forkJoin() 函数。它们的作用相同,但语法不同。不要将combineLatestrxjs/operators 混淆,后者是一个“可管道”运算符。如果你导入错误的,你会得到错误。 【参考方案1】:

combineLatest(...)

并行运行可观察对象,每次发出一个值一个可观察对象发出一个值所有可观察对象都发出至少一个值

forkJoin(...)

并行运行可观察对象,并在所有可观察对象完成发出单个值

错误处理注意事项:

如果有任何 observables 错误输出 - 使用 combineLatest 它将发出直到错误被抛出的点。 forkJoin 只会在任何 observables 出错时返回一个错误。


高级说明:CombineLatest 不只是为每个来源获取一个值并移动到下一个。如果您需要确保只获得每个源 observable 的“下一个可用项目”,您可以在将源 observable 添加到输入数组时将 .pipe(take(1)) 添加到源 observable。

【讨论】:

感谢您考虑错误处理。在多个 observables 的情况下,错误处理很难理解。 我相信您误解了 combineLatest() 代码中的 concat() 的作用。在我看来,它像 Array.prototype.concat 方法,而不是 RxJS concat 方法。假设我是对的,那么这个答案具有误导性和不正确性,因为combineLatest() 不会按顺序逐个执行可观察对象。它并行执行它们,与forkJoin() 相同。不同之处在于产生了多少值以及源 observables 是否必须完成。 @GregL Array.concat 对可观察对象毫无意义。我绝对依赖 concat 的顺序特性——要理解的关键是在它们全部执行之前你不会得到任何输出值。 我相信combineLatest()forkJoin() 都是并行运行的。运行下面的代码为两者输出大约 5000。 const start = new Date().getTime();combineLatest([of(null).pipe(delay(5000)), of(null).pipe(delay(5000)), of(null).pipe(delay(5000))]).subscribe(() => console.log(new Date().getTime() - start));forkJoin([of(null).pipe(delay(5000)), of(null).pipe(delay(5000)), of(null).pipe(delay(5000))]).subscribe(() => console.log(new Date().getTime() - start)); 所以现在最后,combineLatest 是并行工作的吗? id 那么答案是误导性的,需要更新答案或标记它【参考方案2】:

forkJoin - 当所有 observables 完成时,发出每个 observables 最后发出的值。

combineLatest - 当任何 observable 发出一个值时,发出每个 observable 的最新值。

用法非常相似,但您不应该忘记取消订阅 combineLatest,这与 forkJoin 不同。

【讨论】:

如果一个请求失败,所有嵌套请求都会自动取消,我该如何解决这个问题? @SunilGarg 如果您只想获得所有结果,您应该使用 forkJoin 或 combineLatest。如果您不关心所有结果,则应单独使用订阅。 你能回答这个***.com/questions/55558277/… 你能用代码示例解释为什么需要取消订阅【参考方案3】:

forkJoin 不仅要求完成所有输入 observable,而且它还返回一个 observable,它产生一个值,该值是输入 observable 产生的最后一个值的数组。换句话说,它一直等到最后一个输入 observable 完成,然后产生一个值并完成。

相比之下,combineLatest 返回一个 Observable,每次输入的 observable 都产生一个新值,一旦所有输入的 observable 都产生了至少一个值。这意味着它可能具有无限值并且可能无法完成。这也意味着输入的 observables 在产生值之前不必完成。

【讨论】:

@GregL 有没有类似 forkJoin 但也适用于失败的 http 调用的函数? @Tukkan,我会将一个从错误中恢复的运算符链接到每个 http observable,以便您定义用于每个错误请求的值,而不是寻找一个将组合多个可能出错的可观察对象(我不确定是否存在这样的运算符)。从错误中恢复的运算符包括.catch().onErrorResumeNext(),可能还有.retry()(如果 Http 调用可能会间歇性失败)。 澄清一下,它们都产生数组。 @Simon_Weaver 不一定。在 combineLatest() 的情况下,您可以提供一个投影函数来指定如何从输入 observables 的最新值生成输出值。默认情况下,如果您不指定一个,您将获得一个包含最新发出的值的数组。 在 RxJS7 中,您可以将对象传递给 combineLatest,然后您无需担心在管道中混淆订单。例如。 combineLatest( one: of(1), two: of(2)).subscribe(( one, two ) => console.log(one + two); ) 将输出 3。从版本 6 开始,forkJoin 上存在相同的功能。这比使用数组更好。注意:如果你碰巧在使用 RXJS,它还没有出现在 Angular 12 中。

以上是关于Rxjs:Observable.combineLatest vs Observable.forkJoin的主要内容,如果未能解决你的问题,请参考以下文章

RxJS 6有哪些新变化?

如何在不需要 rxjs-compat 的情况下只导入 RxJS 6 中使用的运算符,如旧的 RxJS?

RxJS入门2之Rxjs的安装

[RxJS] What RxJS operators are

[RxJS] Error Handling in RxJS

javascript RXJS - 使用rxjs.js实现指数退避