如何在没有嵌套订阅的情况下使用 rxjs 处理内部订阅数组
Posted
技术标签:
【中文标题】如何在没有嵌套订阅的情况下使用 rxjs 处理内部订阅数组【英文标题】:How to handle an array of inner subscriptions with rxjs without nested subscriptions 【发布时间】:2021-11-25 16:34:42 【问题描述】:我已经为我的问题准备了this example on Stackblitz。在这个 Angular 应用程序中,我有一个 NestedSolutionComponent
与我当前的工作解决方案和一个 AppComponent
,我想通过使用正确的 rxjs 操作来实现相同的结果。
对于一个更复杂的真实示例,我正在寻找一种解决方案,将我的多个内部订阅的结果映射到我的外部订阅数组。
用户服务的 REST 调用为我提供了这个数组:
[
id: 1,
name: 'User One',
groupIds: [1, 3]
,
id: 2,
name: 'User Two',
groupIds: [2, 3, 4]
,
]
对于每个组,我想调用一个 REST 组服务,它为我提供有关用户组的更多信息。总而言之,我调用了组服务 5 次,组数组中的每个 ID 都被调用一次。完成后,应将结果映射到组数组中 - 但不应仅具有 ID,而是应将整个对象存储到数组中。
解决方案应如下所示:
[
id: 1
name: 'User One'
groups: [
id: 1, name: 'Group One' ,
id: 3, name: 'Group Three'
]
,
id: 2
name: 'User Two'
groups: [
id: 2, name: 'Group Two' ,
id: 3, name: 'Group Three' ,
id: 4, name: 'Group Four'
]
]
通过嵌套订阅,解决方案很简单 - 但很难看。我先调用用户服务,然后为每个用户调用每个组:
this.appService.getUsers().subscribe((users) =>
const _usersWithGroupNames = users.map((user) =>
const userWithGroupNames =
id: user.id,
name: user.name,
groups: [],
as UserWithGroupNames;
user.groupIds.forEach((groupId) =>
this.appService.getGroupById(groupId).subscribe((groupWithName) =>
userWithGroupNames.groups.push(
id: groupWithName.id,
name: groupWithName.name,
);
);
);
return userWithGroupNames;
);
this.usersWithGroupNames.next(_usersWithGroupNames); // Subject
);
我已经花费了好几个小时,但我真的没有看到任何使用适当的 rxjs 运算符的解决方案。我尝试了switchMap
和mergeMap
,但以嵌套地图操作告终。 forkJoin
似乎也帮不了我,因为我收到了一个数组,我必须按特定顺序调用内部订阅。当我在管道中调用多个 mergeMaps 时,我无法访问以前的值。我想要这样的解决方案
// not real code, just dummy code
userService.pipe(
xmap(users => generateUsersWithEmptyGroupArray()),
ymap(users => users.groups.forEach(group => groupService.getGroup(group)),
zmap((user, groups) => mapUserWithGroups(user, groups)) // get single user with all group information
).subscribe(usersWithGroups => this.subject.next(usersWithGroups))
这里有人知道我的问题的正确且可读的解决方案吗?
提前非常感谢!
【问题讨论】:
【参考方案1】:第一种方法:使用switchMap、mergeMap、from、forkJoin
this.appService
.getUsers()
.pipe(
switchMap((users) =>
// for each user
from(users).pipe(
// merge map to run parallel for each user
mergeMap(( groupIds, ...user ) =>
// wait to retrive all group details of current user at mergeMap
// after completing use map to map user with retrived group
forkJoin(
groupIds.map((id) => this.appService.getGroupById(id))
).pipe(map((groups) => ( ...user, groups )))
)
)
)
)
.subscribe((result) =>
console.log(result);
);
Demo
在上面的代码中,forkJoin
将等待获取特定用户的所有 groupIds
详细信息,并且如果他已检索第一个用户的组 id 3,它将再次检索用户 2 的 groupId
3 详细信息等等。在短重复组中,将检索详细信息。
第二种方法:下面是我们将从用户数组中获取所有groupsIds
的方法,使它们唯一,并行获取它们的所有细节,最后,我们将映射通过groupIds
向用户发送组详细信息,这里我们不会等待每个用户组ID详细信息被检索到,并且不会检索重复的组详细信息。
this.appService
.getUsers()
.pipe(
switchMap((users) =>
// get all unique groupIds of all users
from(this.extractUniqueGroupIds(users)).pipe(
// parallell fetch all group details
mergeMap((groupId) => this.appService.getGroupById(groupId)),
// wait to to complete all requests and generate array out of it
reduce((acc, val) => [...acc, val], []),
// to check retrived group details
// tap((groups) => console.log('groups retrived: ', groups)),
// map retrived group details back to users
map((groups) => this.mapGroupToUsers(users, groups))
)
)
)
.subscribe((result) =>
console.log(result);
// this.usersWithGroupNames.next(result);
);
private mapGroupToUsers(users: User[], groups: Group[]): UserWithGroup[]
return users.map(( groupIds, ...user ) => (
...user,
groups: groupIds.map((id) => groups.find((g) => g.id === id)),
));
private extractUniqueGroupIds(users: User[]): number[]
const set = users.reduce((acc, groupIds ) =>
groupIds.forEach((id) => acc.add(id));
return acc;
, new Set<number>());
return [...set];
interface UserWithGroup
id: number;
name: string;
groups: any[];
Demo
【讨论】:
【参考方案2】:不确定您尝试了什么,但这就是我将如何构建此流的方式。这是一个switchMap
(可能是mergeMap
或concatMap
,在这种情况下应该没关系),一个forkJoin
(不知道为什么没有'不适合你,但它应该从我所看到的),以及一个map
来创建具有组名的最终用户。
如果您有任何问题,我很乐意更新此答案并进行澄清。
interface User
id: number,
name: string,
groupIds: number[]
interface UserWithGroupNames
id: number,
name: string,
groups: any[]
class ArbiratryClassName
public usersWithGroupNames$: Observable<UserWithGroupNames[]>;
embellishUser(user: User): Observable<UserWithGroupNames>
// forkJoin to get all group names
return forkJoin(
user.groupIds.map(groupId =>
this.appService.getGroupById(groupId)
)
).pipe(
// map to create final UserWithGroupNames
map(groups => (
id: user.id,
name: user.name,
groups
) as UserWithGroupNames)
);
arbiratryInit(): void
// instead of this.usersWithGroupNames Subject, create
// the stream directly as usersWithGroupNames$, and subscribe to
// usersWithGroupNames$ whereever you'd use the subject.
this.usersWithGroupNames$ = this.appService.getUsers().pipe(
switchMap((users: User[]) =>
forkJoin(users.map(u => this.embellishUser(u)))
)
);
【讨论】:
以上是关于如何在没有嵌套订阅的情况下使用 rxjs 处理内部订阅数组的主要内容,如果未能解决你的问题,请参考以下文章