角度 4 上的 rxjs-websockets,处理重新连接
Posted
技术标签:
【中文标题】角度 4 上的 rxjs-websockets,处理重新连接【英文标题】:rxjs-websockets on angular 4, handling reconnections 【发布时间】:2017-11-27 08:49:40 【问题描述】:我目前正在尝试使用 angular 4 (https://github.com/ohjames/rxjs-websockets) 来使用 rxjs-websockets。
这是我的服务:
import Injectable from '@angular/core';
import QueueingSubject from 'queueing-subject'
import Observable from 'rxjs/Observable'
import websocketConnect from 'rxjs-websockets'
import environment from 'environments/environment';
@Injectable()
export class WebsocketJWTService
private inputStream: QueueingSubject < any >
public messages: Observable < any >
SERVER: string = environment["SERVER_ADDRESS"];
constructor(private modal: Modal, private SS: SharedService, private userManager: UserManagerService)
public connect()
let SERVER = this.SERVER.substring(8);
if (this.messages)
return;
let temp = websocketConnect(
"wss://" + SERVER + "/ws/media?token=" + this.userManager.getToken(),
this.inputStream = new QueueingSubject < any > ()
)
this.messages = temp.messages.share()
public send(message: any): void
message['WSTOKEN'] = this.userManager.getToken();
this.inputStream.next(message)
我在我的组件上这样称呼它:
constructor(private socket:WebsocketJWTService)
ngOnInit()
this.socket.connect();
this.socketSubscription = this.socket.messages.subscribe(message => this.socketMessage(message));
现在,我正在尝试处理重新连接,所以如果连接的套接字数量降至 0,我会查看连接状态并再次调用连接函数:
temp.connectionStatus.subscribe(numberConnected =>
if (this.connectedNumber > numberConnected)
console.log("disconnected");
setTimeout(() =>
delete this.messages;
this.connect();
this.messages.subscribe();
, 5000)
this.connectedNumber = numberConnected;
)
我正确地看到我的套接字在它掉线时重新连接,但我的订阅在重新连接后停止工作。 我看到新的套接字连接正在发送套接字事件,但我的角度代码似乎没有得到通知。
有人知道我如何重新连接到 websocket 并保持订阅工作吗?
感谢您的宝贵时间
【问题讨论】:
【参考方案1】:link you pasted 有一个“失败时重新连接”部分。你读过那篇文章吗,因为你就是这样做的,而且它是有效的。
你自己的尝试是行不通的,因为rxjs
不是这样工作的。 subscribe()
返回一个新订阅,并且在您的重新连接代码中,您对该订阅没有做任何事情。您的旧订阅就是您的旧订阅,失败后将无法再使用。
我将从rxjs
的基本入门开始,并在尝试使用它做自己的事情之前阅读有关冷热可观察对象的信息,否则您可以复制您正在使用的库的自述文件中的示例.
(旁注:我是Library Author)。
【讨论】:
【参考方案2】:试一试这样的事情。 count 只是一个用于跟踪重试的局部变量。
this.messages = temp.messages.share()
.retryWhen(attempts =>
return attempts
.do((error)=>
return 1)
.mergeMap(error, count)=>
console.log(`Wait $count seconds, then reconnect!`);
return Observable.timer(count * 1000);
【讨论】:
以上是关于角度 4 上的 rxjs-websockets,处理重新连接的主要内容,如果未能解决你的问题,请参考以下文章