Angular Websocket RxJS / WebSocketSubject
Posted
技术标签:
【中文标题】Angular Websocket RxJS / WebSocketSubject【英文标题】: 【发布时间】:2017-05-26 12:46:14 【问题描述】:过去我们使用了几个 angular2 websocket impelmentations,但我们对它们并不满意,使用它们存在一些问题。所以我们决定用 RxJs 自己尝试一下。
这是我们的第一次尝试:
@Injectable()
export class WebSocketService
public createWebsocket(url: string): Subject<MessageEvent>
let socket = new WebSocket(url);
let observable = Observable.create(
(observer: Observer<MessageEvent>) =>
socket.onmessage = observer.next.bind(observer);
socket.onerror = observer.error.bind(observer);
socket.onclose = observer.complete.bind(observer);
return socket.close.bind(socket);
);
let observer =
next: (data: Object) =>
if (socket.readyState === WebSocket.OPEN)
socket.send(JSON.stringify(data));
;
return Subject.create(observer, observable);
套接字已打开并正常工作了一段时间。几秒钟后,浏览器正在关闭套接字,我在服务器端收到一个关闭事件。
这是我们进入服务器站点的最终原因: [1006] WebSocket 读取 EOF
有人可以帮忙吗?或者有谁知道如何使用WebSocketSubject?
【问题讨论】:
gearheart.io/blog/auto-websocket-reconnection-with-rxjs @JuliaPassynkova 感谢您的链接,我已经找到了这个解决方案,而且效果很好。这个解决方案的问题是,总是存在关闭 Websocket 的问题,这个解决方案在关闭连接时会重新连接,这没关系。但这就像第一次连接,在服务器上第一次连接会做很多昂贵的事情。所以最好不要从浏览器端关闭连接。那么问题来了,为什么浏览器要关闭 websocket? 由于您似乎已经完成了自己的 websocket 实现,请您分享您自己的解决方案吗? @BlackHoleGalaxy 你是指谁?我? 嗨,迈克尔,是的,因为您在下面的答案中添加了评论,告诉您通过自己的实现设法解决了这个问题。我们遇到了您所面临的相同问题。 【参考方案1】:我不知道它是否仍然相关,但我确实使用了与 https://github.com/ohjames/rxjs-websockets 的角度类似的 websocket 连接
我在调用 ServerSocketService 的组件中进行了一些修改(用于连接到基于 websocket 的端点),即重试机制,并且我使用了 ReplaySubject 而不是示例中给出的 QueuingSubject。
@Injectable()
export class ServerSocket
// private inputStream: QueueingSubject<string>;
private inputStream: ReplaySubject<string>;
public messages: Observable<string>;
private subscription: Subscription;
private websocket: WebSocket;
public connect()
if (this.messages)
return;
console.log('inside connect');
// this.inputStream = new QueueingSubject<string>();
this.inputStream = new ReplaySubject();
// Using share() causes a single websocket to be created when the first
// observer subscribes. This socket is shared with subsequent observers
// and closed when the observer count falls to zero.
this.messages = websocketConnect(
'ws://localhost:9097/echo',
this.inputStream
).messages.share();
this.messages.retryWhen(errors => errors.delay(1000)).subscribe(message =>
console.log('error', message);
);
public send(message: string): void
// If the websocket is not connected then the QueueingSubject will ensure
// that messages are queued and delivered when the websocket reconnects.
// A regular Subject can be used to discard messages sent when the websocket
// is disconnected.
this.inputStream.next(message);
然后在组件的 OnInit 生命周期内建立连接,订阅然后发送消息。一旦组件达到其 OnDestroy 生命周期,只需取消订阅即可释放资源。
【讨论】:
这看起来不错,同时我们已经调整了我们自己的 websocket 实现,但如果有时间我们也会看看这个以上是关于Angular Websocket RxJS / WebSocketSubject的主要内容,如果未能解决你的问题,请参考以下文章
无法从 websocket rxjs 节点 Angular 获得响应
如何在 Angular 12 中制作一个简单的 rxjs/webSocket 服务?
Angular4 Websocket rxjs重新连接和onError
使用 RxJs WebSocketSubject 和 Angular Universal 时出现“ReferenceError: WebSocket is not defined”