RXJS sharedReplay 在重新创建 observable 时不起作用

Posted

技术标签:

【中文标题】RXJS sharedReplay 在重新创建 observable 时不起作用【英文标题】:RXJS sharedReplay does not work when recreating an observable 【发布时间】:2021-02-11 08:13:52 【问题描述】:

我正在为我的应用程序使用 RXJS websocket。登录后,数据从我的服务器发送到客户端,并使用shareReplay 将数据存储在可观察对象中,以便在导航应用程序时保留数据。

if(!this.data$ || this.data$Completed) 
    this.data$ = this.dataService.get(this.CTRL, 'list').pipe(
        finalize(() => 
            console.log('finalizing data list');
            this.data$Completed = true;
        ),
        map(data => [....]), // irrelevant
        tap((d) => console.log('data: ', d)),
        shareReplay(1),
        tap((d) => console.log('data 2: ', d)),
    );

控制台输出符合预期

// first subscribe
data:  […]
data 2:  […]
// every other subscribe when resubscribing after navigation / different components
data 2:  […]
data 2:  […]
data 2:  […]
...

当注销连接终止并触发finalize 方法。重新登录时再次发送数据,但shareReplay 不再起作用。我只在第一次订阅时获得数据,但在导航和重新订阅后没有。

// logout
finalizing data list

// logging in again
data:  […]
data 2:  […]

我还注意到,在这种情况下,rxjs websocket multiplex 向服务器发送订阅命令,所以显然我订阅了 websocket 主题而不是 ReplaySubject。

【问题讨论】:

你如何订阅(或|异步)你的 rs ?它不在代码中,请记住,管道本身不会做任何事情,它是“激活”整个事情的订阅。 我在 html 中使用 async 订阅 observable,但我也尝试在组件中使用 subscribe 调用,结果是一样的。 【参考方案1】:

finalize被调用时,这意味着源Observable已经完成。您无法重新启动已完成的 Observabledata$ 仍然在订阅时发出,因为这是 shareReplay 的想法,它只是为您提供订阅时最后发出的值,无论源是否完成。但是它永远不会获得新数据。

当您再次登录时,您必须重新创建您的 data$ observable,以获得一个全新的 observable。

【讨论】:

当重新登录 websocket 时会重新连接到服务器。据我所知,创建了一个新的WebSocketSubject @Fussel 这部分代码是在您与服务器建立新连接时运行this.data$ = this.dataService.get(this.CTRL, 'list') 吗?可能有相当多的地方可能出错,如果您还添加一小部分代码可能会更好,您如何在组件中准确使用此this.data$,以及dataService.get 的作用 我的错误是我在创建 observable 时没有将 this.data$Completed 设置为 false,仅作为默认值。这就是为什么它只在第一次起作用的原因……有时是小事。 =/【参考方案2】:

总的来说,我对 rs 和主题有点生疏,因为我使用 ngrx 在我最近工作的每个应用程序中存储数据,但是使用 websockets 的逻辑与我们的经典 api 有点不同,因为 websocket 本身就是一个主题,并且每次重新连接时都会重新创建自己。当这种情况发生时,所有基础订阅都会消失。遗憾的是,这意味着我们不能使用 API 调用的逻辑,其中组件订阅第一个 observable 并且管道链“触发”api 服务。

相反,我们所做的是直接在具有 ws 主题的数据服务中使用订阅。这是安全的,因为每次 websocket 断开连接时,它都会发出一个完成的事件,因此订阅会终止。这个订阅负责通过触发动作将数据推送到ngrx存储中,这基本上是对next的调用。

该服务使用 websocket 配置选项设计,以在每次失去连接时重新连接,并在登录时重新连接。在 reco 时,它重新创建了一个 websocket 主题和将数据推送到 ngrx 的底层订阅。

【讨论】:

这个问题没有提到ngrx,为什么要提到一个关于不使用它的应用程序的问题? 哇,ngrx 是癌症吗?真的吗?无论如何,因为我们所做的是使用 ngrx 而不是数据存储层的重放或行为主题。所以我把它作为一种免责声明来谈论。但是从架构的角度来看,ngrx 只是一个 store,扩展了整个 private Subject,public asObservable 逻辑,所以除了这个区别,我们使用的逻辑可以用在它的 case 上。 @AdrianBrand 你对 ngrx 的评论让我很困惑。我广泛使用它,如果有一些已知的陷阱,我宁愿现在接受教育,而不是以后自己发现。因此,如果您有任何评论或博客文章或类似内容要分享,请不要犹豫。 所以我可以发出一个 http 请求并将响应存储在服务中的 BehaviorSubject 中,就像 Angular 设计的那样。或者我可以跨多个文件夹制作大量的样板代码,这样我就可以调度一个被reducer捕获的动作,该reducer更新全局存储中的状态,然后被执行http请求的效果捕获,该请求将切换映射到另一个再次更新全局状态的reducer捕获的操作。 这是我遇到过的最反直觉的模式。它会破坏你的速度,窃取你的雇主预算和判决,然后让加入团队的开发人员有一个巨大的加速期。存储模式解决了 React 生态系统中 Angular 中不存在的问题,不要进入 Angular 生态系统并拖着那个可怕的包袱。学习 Angular 依赖注入系统,你永远不会发现自己需要商店。

以上是关于RXJS sharedReplay 在重新创建 observable 时不起作用的主要内容,如果未能解决你的问题,请参考以下文章

在 Angular 和 rxjs 中重新连接 websocket?

角度 4 上的 rxjs-websockets,处理重新连接

rxjs websocket:心跳和重新连接

重新连接 websocket rxjs

Angular4 Websocket rxjs重新连接和onError

rxjs5.X系列 —— transform api 笔记