长寿命的多播 Observable,每次订阅时都会重新订阅其源
Posted
技术标签:
【中文标题】长寿命的多播 Observable,每次订阅时都会重新订阅其源【英文标题】:Long-lived multicasting Observable that resubscribes to its source every time it is subscribed to 【发布时间】:2019-12-29 01:59:22 【问题描述】:我想使用现有的 rxjs 操作符来创建一个多播 observable,每次订阅它时都会重新订阅它的源,但在源完成时不会取消订阅它的订阅者。
目的是混合使用共享同一数据源的短期组件和长期组件。当创建新的短期组件时,应该为新组件和任何同时订阅数据源的长期组件更新共享数据。
使用 Observable.create(),我能够创建一个自定义的 observable 来产生这种行为,但感觉可能有一个开箱即用的解决方案,不需要编写自定义代码。
这是我尝试过的。
import of, Observable, Observer, zip, interval, merge from "rxjs";
import filter, map, publish, tap, delay, shareReplay, share from "rxjs/operators";
class MySubject
constructor(private observable: Observable<any>)
sourceActive = false;
subscribers: Array<Observer<any>> = [];
public subscribe(observer: Observer<any>)
this.subscribers.push(observer);
if (!this.sourceActive)
console.log("subscribing");
this.sourceActive = true;
this.observable.subscribe(
x => this.subscribers.forEach(sub => sub.closed || sub.next(x)),
x => this.subscribers.forEach(sub => sub.closed || sub.error(x)),
() => this.sourceActive = false
);
const source$ = of(1).pipe(
tap(x=>console.log("invoked cold")),
delay(2000)
);
const mySubject = new MySubject(source$);
const super$ = Observable.create(observer => mySubject.subscribe(observer));
const sub1 = super$.subscribe(x => console.log("sub 1"));
const sub2 = super$.subscribe(x => console.log("sub 2"));
setTimeout(x =>
sub1.unsubscribe();
super$.subscribe(x => console.log("sub 3"));
, 3000);
【问题讨论】:
【参考方案1】:如果您想在多个组件之间共享数据,您必须使用主题。 Subject 将更改发送给所有连接的订阅者。
或
BehaviorSubject 具有由 observable 执行的当前值的概念,并且可以被新订阅者检索。
也许您也必须观看 ReplaySubject。
希望对你有用。
【讨论】:
以上是关于长寿命的多播 Observable,每次订阅时都会重新订阅其源的主要内容,如果未能解决你的问题,请参考以下文章