如何组合多个 Observable 类型数组 Observable<T[]>?

Posted

技术标签:

【中文标题】如何组合多个 Observable 类型数组 Observable<T[]>?【英文标题】:How to combine multiple Observable type arrays Observable<T[]>? 【发布时间】:2019-10-09 20:48:32 【问题描述】:

我正在 Firebase Firestore 上运行多个查询。他们都(可以)返回一个 Observable。我想将所有 Observable 组合成一个 Observable。

我尝试过不同的 RxJS 运算符,例如; combineLatest、forkJoin、concat、zip、merge 和 mergeMap。但我一直未能达到预期的效果。

getPrivateBooksFromAuthors(authors): Observable<Book[]> 
    let bookRefs: Observable<Book[]>[] = [];
    authors.map(key => 
      bookRefs.push(this.afs.collection<Book>('books', ref => 
        ref.where('public', '==', false)
           .where('authors', 'array-contains', key)
        ).valueChanges())
    );
    return bookRefs[0]

在上面的代码中,我从作者 [0] 那里获得了所有私人书籍。当我返回 concat(...bookRefs) 或 concat(bookRefs[0], bookRefs[1]) 时,我仍然只能从作者 [0] 那里获得书籍。我希望得到所有作者提供的所有书籍。

【问题讨论】:

【参考方案1】:

正如@Doflamingo 所说,您可以使用 forkJoin 来并行调用它们并接收包含已解决的所有响应数组的响应。 您遇到的问题是每个响应都是一个 Book[],因此在 forkJoin 中您会收到一个 Book[] 数组(Book[][])。您需要将 Book[][]flat 转换为 Book[]。您可以使用 a map 和 a reduce 函数来做到这一点。

在这里,我发布了一个简单的代码片段,控制台记录了 forkJoin 的响应以及应用 reduce 函数后的结果。

function mockBooks$(key): Observable<Book[]> 
  return rxjs.of([key + '-book1', key + '-book2', key + '-book3']);


function getPrivateBooksFromAuthors(authors): Observable<Book[]> 
    let bookRefs: Observable<Book[]>[] = authors.map(key => mockBooks$(key));

    return rxjs.forkJoin(bookRefs).pipe(
        rxjs.operators.tap((books) => console.log('After forkJoin', books)),
        // You need this flattening operation!!
        rxjs.operators.map(books => books.reduce((acc, cur) => [...acc, ...cur], []) ));



const authors = ['a1', 'a2', 'a3', 'a4', 'a5'];
getPrivateBooksFromAuthors(authors).subscribe((data) => console.log('Final result', data));
&lt;script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.2/rxjs.umd.js"&gt;&lt;/script&gt;

希望这会有所帮助!

【讨论】:

洛伦斯,谢谢你的回答!!代码示例帮助很大!当我运行你的代码时,它工作得很好。一旦我用我对 Firestore 收藏的真实调用替换了 mockBooks,它就失败了。我又花了几天时间才弄明白。但我认为如果某些作者在数据库中没有书籍,它会失败。用 combineLatest 交换 forkJoin 为我修复了它。但我真的需要你的地图和减少代码!谢谢! 我很高兴它有帮助! forkJoin 可能对您不起作用,因为您的可观察 .valueChanges() 可能永远不会完成。为了完成,您可以返回:.valueChanges.pipe(take(1))。或者使用 CombineLatest,例如: combineLatest(bookRefs).pipe(take(1))。注意:如果你不放take(1),你应该稍后退订。如果您想要继续订阅,您可以删除 take(1) 并监听每次更改。【参考方案2】:

我认为最好的解决方案是使用fork join。

Fork 让您可以并行执行调用。每次调用的结果都放在一个对象 (join) 中,您可以获得全部调用的结果。

【讨论】:

感谢您的回答!当我在函数末尾返回 forkJoin(bookRefs) 时,出现类型错误。 forkJoin 返回一个 Observable。所以一个可观察的书数组数组。而不是将数组组合成一个数组。还是我执行错了?【参考方案3】:

@Doflamingo 为我们指明了 forkJoin 的方向,@Llorenç 给出了一个很棒的代码示例,其中包含 map 和 reduce 管道,将 Observable 组合成一个 Observable。对我来说,如果某些作者没有 Books,forkJoin 就会失败。所以我最终选择了 combineLatest(它不会等待所有 Observables 完成)。这是我的最终代码(所有功劳归于@Llorenç):

  mockBooks$(key): Observable<Book[]> 
    return this.afs.collection<Book>('books', ref => 
      ref.where('private', '==', true)
         .where('authors', 'array-contains', key)
      ).valueChanges()
    // return of([key + '-book1', key + '-book2', key + '-book3']);
  

  getPrivateBooksFromAuthors(authors): Observable<Book[]> 
    let bookRefs: Observable<Book[]>[] = authors.map(key => this.mockBooks$(key));

     // return combineLatest(bookRefs).pipe(
     //     tap((books) => console.log('After forkJoin', books)),
     //     // You need this flattening operation!!
     //     map(books => books.reduce((acc, cur) => [...acc, ...cur], []) ));

     return combineLatest<Book[]>(bookRefs).pipe(
        map(arr => arr.reduce((acc, cur) => acc.concat(cur) ) ),
     )
  

我已经注释掉了 Llorenç 的代码,以显示不同之处。谢谢两位!

【讨论】:

以上是关于如何组合多个 Observable 类型数组 Observable<T[]>?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 RxSwift 中组合多个 Observable

rxswift:如何获取 Observable<Any> 导致 Observable<[Category]> 的组合?

RxJava 合并组合两个(或多个)Observable数据源

Observable类API

如何将空 JSON 数据推送到 Observable<any[]> angular 2 类型的数组字段

Scala中泛型类型的模式匹配