Angular4 Websocket rxjs重新连接和onError

Posted

技术标签:

【中文标题】Angular4 Websocket rxjs重新连接和onError【英文标题】:Angular4 Websocket rxjs Reconnect and onError 【发布时间】:2018-03-14 15:52:08 【问题描述】:

看起来有几个类似的问题,但几天后我找不到正确的答案。我的问题是如何知道服务器是否关闭了 websocket 以及如何尝试重新连接。我已经看过几个示例,但是当我想在更改视图时实现从客户端关闭 websocket 的功能时,它们都不能正常工作。

然后我发现了这个example,它是迄今为止我见过的最好的,并且通过一些小的修改,我能够添加一个运行良好的关闭函数。

从客户端关闭 websocket 不再是问题,但是,我无法知道 websocket 何时被服务器关闭以及如何再次重新连接 .

我的代码与this 问题非常相似,但我要求的是不同的东西。此外,我在重新使用 websocket 时遇到了问题,直到我看到他们在我放置的链接中谈论的共享功能,所以在我发布的课程中,websocket 服务和使用 websocket 服务的服务合并为一个

我的 websocket 服务

import  Injectable  from '@angular/core';
import  Observable, Observer, Subscription  from 'rxjs';
import  Subject  from 'rxjs/Subject';

import  ConfigService  from "../config.service";

@Injectable()
export class WSEtatMachineService 
    public messages: Subject<any>  = new Subject<any>();
    private url: string = '';
    static readonly ID = 'machine';

    private _subject: Subject<MessageEvent>;
    private _subjectData: Subject<number>;
    private _ws: any;

    constructor(private configService: ConfigService) 
        console.log('constructyor ws machine service')
        this.setUrl(WSEtatMachineService.ID)
    

    public setUrl(id:string) 
        const host = this.configService.getConfigReseau().ipServer;
        const port = this.configService.getConfigReseau().portServer;
        this.url = `ws://$host:$port/` + id 
    

    public connect() 
        console.log('connect ws machine service ', this.url)
        this.messages = <Subject<any>>this._connect(this.url)
            .map((response: any): any => 
                console.log('ws etat machine service: ', response)
                return JSON.parse(response.data);
            )

    

    public close() 
        console.log('on closing WS');
        this._ws.close()
        this._subject = null
    

    public send(msg: any) 
        this.messages.next(JSON.stringify(msg));
    

    // Private methods to create the websocket

    public _connect(url: string): Subject<MessageEvent> 
        if (!this._subject) 
            this._subject = this._create(url);
        
        return this._subject;
    

    private _create(url: string): Subject<MessageEvent> 
        this._ws = new WebSocket(url);

        let observable = Observable.create(
            (obs: Observer<MessageEvent>) => 
                this._ws.onmessage = obs.next.bind(obs);
                this._ws.onerror   = obs.error.bind(obs);
                this._ws.onclose   = obs.complete.bind(obs);
                return this._ws.close.bind(this._ws);
            ).share();

        let observer = 
            next: (data: Object) => 
                if (this._ws.readyState === WebSocket.OPEN) 
                    this._ws.send(JSON.stringify(data));
                
            
        ;

        return Subject.create(observer, observable);
    
 // end class 

然后在我做的组件中:

constructor( private wsMachineService: WSMachineService)  ... 

ngOnInit()  
...
this.wsMachineService.connect();
    // connexion du web socket
    this.wsMachineService.messages.subscribe(
      machine => 
        console.log(" wsMachineService open and alive", machine);

      ,
      err => 
        // This code is never executed
        console.log(" wsMachineService closed by server!!", err);
      
    );



ngOnDestroy() 
    //let tmp = this.synoptiqueSocketSubscription.unsubscribe();
    this.wsMachineService.messages.unsubscribe();
    this.wsMachineService.close()

我想我在 _create 函数中遗漏了一些东西,因为我尝试在 connect 函数的主题中进行捕捉,但它不起作用。

关于如何知道它是否已关闭并尝试重新连接的任何想法?

谢谢


编辑: 我认为我的问题与主题/可观察对象有关,因为我不能完全控制它们。我有一种旧方法,我可以知道服务器何时死机并且它每 X 秒尝试重新连接,但不幸的是,我无法关闭来自客户端的连接,因为我无权访问 websocket 对象。我也添加了代码:

  public messages: Observable<any>;
  private ws: Subject<any>;
  private url: string;
  public onclose = new Subject();

  public connect(urlApiWebSocket: string): Observable<any> 
    if (this.messages && this.url === urlApiWebSocket) 
      return this.messages;
    
    this.url = urlApiWebSocket;
    this.ws = Observable.webSocket(
      url: urlApiWebSocket,
      closeObserver: this.onclose
    );
    return this.messages = this.ws.retryWhen(errors => errors.delay(10000)).map(msg => msg).share();
  

  send(msg: any) 
    this.ws.next(JSON.stringify(msg));
  

让我们看看我们是否有办法将这两种解决方案结合起来。

【问题讨论】:

【参考方案1】:

嗯,我找到了办法。我正在使用 Observable.websocket 的旧方法

@Injectable()
export class WSMyService 
    private url: string = '';
    static readonly ID = 'mytestwebsocket';
    readonly reload_time = 3000;

    public messages: Observable<any>;
    private ws: Subject<any>;
    public onclose = new Subject();
    constructor(private configService: ConfigService) 
        console.log('constructor ws synop service')
        this.setUrl(WSActionFrontService.ID)
    

    public setUrl(id:string) 
        ...
        this.url = `ws://$host:$port/` + id 
    

    public connect(): Observable<any> 
      this.ws = Observable.webSocket(
        url: this.url,
        closeObserver: this.onclose
      );
      return this.messages = this.ws.retryWhen(errors => errors.delay(this.reload_time)).map(msg => msg).share();
    

    send(msg: any) 
      this.ws.next(JSON.stringify(msg));
    

    public close() 
        console.log('on closing WS');
        this.ws.complete();
    

当我使用它时:

constructor(
    private wsMyService: WSMyService,

ngOnInit():
  ...
  this.mySocketSubscription = this.wsMyService.connect().subscribe(message => 
 ... 
  this.wsMyService.onclose.subscribe(evt => 
  // the server has closed the connection
  )

ngOnDestroy() 

    this.wsMyService.close();
    this.mySocketSubscription.unsubscribe();
    ...

看起来我所要做的就是调用函数 complete() 告诉服务器客户端已经完成。 我确信有更好的方法,但这是我发现的唯一适合我的方法。

感谢您的帮助。

【讨论】:

【参考方案2】:

您无法知道服务器何时关闭与客户端的连接。

如您所见,this._ws.onclose = obs.complete.bind(obs); 仅在客户端执行“关闭”操作时才会触发。

常见的玩法:

在服务器关闭时,您会向所有客户端发送特殊消息以通知它。 创建 ping 机制以询问服务器他是否还活着。

【讨论】:

您好,感谢您的回答。在我有一个像this.ws = Observable.webSocket( url: myUrl, closeObserver: this.onclose ); return this.messages = this.ws.retryWhen(errors =&gt; errors.delay(10000)).map(msg =&gt; msg).share(); 这样的代码之前,我能够知道服务器何时死机并且 ws 试图重新连接直到它可用。但是,通过这种方法,我无法访问 ws 本身(它使用可观察的 websocket),因此我无法从客户端关闭。我认为我的问题在于可观察对象和主题,但我没有看到。 在您的示例中,您不能这样做。ws.unsubscribe();强制 Observable 关闭来自客户端的连接? 如果你说的是我使用的第一种方法,当我取消订阅时,有时它会关闭连接,但有时它没有。使用新的,关闭它总是有效的

以上是关于Angular4 Websocket rxjs重新连接和onError的主要内容,如果未能解决你的问题,请参考以下文章

Angular 4 中的 WebSocket

在 Angular 和 rxjs 中重新连接 websocket?

rxjs websocket:心跳和重新连接

角度 4 上的 rxjs-websockets,处理重新连接

rxjs/webSocket - 到“ws://localhost:3000/”的 WebSocket 连接失败:连接在收到握手响应之前关闭

如何在 Jasmine 中模拟 rxjs webSocket?