RxJS:如何将 Observable<Observable<Thing>> 转换为 Observable<Thing[]>
Posted
技术标签:
【中文标题】RxJS:如何将 Observable<Observable<Thing>> 转换为 Observable<Thing[]>【英文标题】:RxJS : how to transform an Observable<Observable<Thing>> into Observable<Thing[]> 【发布时间】:2021-10-27 04:33:29 【问题描述】:我不是 rxJS 管道方面的专家,我有一个我认为很简单的问题,我的代码归结为:
getThings = (): Observable<Thing[]> =>
this.getThingIds().pipe(
mergeMap((thingIds: number[]) =>
thingIds.map((id: number) => this.http.get<Thing>(`url$id`))
)
);
问题是这会返回一个Observable<Observable<Thing>>
。有没有办法使用运算符将其转换为所需的 Observable<Thing[]>
?还是我从一开始就完全错了。
基本上我需要对从getThingIds
收到的每个 ID 执行一个请求,然后将所有这些结果转换为一个数组
【问题讨论】:
【参考方案1】:想要提供一种性能更高的解决方案。虽然forkJoin()
确实做到了这一点,但您的订阅者必须等到所有Thing
s 都被获取后才能发出数据。
以下解决方案完成了同样的事情,但为您提供了一个随着每个 HTTP 请求完成而增长的数组。
getThings$ = this.getThingIds$.pipe(
switchMap((ids:number[])=>
from(ids).pipe(
mergeMap(id=>this.http.get<Thing>(`url$id`)),
scan((things, thing)=>[...things, thing], [] as Thing[])
)
)
);
这里是一行一行
我们从switchMap()
开始,而不是mergeMap()
。合并映射创建一个队列,而切换映射将中断任何正在进行的 http 请求。如果我们得到一个新的 id 数组,我们不想等待所有旧的 id 先解决。
接下来我们使用from()
,它将创建一个可观察的对象,该可观察对象会发出id
数组的每个值。
现在我们使用mergeMap()
设置我们的http 队列。
接下来是scan()
运算符,它类似于Array.reduce()
,但会为从前一行(HTTP 队列)发出的每个值触发内部函数。我们使用扩展运算符将每个新的Thing
添加到我们累积的响应中。
注意:我将方法更改为变量,因为它们都不需要任何参数。它只是删除了最微小的样板。
【讨论】:
【参考方案2】:我不认为你离得很远。您可以将它们与combineLatest
或forkJoin
结合使用——例如:
getThings = (): Observable<Thing[]> =>
this.getThingIds().pipe(
mergeMap((thingIds: number[]) =>
const things$ = [];
thingIds.forEach((id: number) => things$.push(this.http.get<Thing>(`url$id`)));
return forkJoin(things$);
)
);
【讨论】:
以上是关于RxJS:如何将 Observable<Observable<Thing>> 转换为 Observable<Thing[]>的主要内容,如果未能解决你的问题,请参考以下文章
RxJS 将 observable 附加到 typescript 中的 observable 数组列表
[RxJS] Split an RxJS Observable into groups with groupBy