RxJS:取消订阅嵌套订阅

Posted

技术标签:

【中文标题】RxJS:取消订阅嵌套订阅【英文标题】:RxJS: unsubscribe from nested subscribe 【发布时间】:2020-12-29 18:42:25 【问题描述】:

我使用 SockJS 和 StompJS,当我在浏览器中打开我的应用程序时,有时它会在连接到 websocket 之前尝试订阅一些主题。我希望主题订阅等到应用连接到 websocket。

export class SocksService 
...

public subscribe<T>(destination: string, callback?: (body: T) => void, headers?: object): Observable<Subscription> 
const subscription = new Subject<Subscription>();
headers = headers || ;

this.status.pipe(
  first(status => status === ConnectionStatus.CONNECTED)
).subscribe((status: ConnectionStatus) => 
  subscription.next(this.stompClient.subscribe(destination, (message: Message) => 
    if (callback) 
      callback(JSON.parse(message.body) as T);
    
  , headers));
);

return subscription.asObservable();
  

...

这就是我实现这段代码的原因,我这样称呼它:

this.socksService.subscribe<User>('/topic/user', (user: User) => 
  console.log('user received', user);
).subscribe(subscription => this.userSubscription = subscription);

所以我只在连接状态为connected时才订阅主题,并且只会在客户端第一次连接成功时调用。

我想稍后退订该主题,所以我需要内部订阅返回的Subscription 对象,我还需要来自内部订阅的消息。

我实现的效果很好,但我认为必须有更好的方法来做到这一点。

(我尝试了 rx-stomp,但它有很多错误。)

【问题讨论】:

【参考方案1】:

您说您尝试过 rx-stomp,但您是否尝试过 Angular2+ 版本的 rx-stomp,称为 ng2-stompjs?它将关键类公开为 Angular 可注入服务。

在 rx-stomp 下查看它的提及,这里: https://github.com/stomp-js/ng2-stompjs#documentation

...它的实现(Angular 7)通过一个很好的一步一步,在这里: https://stomp-js.github.io/guide/ng2-stompjs/ng2-stomp-with-angular7.html#receiving-messages

例如:

this.rxStompService.watch('/topic/demo').subscribe((message: Message) => 
      this.receivedMessages.push(message.body);
    );

【讨论】:

是的,这就是我尝试过的库。无法配置它的某些部分,并且当它断开连接时,尽管一切正常,但它无法立即重新连接。所以我改用 SockJS 和旧版 StompJS,一切正常。

以上是关于RxJS:取消订阅嵌套订阅的主要内容,如果未能解决你的问题,请参考以下文章

Angular RxJS Observable:takeUntil 与使用订阅取消订阅 [关闭]

取消订阅 Rxjs Observables

任何需要先取消订阅 RxJS()

如何在没有嵌套订阅的情况下使用 rxjs 处理内部订阅数组

RxJS 迁移 5 到 6 - 使用 TakeUntil 取消订阅

取消 rxjs 中的 http 订阅