角度 rxjs 从 websocket 更改异步可观察

Posted

技术标签:

【中文标题】角度 rxjs 从 websocket 更改异步可观察【英文标题】:angular rxjs change async observable from websocket 【发布时间】:2019-11-05 08:43:51 【问题描述】:

您好,我正在使用 Angular 7 和 rxjs 异步

在我的组件中,我将 ngFor 与异步观察者一起使用

<item-comp [item]="item" *ngFor="let item of groupsService.selectedItems$ | async; ">

</item-comp>

在我的服务中,我有一个 BehaviorSubject,当用户选择一个组时会发出它

  public groupSelected$: BehaviorSubject<any> = new BehaviorSubject(null);

这是 selectedItems$ Observable :

public selectedItems$ = this.groupSelected$.pipe(
    switchMap((group: any) => 
        if (!group)
          return new EmptyObservable();

        return this.http.get('/api/'+ group)
          .pipe(
            map((res: any) => 
                return res.items;
              
            )
          )
      
    )
  )

这可行,但现在我需要能够更改特定项目以响应 websocket 消息。 我有一个 websocket 连接来处理更新项目的消息。有没有办法使用 rxjs 的反应式方法来做到这一点?

【问题讨论】:

【参考方案1】:

您可以为 websocket 更新创建更高阶的函数,然后将其与您的 http 请求链接起来

const onUpdate=(items)=>updateFromWebSocket.pipe(
   map(itemUpdates=>........ return updatedItems
   startWith(items)
)

selectedItems$.pipe(switchMap(items=>onUpdate(items)).subscribe()

【讨论】:

您好,感谢您的回复。每次从 websocket 收到消息时使用您的解决方案都会发出新的 http 请求?这似乎效率不高,有没有一种方法可以创建一个新的流,以某种方式将 http 结果与 websocket 更新合并,而无需每次都发出 http 请求? on update是http请求之后的后续流。所以它不会触发http请求。实际上http请求会切换到监听socket消息

以上是关于角度 rxjs 从 websocket 更改异步可观察的主要内容,如果未能解决你的问题,请参考以下文章

从组件到服务的角度9传递方法

Angular4 Websocket rxjs重新连接和onError

无法从 websocket rxjs 节点 Angular 获得响应

在 Angular 和 rxjs 中重新连接 websocket?

从角度 4 中的“rxjs/Observable”导入 Observable 时出错

RxJs:根据字段值更改创建可观察对象