RxJS 5,将 observable 转换为 BehaviorSubject(?)
Posted
技术标签:
【中文标题】RxJS 5,将 observable 转换为 BehaviorSubject(?)【英文标题】:RxJS 5, converting an observable to a BehaviorSubject(?) 【发布时间】:2018-02-22 18:12:07 【问题描述】:我有一个父 observable,一旦它有一个订阅者,它将进行查找并发出一个值,然后完成。
我想将其转换为执行以下操作的可观察对象(或行为主题或任何工作):一旦它至少有一个订阅者,它就会从父可观察对象获得结果(一次)。然后它将该值发送给它的所有订阅者,并在订阅时向所有未来的订阅者发送该值。即使它的订阅者数量下降到零,它也应该继续这种行为。
看起来这应该很容易。以下是无效的:
theValue$: Observable<boolean> = parent$
.take(1)
.share()
其他不起作用的东西:publishReplay()
、publish()
。效果更好的东西:
theValue$ = new BehaviorSubject<boolean>(false);
parent$
.take(1)
.subscribe( value => theValue$.next(value));
不过,这种方法存在一个问题:parent$
在theValue$
获得第一个订阅者之前就被订阅了。
有没有更好的方法来处理这个问题?
【问题讨论】:
【参考方案1】:shareReplay
应该做你想做的事:
import 'rxjs/add/operator/shareReplay';
...
theValue$: Observable<boolean> = parent$.shareReplay(1);
shareReplay
在 RxJS 版本 5.4.0 中添加。它返回一个引用计数的 observable,它将在第一次订阅时订阅源 - parent$
。源完成后进行的订阅将收到重播通知。
shareReplay
- 和一般的refCount
- 在我最近写的一篇文章中有更详细的解释:RxJS: How to Use refCount。
【讨论】:
这似乎有效!我会玩一会儿,以确保没有意外。这看起来很棒——我只希望他们在创建文档时将其添加到文档中。我不喜欢感觉自己正在使用可能会消失的未记录功能。但我认为它已添加到更改日志中,所以我猜它会保留下来。 它会一直存在。这些文档正在进行中。无论如何,它非常接近publishReplay(1).refCount()
- 差异很微妙。
在阅读了您非常详尽的博文后,我认为我在这里真正想要的是publishLast().refCount
,尽管在实践中,它们在我的情况下可能在功能上几乎相同。再次感谢!
不幸的是,这不允许使用 BehaviorSubject.value 属性,因为此操作的结果仍然只是一个 Observable。
@Celestis 使用主题的value
属性通常被认为是代码异味。我当然认为这是不好的做法。【参考方案2】:
我已经实现了一种将 Observables 转换为 BehaviorSubjects 的方法,因为我认为 shareReplay 方法可读性不强以供将来参考。
import Observable from 'rxjs/Observable';
import BehaviorSubject from 'rxjs/BehaviorSubject';
export function convertObservableToBehaviorSubject<T>(observable: Observable<T>, initValue: T): BehaviorSubject<T>
const subject = new BehaviorSubject(initValue);
observable.subscribe(
(x: T) =>
subject.next(x);
,
(err: any) =>
subject.error(err);
,
() =>
subject.complete();
,
);
return subject;
【讨论】:
是否还没有内置的方法来做到这一点? @chrismarx 不。据我所知,开发者无意添加此功能 这太简单了!BehaviorSubject
的唯一要求确实是一个初始值,例如,如果 observable 从服务中获取,它可以是一个空数组!谢谢!
谨慎使用此解决方案,因为派生主题永远不会取消订阅原始 observable,它可能会导致内存泄漏【参考方案3】:
这是the tmuechsch's answer 的改进变体。
import Observable, BehaviorSubject from 'rxjs';
export function convertObservableToBehaviorSubject<T>(observable: Observable<T>, initValue: T): BehaviorSubject<T>
const subject = new BehaviorSubject(initValue);
const subscription = observable.subscribe(subject);
return
subject,
stopWatching: () => subscription.unsubscribe()
;
要小心,因为返回的主题永远不会取消订阅源 observable。当您知道没有更多对 subject
的引用时(例如,当视图组件被销毁/卸载时),您需要手动调用 stopWatching
。否则会出现内存泄漏。
不可能为给定的问题制定绝对安全的解决方案。原因是行为主题具有value
属性,即使未订阅该主题也必须始终更新该属性,因此当每个人都取消订阅subject
时,您不能自动取消订阅observable
。
cartant's 解决方案也不是完美的,因为结果不是instanceof BehaviorSubject
并且shareReplay
仅在订阅时记录值。
【讨论】:
以上是关于RxJS 5,将 observable 转换为 BehaviorSubject(?)的主要内容,如果未能解决你的问题,请参考以下文章
在 AngularFire 中检索嵌套 Firestore 查询的 RxJS Observable