在 Angular 和 rxjs 中重新连接 websocket?
Posted
技术标签:
【中文标题】在 Angular 和 rxjs 中重新连接 websocket?【英文标题】:Reconnecting a websocket in Angular and rxjs? 【发布时间】:2017-10-19 00:13:34 【问题描述】:我有一个基于 ngrx/store (v2.2.2) 和 rxjs (v5.1.0) 的应用程序,它使用可观察对象侦听 Web 套接字以获取传入数据。当我启动应用程序时,我完美地接收到传入的数据。
但是过了一段时间(更新很少出现),连接似乎丢失了,我不再收到传入数据。我的代码:
服务
import Injectable, OnInit from '@angular/core';
import Observable from 'rxjs';
@Injectable()
export class MemberService implements OnInit
private websocket: any;
private destination: string = "wss://notessensei.mybluemix.net/ws/time";
constructor()
ngOnInit()
listenToTheSocket(): Observable<any>
this.websocket = new WebSocket(this.destination);
this.websocket.onopen = () =>
console.log("WebService Connected to " + this.destination);
return Observable.create(observer =>
this.websocket.onmessage = (evt) =>
observer.next(evt);
;
)
.map(res => res.data)
.share();
订阅者
export class AppComponent implements OnInit
constructor(/*private store: Store<fromRoot.State>,*/ private memberService: MemberService)
ngOnInit()
this.memberService.listenToTheSocket().subscribe((result: any) =>
try
console.log(result);
// const member: any = JSON.parse(result);
// this.store.dispatch(new MemberActions.AddMember(member));
catch (ex)
console.log(JSON.stringify(ex));
)
当 Web 套接字超时时我需要做什么来重新连接,以便可观察对象继续发出传入值?
我看了一些 Q&A here、here 和 here,它似乎没有解决这个问题(以我能理解的方式)。
注意:wss://notessensei.mybluemix.net/ws/time
的 websocket 是实时的,并且每分钟发出一次时间戳(以防万一想要测试它)。
非常感谢您的建议!
【问题讨论】:
如果流中有错误,那么它“应该”在.subscribe
的“错误”参数中被拾取。您的所有try ... catch
正在做的就是拾取流实际接收到一些数据的块中产生的任何错误。因此,至少添加错误处理程序是一个很好的起点,因为它会显示是否确实存在终止错误。
请注意,由于您将 Websocket 手动包装在一个 observable 中,因此:“应该”还有一个 this.websocket.onerror
映射到 observer.error
以便在订阅中获取它。
感谢提示。我将重新编写我的示例,看看我是否可以超越它。断开连接不是错误,而是套接字的事件,因此它可能不会冒泡到可观察对象
【参考方案1】:
这可能不是一个好的答案,但评论太多了。
问题可能来自您的服务:
listenToTheSocket(): Observable<any>
this.websocket = new WebSocket(this.destination);
this.websocket.onopen = () =>
console.log("WebService Connected to " + this.destination);
return Observable.create(observer =>
this.websocket.onmessage = (evt) =>
observer.next(evt);
;
)
.map(res => res.data)
.share();
您是否认为您在组件中多次进入ngOnInit
方法?
您应该尝试将console.log
放入ngOnInit
以确定。
因为如果你这样做了,在你的服务中你会用一个新的覆盖this.websocket
。
您应该尝试类似的方法:
@Injectable()
export class MemberService implements OnInit
private websocket: any;
private websocketSubject$ = new BehaviorSubject<any>();
private websocket$ = this.websocketSubject$.asObservable();
private destination = 'wss://notessensei.mybluemix.net/ws/time';
constructor()
ngOnInit()
listenToTheSocket(): Observable<any>
if (this.websocket)
return this.websocket$;
this.websocket = new WebSocket(this.destination);
this.websocket.onopen = () => console.log(`WebService Connected to $this.destination`);
this.websocket.onmessage = (res) => this.websocketSubject$.next(res => res.data);
如果BehaviorSubject
在您订阅之前收到事件,它将发送最后一个值。另外,因为它是一个主题,所以不需要使用share
运算符。
【讨论】:
服务是单例的,所以 ngOnInit 应该只运行一次。但我会检查一下我们的 我知道服务是单例的。抱歉,如果我说得不够清楚,但我说的是ngOnInit
在您的组件中,它正在调用您的服务方法listenToTheSocket
啊。知道了。在这里非常感谢您的建议。 App 组件位于组件树的根部。 “应该”只运行一次,但值得指出和检查。【参考方案2】:
其实现在rxjs里面有一个WebsocketSubject!
import webSocket from 'rxjs/webSocket' // for RxJS 6, for v5 use Observable.webSocket
let subject = webSocket('ws://localhost:8081');
subject.subscribe(
(msg) => console.log('message received: ' + msg),
(err) => console.log(err),
() => console.log('complete')
);
subject.next(JSON.stringify( op: 'hello' ));
当您重新订阅断开的连接时,它会处理重新连接。所以例如写这个来重新连接:
subject.retry().subscribe(...)
有关详细信息,请参阅文档。不幸的是,搜索框没有显示该方法,但您可以在这里找到它:
http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-webSocket
#-navigation 在我的浏览器中不起作用,因此请在该页面上搜索“webSocket”。
来源:http://reactivex.io/rxjs/file/es6/observable/dom/WebSocketSubject.js.html#lineNumber15
【讨论】:
如何导入这个? 在 rxjs 6 中导入这个使用import webSocket from 'rxjs/webSocket'
这个方法对我不起作用。我必须使用new WebSocketSubject('ws://localhost:8081')
构造函数,然后在接下来我必须发送subject.next(event: 'myEvent', data: 'hello')
文档明确地说,在多个地方“确保使用 JSON.stringify 序列化对象”但我无法做到这一点工作。这个答案由培根保存:***.com/a/50400143/4233452
“WebSocketSubject”类型上不存在属性“重试”?
重试应该是这样的:subject.pipe(retry()).subscribe(...【参考方案3】:
对于 rxjs 6,websocket 实现。
import webSocket from 'rxjs/websocket';
let subject = webSocket('ws://localhost:8081');
【讨论】:
【参考方案4】:对于 rxjs 6 实现
import webSocket from 'rxjs/webSocket'
let subject = webSocket('ws://localhost:8081');
subject.pipe(
retry() //support auto reconnect
).subscribe(...)
【讨论】:
以上是关于在 Angular 和 rxjs 中重新连接 websocket?的主要内容,如果未能解决你的问题,请参考以下文章
角度 4 上的 rxjs-websockets,处理重新连接
如何在 Angular 和 RxJs 中管理多个顺序订阅方法?