rxjs websocket:心跳和重新连接
Posted
技术标签:
【中文标题】rxjs websocket:心跳和重新连接【英文标题】:rxjs websocket: heartbeat and reconnection 【发布时间】:2021-02-18 11:41:17 【问题描述】:我正在尝试实现一个 RXJS websocket 客户端,它会在出错时自动重新连接,以防它没有从服务器获得心跳。
问题在于,在出错时,重新连接尝试不遵循我尝试设置的计划(每 10 秒一次,最多 10 次)。其实一开始的速率是每2秒一次,然后开始同时发出多个请求,然后重连后仍然触发连接请求。
我做错了什么?
init(wsUrl: string, wsAuthToken: string, reconnect?: boolean)
this.connected$ = new Subject<boolean>();
if (this._heartbeatSub)
this._heartbeatSub.unsubscribe();
if (!this.socket$ || this.socket$.closed)
this.socket$ = webSocket(
protocol: wsAuthToken,
url: wsUrl,
deserializer: (val) =>
if (val.data === "H") return "H";
else return JSON.parse(val.data);
,
serializer: (val) =>
if (val === "H")
return "H";
else return JSON.stringify(val);
,
openObserver:
next: () =>
this.loggedOut = false;
this.connected$.next(true);
this._heartbeatSub = timer(0, 15000)
.pipe(
concatMap(() =>
race(
of("timeout").pipe(delay(15000)),
this.socket$.pipe(filter((msg) => msg === "H"))
)
)
// filter((resp) => resp === "timeout")
)
.subscribe((resp) =>
if (resp === "timeout")
this.socket$.complete();
else this.socket$.next("H");
);
,
,
closeObserver:
next: () =>
this._heartbeatSub.unsubscribe();
this.connected$.next(false);
if (this.socket$) this.socket$.complete();
if (!this.loggedOut) this.init(wsUrl, wsAuthToken, true);
,
,
);
this.socket$
.pipe(
reconnect ? this.reconnect : (o) => o,
filter((msg) => msg !== "H")
)
.subscribe((msg) =>
if (msg.header.hasOwnProperty("rid"))
const subj = this.subjects.get(
msg.header.ssid + "-" + msg.header.uid + "-" + msg.header.rid
);
subj.next(msg);
subj.complete();
this.subjects.delete(
msg.header.ssid + "-" + msg.header.uid + "-" + msg.header.rid
);
else
this.subjects.get(msg.header.ssid + "-" + msg.header.uid).next(msg);
);
return this.connected$.asObservable().pipe(take(1));
close()
this.loggedOut = true;
this.connected$.next(false);
this._heartbeatSub.unsubscribe();
this.socket$.complete();
this.socket$ = undefined;
private reconnect(observable: Observable<any>): Observable<any>
let tries = 0;
return observable.pipe(
retryWhen((errors) =>
errors.pipe(
tap((val) => console.log("websocket: Try to reconnect", val)),
delay(10000),
take(10)
)
)
);
编辑
我这样修改了我的openObserver
openObserver:
next: () =>
this.loggedOut = false;
this.reconnecting = false;
this.connected$.next("connected");
this._heartbeatSub = timer(0, 15000)
.pipe(
concatMap(() =>
race(
of("timeout").pipe(delay(15000)),
this.connected$.pipe(
tap((val) => console.log(val)),
filter((status) => status === "error")
),
this.socket$.pipe(
filter((msg) => msg === "H"),
take(1)
)
)
)
// filter((resp) => resp === "timeout")
)
.subscribe((resp) =>
if (resp === "timeout" || resp === "error")
this._heartbeatSub.unsubscribe();
this.init(wsUrl, wsAuthToken, true);
else this.socket$.next("H");
);
,
,
我的closeObserver
这边
closeObserver:
next: () =>
if (!this.loggedOut) this.connected$.next("error");
,
,
但是,我仍然有一个问题:重试每 13 秒触发一次,而不是,即使 reconnect
的延迟为 10 秒。另外,不考虑重试限制
【问题讨论】:
我相信 heartbeat sub 中存在错误。您可以尝试在此处添加 take(1) 吗?this.socket$.pipe(filter((msg) => msg === "H"), take(1))
也没用
【参考方案1】:
试试这个重新连接处理程序
private reconnect(observable: Observable<any>): Observable<any>
let tries = 0;
return observable.pipe(
retryWhen((attempts) =>
attempts.pipe(
switchMap((err, i) => i < 10 ? timer(10000) : throwError(err), (a,b,i) => i),
// switchMap selector is just needed for the next tap with console.log. feel free to remove it
tap((i) => console.log("websocket: Try to reconnect. attempt: ", val + 1)),
)
)
);
或者构建你自己的,下面retryWhen operator page底部的例子
【讨论】:
它一直在做同样的事情。我不知道 2 秒的东西从哪里出来【参考方案2】:此代码与此类似:https://github.com/lamisChebbi/ng-realtime-dashboard/blob/master/src/app/services/data.service.ts
我试过了,也发现了重新连接的问题。我的猜测是:
当连接失败时调用closeObserver
。这里this.init
被调用,它创建了一个新的WebSocket 连接。而且我认为这是一个新的尝试,还没有错误。所以retryWhen
只是通过它。当错误发生时,重试逻辑开始,但closeObserver
也被再次调用。所以一切重新开始。
当我尝试连接到本地主机上未运行的服务器时,尝试连接和连接错误之间的时间为 2 到 5 秒。
另请参阅我在 GitHub 上的问题:https://github.com/lamisChebbi/ng-realtime-dashboard/issues/20
我通过添加一个变量inRetryMode
让它工作,该变量以false 开头并在openObserver
中重置为false。并且:
private reconnect(observable: Observable<any>): Observable<any>
let tries = 0;
return observable.pipe(
retryWhen((errors) => errors.pipe(
tap((val) =>
this.inRetryMode = true;
console.log("websocket: Try to reconnect", val))
,
delay(10000),
take(10)
))
);
然后在closeObserver
中调用init()
,如果不是inRetryMode
。
也许只在closeObserver
中添加重试逻辑会更好?
【讨论】:
以上是关于rxjs websocket:心跳和重新连接的主要内容,如果未能解决你的问题,请参考以下文章
在 Angular 和 rxjs 中重新连接 websocket?
角度 4 上的 rxjs-websockets,处理重新连接