嵌套可观察订阅问题,无法取消订阅

Posted

技术标签:

【中文标题】嵌套可观察订阅问题,无法取消订阅【英文标题】:Nested observable subscription issue, unable to unsubscribe 【发布时间】:2019-11-15 13:54:27 【问题描述】:

我在我的应用中使用 Firebase 身份验证和 Firestore (https://github.com/angular/angularfire2)。使用我当前的 Firestore 规则,我确保在退出之前取消订阅从 Firestore 检索到的每个 observable。但是,我收到“FirebaseError:缺少权限或权限不足”。错误,我追溯到我在订阅中订阅的嵌套订阅,如下面的代码所示。

在 ngOnDestory 中,我取消了两个订阅,但我仍然收到之前的错误。

this.proposalSubscription = this._posts.retrieveProposals(user.uid)
  .subscribe(proposals => 
      this.proposals = proposals;
      this.proposals.forEach(proposal => 
        this.chatSubscription = this._chat.retrieveChat(proposal.id).subscribe(chat => 
          if (chat) 
            this.chats.set(proposal.id, chat)
          
        );
      )
    );

我现在几乎可以确定问题出在这一行:this.chatSubscription = this._chat.retrieveChat(proposal.id).subscribe(chat => ... 即使我在退出之前同时取消了proposalSubscription 和chatSubscription 的订阅,我仍然收到错误消息。关于如何解决这个问题的任何想法?另外,我对 rxjs 和运营商没有太多经验。有没有我可以用来避免这种嵌套订阅的运算符?

提前致谢。

【问题讨论】:

嗨。为什么不使用 rxjs 中的 concatMap 将第二个可观察对象嵌套到第一个可观察对象,而不是在第一个订阅中订阅?它会更干净,您只需退订1次 【参考方案1】:

嵌套订阅并不是处理依赖 observable 的最佳方式,我们通常使用 switchMap 来代替。在同一组件中管理多个订阅的最佳方法是使用 takeUntil,因为您可以将值发送到单个主题并一次性取消所有订阅。我们可以将 propasals 映射到一个 observables 数组,并使用 combineLatest 返回一个包含这些 observables 结果的数组。

finalise = new Subject();

this._posts.retrieveProposals(user.uid)
  .pipe(
    switchMap(proposals => combineLatest(proposals.map(proposal => this._chat.retrieveChat(proposal.id)))),
    takeUntil(finalise)
  ).subscribe(chats => 
    const chat = proposals.find(p => p);
    this.chats.set(proposal.id, chat)
  );

在 ngOnDestroy 中

this.finalise.next();

将在 takeUntil 中结束所有观看 finalize 的订阅。

【讨论】:

【参考方案2】:

您可以这样设置订阅:

this.proposalSubscription = this._posts.retrieveProposals(user.uid)
                                    .pipe(
                                      switchMap(proposals => 

                                        //if you need to save it in class variable you can save it like this
                                        this.proposals = proposals;

                                        const obs$ = proposals.map(p => 
                                          return this._chat.retrieveChat(p.id)
                                                     .pipe(
                                                       map(chat => 
                                                         this.chats.set(p,id, chat);
                                                       )
                                                     )
                                        );

                                        return forkJoin(obs$);
                                      )
                                    )
                                    .subscribe();

您可以根据需要组成您的运营商链,并且只有一个订阅。

ngOnDestory 中取消订阅,如下所示:

ngOnDestroy() 

 if(this.proposalSubscription) 
   this.proposalSubscription.unsubscribe()
 

您可以避免在ngOnDestroy 中显式地使用unsubscribe,并通过使用take(1) 来维护Subscription 实例如果您希望只评估一次this._posts.retrieveProposals(user.uid) 可观察,如下所示:

this._posts.retrieveProposals(user.uid)
                                    .pipe(
                                      take(1),
                                      switchMap(proposals => 

                                        //if you need to save it in class variable you can save it like this
                                        this.proposals = proposals;

                                        const obs$ = proposals.map(p => 
                                          return this._chat.retrieveChat(p.id)
                                                     .pipe(
                                                       map(chat => 
                                                         this.chats.set(p,id, chat);
                                                       )
                                                     )
                                        );

                                        return forkJoin(obs$);
                                      ),                                          
                                    )
                                    .subscribe();

【讨论】:

我在没有 take(1) 的情况下尝试了您的解决方案,因为我确实需要更新,但是我仍然收到权限错误的次数与 this._chat.retrieveChat(p.id) 返回值一样多!订阅是否可能仍在泄漏?

以上是关于嵌套可观察订阅问题,无法取消订阅的主要内容,如果未能解决你的问题,请参考以下文章

如何取消订阅角度组件中的多个可观察对象?

如何取消订阅路由解析类中的可观察对象

我们是不是需要取消订阅完成/错误输出的可观察对象?

使用 takeUntil 和 combineLatest 取消订阅可观察的 rxjs 不起作用

可观察,在 ngOnDestroy 中取消订阅不起作用

submitForm 上的 Angular 2 RxJS 可观察取消订阅