Angular Websocket RxJS / WebSocketSubject

Posted

技术标签:

【中文标题】Angular Websocket RxJS / WebSocketSubject【英文标题】: 【发布时间】:2017-05-26 12:46:14 【问题描述】:

过去我们使用了几个 angular2 websocket impelmentations,但我们对它们并不满意,使用它们存在一些问题。所以我们决定用 RxJs 自己尝试一下。

这是我们的第一次尝试:

@Injectable()
export class WebSocketService

    public createWebsocket(url: string): Subject<MessageEvent> 
        let socket = new WebSocket(url);

        let observable = Observable.create(
                                    (observer: Observer<MessageEvent>) => 
                                        socket.onmessage = observer.next.bind(observer);
                                        socket.onerror   = observer.error.bind(observer);
                                        socket.onclose   = observer.complete.bind(observer);

                                        return socket.close.bind(socket);
                                    );

        let observer = 
            next: (data: Object) => 
                if (socket.readyState === WebSocket.OPEN) 
                    socket.send(JSON.stringify(data));
                
            
        ;

        return Subject.create(observer, observable);
    

套接字已打开并正常工作了一段时间。几秒钟后,浏览器正在关闭套接字,我在服务器端收到一个关闭事件。

这是我们进入服务器站点的最终原因: [1006] WebSocket 读取 EOF

有人可以帮忙吗?或者有谁知道如何使用WebSocketSubject?

【问题讨论】:

gearheart.io/blog/auto-websocket-reconnection-with-rxjs @JuliaPassynkova 感谢您的链接,我已经找到了这个解决方案,而且效果很好。这个解决方案的问题是,总是存在关闭 Websocket 的问题,这个解决方案在关闭连接时会重新连接,这没关系。但这就像第一次连接,在服务器上第一次连接会做很多昂贵的事情。所以最好不要从浏览器端关闭连接。那么问题来了,为什么浏览器要关闭 websocket? 由于您似乎已经完成了自己的 websocket 实现,请您分享您自己的解决方案吗? @BlackHoleGalaxy 你是指谁?我? 嗨,迈克尔,是的,因为您在下面的答案中添加了评论,告诉您通过自己的实现设法解决了这个问题。我们遇到了您所面临的相同问题。 【参考方案1】:

我不知道它是否仍然相关,但我确实使用了与 https://github.com/ohjames/rxjs-websockets 的角度类似的 websocket 连接

我在调用 ServerSocketService 的组件中进行了一些修改(用于连接到基于 websocket 的端点),即重试机制,并且我使用了 ReplaySubject 而不是示例中给出的 QueuingSubject。

@Injectable()
export class ServerSocket 
    // private inputStream: QueueingSubject<string>;
    private inputStream: ReplaySubject<string>;
    public messages: Observable<string>;
    private subscription: Subscription;
    private websocket: WebSocket;

    public connect() 
        if (this.messages) 
            return;
        
        console.log('inside connect');
        // this.inputStream = new QueueingSubject<string>();
        this.inputStream = new ReplaySubject();

        // Using share() causes a single websocket to be created when the first
        // observer subscribes. This socket is shared with subsequent observers
        // and closed when the observer count falls to zero.
        this.messages = websocketConnect(
            'ws://localhost:9097/echo',
            this.inputStream
        ).messages.share();


        this.messages.retryWhen(errors => errors.delay(1000)).subscribe(message => 
           console.log('error', message);
        );
    

    public send(message: string): void 
        // If the websocket is not connected then the QueueingSubject will ensure
        // that messages are queued and delivered when the websocket reconnects.
        // A regular Subject can be used to discard messages sent when the websocket
        // is disconnected.

        this.inputStream.next(message);

    

然后在组件的 OnInit 生命周期内建立连接,订阅然后发送消息。一旦组件达到其 OnDestroy 生命周期,只需取消订阅即可释放资源。

【讨论】:

这看起来不错,同时我们已经调整了我们自己的 websocket 实现,但如果有时间我们也会看看这个

以上是关于Angular Websocket RxJS / WebSocketSubject的主要内容,如果未能解决你的问题,请参考以下文章

无法从 websocket rxjs 节点 Angular 获得响应

如何在 Angular 12 中制作一个简单的 rxjs/webSocket 服务?

Angular4 Websocket rxjs重新连接和onError

使用 RxJs WebSocketSubject 和 Angular Universal 时出现“ReferenceError: WebSocket is not defined”

Rxjs Websocket:如何添加标头

角度 rxjs 从 websocket 更改异步可观察