RxJS 5,将 observable 转换为 BehaviorSubject(?)

Posted

技术标签:

【中文标题】RxJS 5,将 observable 转换为 BehaviorSubject(?)【英文标题】:RxJS 5, converting an observable to a BehaviorSubject(?) 【发布时间】:2018-02-22 18:12:07 【问题描述】:

我有一个父 observable,一旦它有一个订阅者,它将进行查找并发出一个值,然后完成。

我想将其转换为执行以下操作的可观察对象(或行为主题或任何工作):一旦它至少有一个订阅者,它就会从父可观察对象获得结果(一次)。然后它将该值发送给它的所有订阅者,并在订阅时向所有未来的订阅者发送该值。即使它的订阅者数量下降到零,它也应该继续这种行为。

看起来这应该很容易。以下是无效的:

theValue$: Observable<boolean> = parent$
.take(1)
.share()

其他不起作用的东西:publishReplay()publish()。效果更好的东西:

theValue$ = new BehaviorSubject<boolean>(false);

parent$
.take(1)
.subscribe( value => theValue$.next(value));

不过,这种方法存在一个问题:parent$theValue$ 获得第一个订阅者之前就被订阅了。

有没有更好的方法来处理这个问题?

【问题讨论】:

【参考方案1】:

shareReplay 应该做你想做的事:

import 'rxjs/add/operator/shareReplay';
...
theValue$: Observable<boolean> = parent$.shareReplay(1);

shareReplay 在 RxJS 版本 5.4.0 中添加。它返回一个引用计数的 observable,它将在第一次订阅时订阅源 - parent$。源完成后进行的订阅将收到重播通知。

shareReplay - 和一般的refCount - 在我最近写的一篇文章中有更详细的解释:RxJS: How to Use refCount。

【讨论】:

这似乎有效!我会玩一会儿,以确保没有意外。这看起来很棒——我只希望他们在创建文档时将其添加到文档中。我不喜欢感觉自己正在使用可能会消失的未记录功能。但我认为它已添加到更改日志中,所以我猜它会保留下来。 它会一直存在。这些文档正在进行中。无论如何,它非常接近publishReplay(1).refCount() - 差异很微妙。 在阅读了您非常详尽的博文后,我认为我在这里真正想要的是publishLast().refCount,尽管在实践中,它们在我的情况下可能在功能上几乎相同。再次感谢! 不幸的是,这不允许使用 BehaviorSubject.value 属性,因为此操作的结果仍然只是一个 Observable。 @Celestis 使用主题的value 属性通常被认为是代码异味。我当然认为这是不好的做法。【参考方案2】:

我已经实现了一种将 Observables 转换为 BehaviorSubjects 的方法,因为我认为 shareReplay 方法可读性不强以供将来参考。

import  Observable  from 'rxjs/Observable';
import  BehaviorSubject  from 'rxjs/BehaviorSubject';

export function convertObservableToBehaviorSubject<T>(observable: Observable<T>, initValue: T): BehaviorSubject<T> 
    const subject = new BehaviorSubject(initValue);

    observable.subscribe(
        (x: T) => 
            subject.next(x);
        ,
        (err: any) => 
            subject.error(err);
        ,
        () => 
            subject.complete();
        ,
    );

    return subject;

【讨论】:

是否还没有内置的方法来做到这一点? @chrismarx 不。据我所知,开发者无意添加此功能 这太简单了! BehaviorSubject 的唯一要求确实是一个初始值,例如,如果 observable 从服务中获取,它可以是一个空数组!谢谢! 谨慎使用此解决方案,因为派生主题永远不会取消订阅原始 observable,它可能会导致内存泄漏【参考方案3】:

这是the tmuechsch's answer 的改进变体。

import  Observable, BehaviorSubject  from 'rxjs';

export function convertObservableToBehaviorSubject<T>(observable: Observable<T>, initValue: T): BehaviorSubject<T> 
  const subject = new BehaviorSubject(initValue);
  const subscription = observable.subscribe(subject);
  return 
    subject,
    stopWatching: () => subscription.unsubscribe()
  ;

要小心,因为返回的主题永远不会取消订阅源 observable。当您知道没有更多对 subject 的引用时(例如,当视图组件被销毁/卸载时),您需要手动调用 stopWatching。否则会出现内存泄漏。

不可能为给定的问题制定绝对安全的解决方案。原因是行为主题具有value 属性,即使未订阅该主题也必须始终更新该属性,因此当每个人都取消订阅subject 时,您不能自动取消订阅observable

cartant's 解决方案也不是完美的,因为结果不是instanceof BehaviorSubject 并且shareReplay 仅在订阅时记录值。

【讨论】:

以上是关于RxJS 5,将 observable 转换为 BehaviorSubject(?)的主要内容,如果未能解决你的问题,请参考以下文章

将 Promise 转换为 RxJs Observable

RxJs - 将Observables数组转换为发射值数组

在 AngularFire 中检索嵌套 Firestore 查询的 RxJS Observable

将Observable的Observable转换为简单的Observable

rxjs 中 observable 的类型

rxjs 中 observable 的类型