如何让观察者在认证后等待 WebSocket 的创建
Posted
技术标签:
【中文标题】如何让观察者在认证后等待 WebSocket 的创建【英文标题】:How to make Observers wait for creation of WebSocket after authentication 【发布时间】:2017-09-13 15:39:43 【问题描述】:我正在使用 RxJs 来监听来自 websocket 的数据流。为了使用这个 pubsub 套接字,我需要首先在另一个套接字上进行身份验证,该套接字将返回 pubsub 套接字 url。
我现在只有一个订阅者才能完成这项工作。我需要添加另一个,但我当前的方法存在问题,因为两个订阅者都会触发身份验证并创建第二个 pubsub 套接字。
需要明确的是,整个应用程序应该只有一个身份验证套接字和一个 pubsub 套接字。但是我需要订阅者在尝试使用 pubsub 套接字之前“等待”进行身份验证。否则,pubsub 套接字将是未定义的(因为我们只知道它是运行时的 url)。
这是我目前的尝试:
Websocket.service.ts
private connect(): Observable<IConnectionInfo>
if (!this.authObservable)
var credentials =
"username": "bob",
"password": "slob"
this.authObservable = Observable.create((observer) =>
const socket = new WebSocket(this.authurl);
socket.onopen = () =>
console.log("Auth socket opened");
socket.send(JSON.stringify(credentials));
socket.onmessage = (event) =>
var connection_info = <IConnectionInfo>JSON.parse(event.data);
observer.next(connection_info);
return () =>
socket.close(); //invoked on unsubscribe
console.log("Auth socket closed");
)
return this.authObservable;
public open_pubsub(events: string[]): Observable<IPubSubMessage>
return this.connect()
.flatMap((connection_info: IConnectionInfo) =>
var url = "ws://" + this.hostName + ":" + connection_info.port + "/" + connection_info.ps;
var subscription =
"subscribe": events
var authenticate_request =
"authenticate": connection_info['token']
if (!this.psObservable)
this.psObservable = Observable.create((observer) =>
const socket = new WebSocket(url);
socket.onopen = () =>
console.log("PS socket opened");
socket.send(JSON.stringify(authenticate_request));
socket.send(JSON.stringify(subscription));
socket.onmessage = (event) =>
var psmsg = <IPubSubMessage>JSON.parse(event.data);
observer.next(psmsg);
return () =>
socket.close(); //invoked on unsubscribe
console.log("PS socked closed");
)
return this.psObservable;
);
还有观察者:
getSystemState(): Observable<string>
return this._wsService.open_pubsub([MSG_SYSTEM_STATE_CHANGED])
.map((response: IPubSubMessage): string =>
console.log(response.payload);
return "I wish this worked";
)
.catch(this.handleError);
任何帮助表示赞赏!
(编辑基于一个看似已被删除但确实非常有用的答案,我修改了代码。这解决了部分问题,但仍然导致多个套接字)
【问题讨论】:
【参考方案1】:找到了我所缺少的东西——share() 运算符的魔力。
所以对于两个 Observable,我都这样做:
this.authObservable = Observable.create((observer) =>
...(etc)...
).share();
令人惊讶的是,每个套接字只创建了 1 个。
【讨论】:
以上是关于如何让观察者在认证后等待 WebSocket 的创建的主要内容,如果未能解决你的问题,请参考以下文章
Tornado websocket 客户端:如何异步 on_message? (从未等待协程)