如何在没有嵌套订阅的情况下使用 rxjs 处理内部订阅数组

Posted

技术标签:

【中文标题】如何在没有嵌套订阅的情况下使用 rxjs 处理内部订阅数组【英文标题】:How to handle an array of inner subscriptions with rxjs without nested subscriptions 【发布时间】:2021-11-25 16:34:42 【问题描述】:

我已经为我的问题准备了this example on Stackblitz。在这个 Angular 应用程序中,我有一个 NestedSolutionComponent 与我当前的工作解决方案和一个 AppComponent,我想通过使用正确的 rxjs 操作来实现相同的结果。

对于一个更复杂的真实示例,我正在寻找一种解决方案,将我的多个内部订阅的结果映射到我的外部订阅数组。

用户服务的 REST 调用为我提供了这个数组:

  [
    
      id: 1,
      name: 'User One',
      groupIds: [1, 3]
    ,
    
      id: 2,
      name: 'User Two',
      groupIds: [2, 3, 4]
    ,
  ]

对于每个组,我想调用一个 REST 组服务,它为我提供有关用户组的更多信息。总而言之,我调用了组服务 5 次,组数组中的每个 ID 都被调用一次。完成后,应将结果映射到组数组中 - 但不应仅具有 ID,而是应将整个对象存储到数组中。

解决方案应如下所示:

  [
    
      id: 1
      name: 'User One'
      groups: [
         id: 1, name: 'Group One' ,
         id: 3, name: 'Group Three' 
      ] 
    ,
    
      id: 2
      name: 'User Two'
      groups: [
         id: 2, name: 'Group Two' ,
         id: 3, name: 'Group Three' ,
         id: 4, name: 'Group Four' 
      ] 
    
  ]

通过嵌套订阅,解决方案很简单 - 但很难看。我先调用用户服务,然后为每个用户调用每个组:

    this.appService.getUsers().subscribe((users) => 
      const _usersWithGroupNames = users.map((user) => 
        const userWithGroupNames = 
          id: user.id,
          name: user.name,
          groups: [],
         as UserWithGroupNames;
        user.groupIds.forEach((groupId) => 
          this.appService.getGroupById(groupId).subscribe((groupWithName) => 
            userWithGroupNames.groups.push(
              id: groupWithName.id,
              name: groupWithName.name,
            );
          );
        );
        return userWithGroupNames;
      );
      this.usersWithGroupNames.next(_usersWithGroupNames); // Subject
    );

我已经花费了好几个小时,但我真的没有看到任何使用适当的 rxjs 运算符的解决方案。我尝试了switchMapmergeMap,但以嵌套地图操作告终。 forkJoin 似乎也帮不了我,因为我收到了一个数组,我必须按特定顺序调用内部订阅。当我在管道中调用多个 mergeMaps 时,我无法访问以前的值。我想要这样的解决方案

// not real code, just dummy code
userService.pipe(
  xmap(users => generateUsersWithEmptyGroupArray()),
  ymap(users => users.groups.forEach(group => groupService.getGroup(group)),
  zmap((user, groups) => mapUserWithGroups(user, groups)) // get single user with all group information
).subscribe(usersWithGroups => this.subject.next(usersWithGroups))

这里有人知道我的问题的正确且可读的解决方案吗?

提前非常感谢!

【问题讨论】:

【参考方案1】:

第一种方法:使用switchMap、mergeMap、from、forkJoin

this.appService
  .getUsers()
  .pipe(
    switchMap((users) =>
        // for each user
      from(users).pipe(
       // merge map to run parallel for each user
        mergeMap(( groupIds, ...user ) =>
        // wait to retrive all group details of current user at mergeMap
        // after completing use map to map user with retrived group 
          forkJoin(
            groupIds.map((id) => this.appService.getGroupById(id))
          ).pipe(map((groups) => ( ...user, groups )))
        )
      )
    )
  )
  .subscribe((result) => 
    console.log(result);
  );

Demo

在上面的代码中,forkJoin 将等待获取特定用户的所有 groupIds 详细信息,并且如果他已检索第一个用户的组 id 3,它将再次检索用户 2 的 groupId 3 详细信息等等。在短重复组中,将检索详细信息。

第二种方法:下面是我们将从用户数组中获取所有groupsIds的方法,使它们唯一,并行获取它们的所有细节,最后,我们将映射通过groupIds向用户发送组详细信息,这里我们不会等待每个用户组ID详细信息被检索到,并且不会检索重复的组详细信息。

this.appService
.getUsers()
.pipe(
    switchMap((users) =>
      // get all unique groupIds of all users
      from(this.extractUniqueGroupIds(users)).pipe(
        // parallell fetch all group details
        mergeMap((groupId) => this.appService.getGroupById(groupId)),
        // wait to to complete all requests and generate array out of it
        reduce((acc, val) => [...acc, val], []),
        // to check retrived group details
        // tap((groups) => console.log('groups retrived: ', groups)),
        // map retrived group details back to users
        map((groups) => this.mapGroupToUsers(users, groups))
      )
    )
)
.subscribe((result) => 
    console.log(result);
    // this.usersWithGroupNames.next(result);
);

private mapGroupToUsers(users: User[], groups: Group[]): UserWithGroup[] 
    return users.map(( groupIds, ...user ) => (
        ...user,
        groups: groupIds.map((id) => groups.find((g) => g.id === id)),
    ));


private extractUniqueGroupIds(users: User[]): number[] 
    const set = users.reduce((acc,  groupIds ) => 
        groupIds.forEach((id) => acc.add(id));
            return acc;
    , new Set<number>());

    return [...set];


interface UserWithGroup 
  id: number;
  name: string;
  groups: any[];

Demo

【讨论】:

【参考方案2】:

不确定您尝试了什么,但这就是我将如何构建此流的方式。这是一个switchMap (可能是mergeMapconcatMap,在这种情况下应该没关系),一个forkJoin (不知道为什么没有'不适合你,但它应该从我所看到的),以及一个map 来创建具有组名的最终用户。

如果您有任何问题,我很乐意更新此答案并进行澄清。

interface User 
  id: number,
  name: string,
  groupIds: number[]


interface UserWithGroupNames 
  id: number,
  name: string,
  groups: any[]


class ArbiratryClassName 

  public usersWithGroupNames$: Observable<UserWithGroupNames[]>;

  embellishUser(user: User): Observable<UserWithGroupNames> 

    // forkJoin to get all group names
    return forkJoin(
      user.groupIds.map(groupId => 
        this.appService.getGroupById(groupId)
      )
    ).pipe(
      // map to create final UserWithGroupNames
      map(groups => (
        id: user.id,
        name: user.name,
        groups
      ) as UserWithGroupNames)
    );
    
  

  arbiratryInit(): void 

    // instead of this.usersWithGroupNames Subject, create
    // the stream directly as usersWithGroupNames$, and subscribe to
    // usersWithGroupNames$ whereever you'd use the subject.

    this.usersWithGroupNames$ = this.appService.getUsers().pipe(

      switchMap((users: User[]) => 
        forkJoin(users.map(u => this.embellishUser(u)))
      )

    );
  

【讨论】:

以上是关于如何在没有嵌套订阅的情况下使用 rxjs 处理内部订阅数组的主要内容,如果未能解决你的问题,请参考以下文章

如何在没有嵌入式订阅的情况下执行一个又一个异步操作

如何在 Angular 和 RxJs 中管理多个顺序订阅方法?

假设RxJS将在订阅顺序中触发每个观察者的下一个是否安全?

RxJS:取消订阅嵌套订阅

如何订阅一次可观察对象?

在页面热模块替换期间创建新的 RxJS 订阅