什么是使用 RxJS 在包含数组的 Observable 上迭代(并应用一些逻辑)的优雅解决方案?
Posted
技术标签:
【中文标题】什么是使用 RxJS 在包含数组的 Observable 上迭代(并应用一些逻辑)的优雅解决方案?【英文标题】:What is an elegand solution to iterate (and apply some logic) on an Observable containing an array using RxJS? 【发布时间】:2021-03-08 20:47:56 【问题描述】:我不太喜欢 RxJS,我对为什么它是解决这个问题的最佳方法有以下疑问。我将尝试详细解释它。进入服务类我有这两种方法:
acceptArtistBid(bid): void
this.findArtistBidsAppliedByCurrentWall(bid)
.subscribe(artistsBisdList =>
console.log("ARTISTS BIDS LIST RELATED THE CURRENT WALL: ", artistsBisdList);
// ITERATE ON EACH ELEMENT OF THE artistsBisdList, DO SOME CHANGE TO EACH ELEMENT AND UPDATE ALL THESE ELEEMENTS ON FIRESTORE
);
findArtistBidsAppliedByCurrentWall(bid):Observable<Bid[]>
return this.db.collection('bids',
ref=> ref.where("wallId", "==", bid.wallId))
.snapshotChanges()
.pipe(
map(snaps =>
const courses = this.convertSnaps<Bid>(snaps);
return courses;
)
)
所以基本上我有这个 acceptArtistBid() 方法调用 findArtistBidsAppliedByCurrentWall() 以检索包含 的 Observable出价对象数组。
目前在我的 acceptArtistBid() 方法中,我订阅了这个 observable 以检索对象列表......然后我必须迭代这个列表以便应用一些逻辑这些对象中的每一个(我只需更改状态字段的值),然后在 Firestore 数据库中更新这些对象。
目前我正在考虑使用这种方法进行简单的经典 for 循环交互,或者可能是 forEach() 方法将迭代器函数作为参数传递(如下所示:https://www.w3schools.com/js/js_array_iteration.asp)为了更改数组中每个对象的状态字段值,然后在 Firebase 上一一更新这些对象。
我在问这种方法是否正确,或者是否可以使用 RxJS 做得更好......在这种情况下,我认为我不必订阅我的 findArtistBidsAppliedByCurrentWall(bid) 方法,但是也许我必须直接在 Observable 上使用一些 RxJS 运算符。
有可能吗?使用 RxJS 来解决这个问题有什么优雅的解决方案?
【问题讨论】:
【参考方案1】:我的答案是:使用简单的forEach
。您可以通过 Observable<Bid[]>
通过 map
管道并在那里转换您的对象来做同样的事情,但我认为您不会转换这些对象,而是将它们传递给方法或类似方法。为此:为什么要使用可观察结构?
另一方面,如果您要操作该 observable、过滤、删除、添加、映射等,您将使用管道。
【讨论】:
【参考方案2】:如果我理解你的问题,我认为正确的答案取决于这个用例的要求。
首先,您必须考虑从 Firestore 读取 和 写入 到 Firestore 是 2 个异步流。特别是对于编写,流可以在每次元素更新时发出,或者它可以是一个批处理更新,当整批元素更新时只发出一次。
也就是说,您需要了解这两个流是否相互独立。更具体地说,如果您可以返回读取结果并同时更新 Firestore,或者如果您需要确保在将更新结果返回给客户端之前更新 Firestore。
然后您需要了解如何管理错误。更新错误的处理方式是否与读取错误不同?在其余的答案中,我不会涵盖这部分,但它不能被忽略。
假设您处于必须先更新 Firestore 中的所有元素才能将结果返回给客户端的情况。在这种情况下,我会按照这些思路使用和处理。
首先在服务中创建一个方法,该方法返回一个 Observable,该方法仅在每个元素更新后才会发出从 Firestore 读取的出价列表。
acceptArtistBid(bid)
// define a variable to hold the list of bids
let listOfBids: Array<Bid>;
// notice that the Observable is returned by this method and not subscribed in the service
return this.findArtistBidsAppliedByCurrentWall(bid).pipe(
// here you use the map of Observable to transform the list, and within this map you call the map Array method to run the transformation on each bid
map(artistsBisdList =>
listOfBids = artistsBisdList.map(bid => // transform bid);
return listOfBids
),
// assume updateFirestore$ is a method that returns an Observable representing the update operation - this second map operator returns an array of Observables
map(updatedList => updatedList.map(updatedBid => updateFirestore$(bid))),
// forkJoin is an RxJs function that accepts an array of Observables and emits when all Observables have completed - in this case it emits when all elements have been updated on Firestore
concatMap(listOfObservables => forkJoin(listOfObservables)),
// this last map is just to make the method return listOfBids
map(() => listOfBids)
)
然后我会让服务的客户端决定何时何地订阅 acceptArtistBid
方法返回的 Observable。
上述代码示例未经测试,因此可能包含一些错误,但我希望它能够传达逻辑意义。还请考虑可以删除 listOfBids
变量,但这可能会使代码的可读性降低,至少根据我的口味。
一般来说,认为服务方法返回 Observables 而不是订阅它们是一个很好的规则。订阅它们应该是客户的责任。这是构建应用程序的反应式方法,首先定义您感兴趣的流,通常连接简单的流以创建更复杂的流†,然后订阅它们。
使用这种方法,您最终会得到很少(通常只有一个)代表应用程序整个逻辑的流,因此即使不是只有一个,订阅也很少。
【讨论】:
以上是关于什么是使用 RxJS 在包含数组的 Observable 上迭代(并应用一些逻辑)的优雅解决方案?的主要内容,如果未能解决你的问题,请参考以下文章
RxJS Observables 的 Promise.all 行为?
Rxjs:使用 scan 或 mergeMap 或任何 rxjs 在 X 秒后将 observables 数据流(grpc 服务)组合成一个数组