什么是使用 RxJS 在包含数组的 Observable 上迭代(并应用一些逻辑)的优雅解决方案?

Posted

技术标签:

【中文标题】什么是使用 RxJS 在包含数组的 Observable 上迭代(并应用一些逻辑)的优雅解决方案?【英文标题】:What is an elegand solution to iterate (and apply some logic) on an Observable containing an array using RxJS? 【发布时间】:2021-03-08 20:47:56 【问题描述】:

我不太喜欢 RxJS,我对为什么它是解决这个问题的最佳方法有以下疑问。我将尝试详细解释它。进入服务类我有这两种方法:

  acceptArtistBid(bid): void 

    this.findArtistBidsAppliedByCurrentWall(bid)
    .subscribe(artistsBisdList => 
      console.log("ARTISTS BIDS LIST RELATED THE CURRENT WALL: ", artistsBisdList);

      // ITERATE ON EACH ELEMENT OF THE artistsBisdList, DO SOME CHANGE TO EACH ELEMENT AND UPDATE ALL THESE ELEEMENTS ON FIRESTORE
      
    ); 
  

  findArtistBidsAppliedByCurrentWall(bid):Observable<Bid[]> 
    return this.db.collection('bids',
    ref=> ref.where("wallId", "==", bid.wallId))
    .snapshotChanges()
    .pipe(
        map(snaps => 

            const courses = this.convertSnaps<Bid>(snaps);

            return courses;
        )
    )
  

所以基本上我有这个 acceptArtistBid() 方法调用 findArtistBidsAppliedByCurrentWall() 以检索包含 Observable出价对象数组。

目前在我的 acceptArtistBid() 方法中,我订阅了这个 observable 以检索对象列表......然后我必须迭代这个列表以便应用一些逻辑这些对象中的每一个(我只需更改状态字段的值),然后在 Firestore 数据库中更新这些对象。

目前我正在考虑使用这种方法进行简单的经典 for 循环交互,或者可能是 forEach() 方法将迭代器函数作为参数传递(如下所示:https://www.w3schools.com/js/js_array_iteration.asp)为了更改数组中每个对象的状态字段值,然后在 Firebase 上一一更新这些对象。

我在问这种方法是否正确,或者是否可以使用 RxJS 做得更好......在这种情况下,我认为我不必订阅我的 findArtistBidsAppliedByCurrentWall(bid) 方法,但是也许我必须直接在 Observable 上使用一些 RxJS 运算符。

有可能吗?使用 RxJS 来解决这个问题有什么优雅的解决方案?

【问题讨论】:

【参考方案1】:

我的答案是:使用简单的forEach。您可以通过 Observable&lt;Bid[]&gt; 通过 map 管道并在那里转换您的对象来做同样的事情,但我认为您不会转换这些对象,而是将它们传递给方法或类似方法。为此:为什么要使用可观察结构?

另一方面,如果您要操作该 observable、过滤、删除、添加、映射等,您将使用管道。

【讨论】:

【参考方案2】:

如果我理解你的问题,我认为正确的答案取决于这个用例的要求。

首先,您必须考虑从 Firestore 读取写入 到 Firestore 是 2 个异步流。特别是对于编写,流可以在每次元素更新时发出,或者它可以是一个批处理更新,当整批元素更新时只发出一次。

也就是说,您需要了解这两个流是否相互独立。更具体地说,如果您可以返回读取结果并同时更新 Firestore,或者如果您需要确保在将更新结果返回给客户端之前更新 Firestore。

然后您需要了解如何管理错误。更新错误的处理方式是否与读取错误不同?在其余的答案中,我不会涵盖这部分,但它不能被忽略。

假设您处于必须先更新 Firestore 中的所有元素才能将结果返回给客户端的情况。在这种情况下,我会按照这些思路使用和处理。

首先在服务中创建一个方法,该方法返回一个 Observable,该方法仅在每个元素更新后才会发出从 Firestore 读取的出价列表。

acceptArtistBid(bid) 
  // define a variable to hold the list of bids
  let listOfBids: Array<Bid>;
  // notice that the Observable is returned by this method and not subscribed in the service
  return this.findArtistBidsAppliedByCurrentWall(bid).pipe(
    // here you use the map of Observable to transform the list, and within this map you call the map Array method to run the transformation on each bid
    map(artistsBisdList => 
       listOfBids = artistsBisdList.map(bid => // transform bid);
       return listOfBids
    ),
    // assume updateFirestore$ is a method that returns an Observable representing the update operation - this second map operator returns an array of Observables
    map(updatedList => updatedList.map(updatedBid => updateFirestore$(bid))),
    // forkJoin is an RxJs function that accepts an array of Observables and emits when all Observables have completed - in this case it emits when all elements have been updated on Firestore
    concatMap(listOfObservables => forkJoin(listOfObservables)),
    // this last map is just to make the method return listOfBids
    map(() => listOfBids)
  )

然后我会让服务的客户端决定何时何地订阅 acceptArtistBid 方法返回的 Observable。

上述代码示例未经测试,因此可能包含一些错误,但我希望它能够传达逻辑意义。还请考虑可以删除 listOfBids 变量,但这可能会使代码的可读性降低,至少根据我的口味。

一般来说,认为服务方法返回 Observables 而不是订阅它们是一个很好的规则。订阅它们应该是客户的责任。这是构建应用程序的反应式方法,首先定义您感兴趣的流,通常连接简单的流以创建更复杂的流†,然后订阅它们。

使用这种方法,您最终会得到很少(通常只有一个)代表应用程序整个逻辑的流,因此即使不是只有一个,订阅也很少。

【讨论】:

以上是关于什么是使用 RxJS 在包含数组的 Observable 上迭代(并应用一些逻辑)的优雅解决方案?的主要内容,如果未能解决你的问题,请参考以下文章

RxJS Observables 的 Promise.all 行为?

rxjs5合并和错误处理

Rxjs:使用 scan 或 mergeMap 或任何 rxjs 在 X 秒后将 observables 数据流(grpc 服务)组合成一个数组

什么是 tslint 黑名单,为什么 angular-cli 默认 rxjs 在 tslint.json 的列表中?

Angular/RxJS:带有可观察对象的嵌套服务调用

如何在没有嵌套订阅的情况下使用 rxjs 处理内部订阅数组