rxjs websocket:心跳和重新连接

Posted

技术标签:

【中文标题】rxjs websocket:心跳和重新连接【英文标题】:rxjs websocket: heartbeat and reconnection 【发布时间】:2021-02-18 11:41:17 【问题描述】:

我正在尝试实现一个 RXJS websocket 客户端,它会在出错时自动重新连接,以防它没有从服务器获得心跳。

问题在于,在出错时,重新连接尝试不遵循我尝试设置的计划(每 10 秒一次,最多 10 次)。其实一开始的速率是每2秒一次,然后开始同时发出多个请求,然后重连后仍然触发连接请求。

我做错了什么?

init(wsUrl: string, wsAuthToken: string, reconnect?: boolean) 
    this.connected$ = new Subject<boolean>();
    if (this._heartbeatSub) 
      this._heartbeatSub.unsubscribe();
    

    if (!this.socket$ || this.socket$.closed)
      this.socket$ = webSocket(
        protocol: wsAuthToken,
        url: wsUrl,
        deserializer: (val) => 
          if (val.data === "H") return "H";
          else return JSON.parse(val.data);
        ,
        serializer: (val) => 
          if (val === "H") 
            return "H";
           else return JSON.stringify(val);
        ,
        openObserver: 
          next: () => 
            this.loggedOut = false;
            this.connected$.next(true);
            this._heartbeatSub = timer(0, 15000)
              .pipe(
                concatMap(() =>
                  race(
                    of("timeout").pipe(delay(15000)),
                    this.socket$.pipe(filter((msg) => msg === "H"))
                  )
                )
                // filter((resp) => resp === "timeout")
              )
              .subscribe((resp) => 
                if (resp === "timeout") 
                  this.socket$.complete();
                 else this.socket$.next("H");
              );
          ,
        ,
        closeObserver: 
          next: () => 
            this._heartbeatSub.unsubscribe();
            this.connected$.next(false);
            if (this.socket$) this.socket$.complete();
            if (!this.loggedOut) this.init(wsUrl, wsAuthToken, true);
          ,
        ,
      );
    this.socket$
      .pipe(
        reconnect ? this.reconnect : (o) => o,
        filter((msg) => msg !== "H")
      )
      .subscribe((msg) => 
        if (msg.header.hasOwnProperty("rid")) 
          const subj = this.subjects.get(
            msg.header.ssid + "-" + msg.header.uid + "-" + msg.header.rid
          );
          subj.next(msg);
          subj.complete();
          this.subjects.delete(
            msg.header.ssid + "-" + msg.header.uid + "-" + msg.header.rid
          );
         else
          this.subjects.get(msg.header.ssid + "-" + msg.header.uid).next(msg);
      );
    return this.connected$.asObservable().pipe(take(1));
  

  close() 
    this.loggedOut = true;
    this.connected$.next(false);
    this._heartbeatSub.unsubscribe();
    this.socket$.complete();
    this.socket$ = undefined;
  

  private reconnect(observable: Observable<any>): Observable<any> 
    let tries = 0;
    return observable.pipe(
      retryWhen((errors) =>
        errors.pipe(
          tap((val) => console.log("websocket: Try to reconnect", val)),
          delay(10000),
          take(10)
        )
      )
    );
  

编辑

我这样修改了我的openObserver

        openObserver: 
      next: () => 
        this.loggedOut = false;
        this.reconnecting = false;
        this.connected$.next("connected");
        this._heartbeatSub = timer(0, 15000)
          .pipe(
            concatMap(() =>
              race(
                of("timeout").pipe(delay(15000)),
                this.connected$.pipe(
                  tap((val) => console.log(val)),
                  filter((status) => status === "error")
                ),
                this.socket$.pipe(
                  filter((msg) => msg === "H"),
                  take(1)
                )
              )
            )
            // filter((resp) => resp === "timeout")
          )
          .subscribe((resp) => 
            if (resp === "timeout" || resp === "error") 
              this._heartbeatSub.unsubscribe();
              this.init(wsUrl, wsAuthToken, true);
             else this.socket$.next("H");
          );
      ,
    ,

我的closeObserver 这边

closeObserver: 
      next: () => 
        if (!this.loggedOut) this.connected$.next("error");

      ,
    ,

但是,我仍然有一个问题:重试每 13 秒触发一次,而不是,即使 reconnect 的延迟为 10 秒。另外,不考虑重试限制

【问题讨论】:

我相信 heartbeat sub 中存在错误。您可以尝试在此处添加 take(1) 吗? this.socket$.pipe(filter((msg) =&gt; msg === "H"), take(1)) 也没用 【参考方案1】:

试试这个重新连接处理程序

 private reconnect(observable: Observable<any>): Observable<any> 
    let tries = 0;
    return observable.pipe(
      retryWhen((attempts) =>
        attempts.pipe(
          switchMap((err, i) => i < 10 ? timer(10000) : throwError(err), (a,b,i) => i),
 // switchMap selector is just needed for the next tap with console.log. feel free to remove it
          tap((i) => console.log("websocket: Try to reconnect. attempt: ", val + 1)),
        )
      )
    );
  

或者构建你自己的,下面retryWhen operator page底部的例子

【讨论】:

它一直在做同样的事情。我不知道 2 秒的东西从哪里出来【参考方案2】:

此代码与此类似:https://github.com/lamisChebbi/ng-realtime-dashboard/blob/master/src/app/services/data.service.ts

我试过了,也发现了重新连接的问题。我的猜测是: 当连接失败时调用closeObserver。这里this.init 被调用,它创建了一个新的WebSocket 连接。而且我认为这是一个新的尝试,还没有错误。所以retryWhen 只是通过它。当错误发生时,重试逻辑开始,但closeObserver 也被再次调用。所以一切重新开始。

当我尝试连接到本地主机上未运行的服务器时,尝试连接和连接错误之间的时间为 2 到 5 秒。

另请参阅我在 GitHub 上的问题:https://github.com/lamisChebbi/ng-realtime-dashboard/issues/20

我通过添加一个变量inRetryMode 让它工作,该变量以false 开头并在openObserver 中重置为false。并且:

private reconnect(observable: Observable<any>): Observable<any> 
    let tries = 0;
    return observable.pipe(
        retryWhen((errors) => errors.pipe(
            tap((val) => 
                this.inRetryMode = true;
                console.log("websocket: Try to reconnect", val))
            ,
            delay(10000),
            take(10)
        ))
    );

然后在closeObserver 中调用init(),如果不是inRetryMode。 也许只在closeObserver 中添加重试逻辑会更好?

【讨论】:

以上是关于rxjs websocket:心跳和重新连接的主要内容,如果未能解决你的问题,请参考以下文章

在 Angular 和 rxjs 中重新连接 websocket?

角度 4 上的 rxjs-websockets,处理重新连接

Angular4 Websocket rxjs重新连接和onError

重新连接 websocket rxjs

websocket实现心跳连接

Go语言实现建立websocket连接并定时发送心跳