在 Angular 和 rxjs 中重新连接 websocket?

Posted

技术标签:

【中文标题】在 Angular 和 rxjs 中重新连接 websocket?【英文标题】:Reconnecting a websocket in Angular and rxjs? 【发布时间】:2017-10-19 00:13:34 【问题描述】:

我有一个基于 ngrx/store (v2.2.2) 和 rxjs (v5.1.0) 的应用程序,它使用可观察对象侦听 Web 套接字以获取传入数据。当我启动应用程序时,我完美地接收到传入的数据。

但是过了一段时间(更新很少出现),连接似乎丢失了,我不再收到传入数据。我的代码:

服务

import  Injectable, OnInit  from '@angular/core';
import  Observable  from 'rxjs';

@Injectable()
export class MemberService implements OnInit 

  private websocket: any;
  private destination: string = "wss://notessensei.mybluemix.net/ws/time";

  constructor()  

  ngOnInit()  

  listenToTheSocket(): Observable<any> 

    this.websocket = new WebSocket(this.destination);

    this.websocket.onopen = () => 
      console.log("WebService Connected to " + this.destination);
    

    return Observable.create(observer => 
      this.websocket.onmessage = (evt) => 
        observer.next(evt);
      ;
    )
      .map(res => res.data)
      .share();
  

订阅者

  export class AppComponent implements OnInit 

  constructor(/*private store: Store<fromRoot.State>,*/ private memberService: MemberService) 

  ngOnInit() 
    this.memberService.listenToTheSocket().subscribe((result: any) => 
      try 
        console.log(result);
        // const member: any = JSON.parse(result);
        // this.store.dispatch(new MemberActions.AddMember(member));
       catch (ex) 
        console.log(JSON.stringify(ex));
      
    )
  

当 Web 套接字超时时我需要做什么来重新连接,以便可观察对象继续发出传入值?

我看了一些 Q&A here、here 和 here,它似乎没有解决这个问题(以我能理解的方式)。

注意:wss://notessensei.mybluemix.net/ws/time 的 websocket 是实时的,并且每分钟发出一次时间戳(以防万一想要测试它)。

非常感谢您的建议!

【问题讨论】:

如果流中有错误,那么它“应该”在.subscribe 的“错误”参数中被拾取。您的所有try ... catch 正在做的就是拾取流实际接收到一些数据的块中产生的任何错误。因此,至少添加错误处理程序是一个很好的起点,因为它会显示是否确实存在终止错误。 请注意,由于您将 Websocket 手动包装在一个 observable 中,因此:“应该”还有一个 this.websocket.onerror 映射到 observer.error 以便在订阅中获取它。 感谢提示。我将重新编写我的示例,看看我是否可以超越它。断开连接不是错误,而是套接字的事件,因此它可能不会冒泡到可观察对象 【参考方案1】:

这可能不是一个好的答案,但评论太多了。

问题可能来自您的服务:

listenToTheSocket(): Observable<any> 
  this.websocket = new WebSocket(this.destination);

  this.websocket.onopen = () => 
    console.log("WebService Connected to " + this.destination);
  

  return Observable.create(observer => 
    this.websocket.onmessage = (evt) => 
      observer.next(evt);
    ;
  )
  .map(res => res.data)
  .share();

您是否认为您在组件中多次进入ngOnInit 方法? 您应该尝试将console.log 放入ngOnInit 以确定。

因为如果你这样做了,在你的服务中你会用一个新的覆盖this.websocket

您应该尝试类似的方法:

@Injectable()
export class MemberService implements OnInit 

  private websocket: any;
  private websocketSubject$ = new BehaviorSubject<any>();
  private websocket$ = this.websocketSubject$.asObservable();

  private destination = 'wss://notessensei.mybluemix.net/ws/time';

  constructor()  

  ngOnInit()  

  listenToTheSocket(): Observable<any> 
    if (this.websocket) 
      return this.websocket$;
    

    this.websocket = new WebSocket(this.destination);

    this.websocket.onopen = () => console.log(`WebService Connected to $this.destination`);

    this.websocket.onmessage = (res) => this.websocketSubject$.next(res => res.data);
  

如果BehaviorSubject 在您订阅之前收到事件,它将发送最后一个值。另外,因为它是一个主题,所以不需要使用share 运算符。

【讨论】:

服务是单例的,所以 ngOnInit 应该只运行一次。但我会检查一下我们的 我知道服务是单例的。抱歉,如果我说得不够清楚,但我说的是ngOnInit 在您的组件中,它正在调用您的服务方法listenToTheSocket 啊。知道了。在这里非常感谢您的建议。 App 组件位于组件树的根部。 “应该”只运行一次,但值得指出和检查。【参考方案2】:

其实现在rxjs里面有一个WebsocketSubject!

 import  webSocket  from 'rxjs/webSocket' // for RxJS 6, for v5 use Observable.webSocket

 let subject = webSocket('ws://localhost:8081');
 subject.subscribe(
    (msg) => console.log('message received: ' + msg),
    (err) => console.log(err),
    () => console.log('complete')
  );
 subject.next(JSON.stringify( op: 'hello' ));

当您重新订阅断开的连接时,它会处理重新连接。所以例如写这个来重新连接:

subject.retry().subscribe(...)

有关详细信息,请参阅文档。不幸的是,搜索框没有显示该方法,但您可以在这里找到它:

http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-webSocket

#-navigation 在我的浏览器中不起作用,因此请在该页面上搜索“webSocket”。

来源:http://reactivex.io/rxjs/file/es6/observable/dom/WebSocketSubject.js.html#lineNumber15

【讨论】:

如何导入这个? 在 rxjs 6 中导入这个使用import webSocket from 'rxjs/webSocket' 这个方法对我不起作用。我必须使用new WebSocketSubject('ws://localhost:8081') 构造函数,然后在接下来我必须发送subject.next(event: 'myEvent', data: 'hello') 文档明确地说,在多个地方“确保使用 JSON.stringify 序列化对象”但我无法做到这一点工作。这个答案由培根保存:***.com/a/50400143/4233452 “WebSocketSubject”类型上不存在属性“重试”? 重试应该是这样的:subject.pipe(retry()).subscribe(...【参考方案3】:

对于 rxjs 6,websocket 实现。

import  webSocket  from 'rxjs/websocket';
let subject = webSocket('ws://localhost:8081');

【讨论】:

【参考方案4】:

对于 rxjs 6 实现

import  webSocket  from 'rxjs/webSocket'

let subject = webSocket('ws://localhost:8081');
subject.pipe(
   retry() //support auto reconnect
).subscribe(...)

【讨论】:

以上是关于在 Angular 和 rxjs 中重新连接 websocket?的主要内容,如果未能解决你的问题,请参考以下文章

角度 4 上的 rxjs-websockets,处理重新连接

rxjs websocket:心跳和重新连接

重新连接 websocket rxjs

如何在 Angular 和 RxJs 中管理多个顺序订阅方法?

无法在 RxJs 6 和 Angular 6 中使用 Observable.of

RxJS - 连接并合并两个可观察对象