RXJS combineLatest running pipe 7 次一次发射

Posted

技术标签:

【中文标题】RXJS combineLatest running pipe 7 次一次发射【英文标题】:RXJS combineLatest running pipe 7 times for one emit 【发布时间】:2021-10-29 08:33:50 【问题描述】:

我编写了一个函数来组合两个 firestore 集合 observables,它工作得很好,但是它运行 map 函数 7 次只得到一个结果。我在这里做错了什么,还是有办法减少正在完成的处理量?

return combineLatest(
    [
      this.approvedToppings$,
      this.myPendingToppings$
    ],
  ).pipe(
    map(([approved, pending], index) => 
      console.log(index);
      //console.log(pending);
      let toppingDocs = approved.concat(pending);
      //console.log(toppingDocs);
      let toppings: Topping[] = [];
      toppingDocs.forEach((toppingDoc) => 
        toppings.push(this.convertToppingDocToTopping(toppingDoc));
      );
      return toppings as Topping[];
    )
  );

地图顶部的 console.log 每次更新都会运行 7 次,这对我来说没有任何意义。 订阅每个输入 observable 会显示一个结果,订阅输出 observable 也会显示一个组合输出。

【问题讨论】:

能否把approvedToppings$ & myPendingToppings$的代码附上? 它们都是 angularfire firestore collection().valueChanges(); 那么您在该日志语句中看到了 7 次相同的索引? @MrkSef 正确。在两个 observables 至少发出一次之后,索引 0 * 7 倍,并且每次它们中的任何一个都在之后更新。 在从 combineLatest observable 获得结果后,您似乎正在向 firestore 集合推送新的更改,这会触发 valueChanges 再次发出新值。尝试检查你对这个 combineLatest observable 所做的逻辑,你可能会发现任何错误。 【参考方案1】:

如果你想让 combineLatest 只工作一次,你可以尝试使用 take(1),distinctUntilChanged(),像这样;

return combineLatest(
    [
      this.approvedToppings$,
      this.myPendingToppings$
    ],
  ).pipe(
    take(1), // Add this line &&
    distinctUntilChanged(), // Add this line too
    map(([approved, pending], index) => 
      console.log(index);
      //console.log(pending);
      let toppingDocs = approved.concat(pending);
      //console.log(toppingDocs);
      let toppings: Topping[] = [];
      toppingDocs.forEach((toppingDoc) => 
        toppings.push(this.convertToppingDocToTopping(toppingDoc));
      );
      return toppings as Topping[];
    )
  );

【讨论】:

感谢您的回答,虽然它确实限制了日志触发的次数,但它实际上并没有检测到 myPendingToppings 的更新,因此更新那里的值无法反映在最终的 Observable 中。 那不是我朋友的原始问题。您最初的问题是限制控制台打印的次数。同样在您的代码中,不清楚您在哪里更新待处理的 observable。在新帖子或此处插入更多代码以获得更多答案。【参考方案2】:

Combine latest 仅在它具有所有依赖项的值时才发出。然后当它的任何依赖再次发出时,它再次发出每个的最新值。

如果这些依赖项只发出一次,那么我认为您在控制台中可能有 7 个日志的原因是因为您有多个订阅者到此 observable。

Observables 默认是冷的。这意味着如果您对该可观察对象开放了许多订阅,则该可观察对象将被创建多次。如果你使用 shareReplay 会发生什么?:

return combineLatest(
    [
      this.approvedToppings$,
      this.myPendingToppings$
    ],
  ).pipe(
    ...,
    shareReplay(1)
  );

如果您只想从每个依赖项中获取第一个值,请在每个依赖项上应用 take(1)

【讨论】:

以上是关于RXJS combineLatest running pipe 7 次一次发射的主要内容,如果未能解决你的问题,请参考以下文章

ionic - RXJS 错误:rxjs_Observable__.Observable.combineLatest 不是函数

[RxJS] Combining Streams with CombineLatest

使用 combineLatest 和 Rxjs 返回 observable 的结果

使用 takeUntil 和 combineLatest 取消订阅可观察的 rxjs 不起作用

如何为combineLatest rxjs Angular编写Jasmine Unit测试用例

如何提供一个Rxjs observable作为使用Jasmine中的combineLatest的方法的数据