RxJS:如何将 Observable<Observable<Thing>> 转换为 Observable<Thing[]>

Posted

技术标签:

【中文标题】RxJS:如何将 Observable<Observable<Thing>> 转换为 Observable<Thing[]>【英文标题】:RxJS : how to transform an Observable<Observable<Thing>> into Observable<Thing[]> 【发布时间】:2021-10-27 04:33:29 【问题描述】:

我不是 rxJS 管道方面的专家,我有一个我认为很简单的问题,我的代码归结为:

getThings = (): Observable<Thing[]> => 
    this.getThingIds().pipe(
        mergeMap((thingIds: number[]) => 
            thingIds.map((id: number) => this.http.get<Thing>(`url$id`))
        )
    );

问题是这会返回一个Observable&lt;Observable&lt;Thing&gt;&gt;。有没有办法使用运算符将​​其转换为所需的 Observable&lt;Thing[]&gt; ?还是我从一开始就完全错了。

基本上我需要对从getThingIds 收到的每个 ID 执行一个请求,然后将所有这些结果转换为一个数组

【问题讨论】:

【参考方案1】:

想要提供一种性能更高的解决方案。虽然forkJoin() 确实做到了这一点,但您的订阅者必须等到所有Things 都被获取后才能发出数据。

以下解决方案完成了同样的事情,但为您提供了一个随着每个 HTTP 请求完成而增长的数组。

getThings$ = this.getThingIds$.pipe(
  switchMap((ids:number[])=>
    from(ids).pipe(
      mergeMap(id=>this.http.get<Thing>(`url$id`)),
      scan((things, thing)=>[...things, thing], [] as Thing[])
    )
  )
);

这里是一行一行

我们从switchMap() 开始,而不是mergeMap()。合并映射创建一个队列,而切换映射将中断任何正在进行的 http 请求。如果我们得到一个新的 id 数组,我们不想等待所有旧的 id 先解决。 接下来我们使用from(),它将创建一个可观察的对象,该可观察对象会发出id 数组的每个值。 现在我们使用mergeMap() 设置我们的http 队列。 接下来是scan() 运算符,它类似于Array.reduce(),但会为从前一行(HTTP 队列)发出的每个值触发内部函数。我们使用扩展运算符将每个新的Thing 添加到我们累积的响应中。

注意:我将方法更改为变量,因为它们都不需要任何参数。它只是删除了最微小的样板。

【讨论】:

【参考方案2】:

我不认为你离得很远。您可以将它们与combineLatestforkJoin 结合使用——例如:

getThings = (): Observable<Thing[]> => 
    this.getThingIds().pipe(
        mergeMap((thingIds: number[]) => 
            const things$ = [];
            thingIds.forEach((id: number) => things$.push(this.http.get<Thing>(`url$id`)));
            return forkJoin(things$);
        )
    );

【讨论】:

以上是关于RxJS:如何将 Observable<Observable<Thing>> 转换为 Observable<Thing[]>的主要内容,如果未能解决你的问题,请参考以下文章

RxJS 将 observable 附加到 typescript 中的 observable 数组列表

[RxJS] Split an RxJS Observable into groups with groupBy

如何在 Angular 4 中推送到数组的 Observable? RxJS

RxJS:我将如何“手动”更新 Observable?

将 Promise 转换为 RxJs Observable

如何基于另一个 Observable 重置 RXJS 扫描算子