长寿命的多播 Observable,每次订阅时都会重新订阅其源

Posted

技术标签:

【中文标题】长寿命的多播 Observable,每次订阅时都会重新订阅其源【英文标题】:Long-lived multicasting Observable that resubscribes to its source every time it is subscribed to 【发布时间】:2019-12-29 01:59:22 【问题描述】:

我想使用现有的 rxjs 操作符来创建一个多播 observable,每次订阅它时都会重新订阅它的源,但在源完成时不会取消订阅它的订阅者。

目的是混合使用共享同一数据源的短期组件和长期组件。当创建新的短期组件时,应该为新组件和任何同时订阅数据源的长期组件更新共享数据。

使用 Observable.create(),我能够创建一个自定义的 observable 来产生这种行为,但感觉可能有一个开箱即用的解决方案,不需要编写自定义代码。

这是我尝试过的。

import  of, Observable, Observer, zip, interval, merge  from "rxjs";
import  filter, map, publish, tap, delay, shareReplay, share  from "rxjs/operators";

class MySubject 
  constructor(private observable: Observable<any>) 

  sourceActive = false;
  subscribers: Array<Observer<any>> = [];

  public subscribe(observer: Observer<any>) 
    this.subscribers.push(observer);

    if (!this.sourceActive) 
      console.log("subscribing");
      this.sourceActive = true;
      this.observable.subscribe(
        x => this.subscribers.forEach(sub => sub.closed || sub.next(x)),
        x => this.subscribers.forEach(sub => sub.closed || sub.error(x)),
        () => this.sourceActive = false
      );
    
  


const source$ = of(1).pipe(
  tap(x=>console.log("invoked cold")),
  delay(2000)
);
const mySubject = new MySubject(source$);
const super$ = Observable.create(observer => mySubject.subscribe(observer));

const sub1 = super$.subscribe(x => console.log("sub 1"));
const sub2 = super$.subscribe(x => console.log("sub 2"));

setTimeout(x => 
  sub1.unsubscribe();
  super$.subscribe(x => console.log("sub 3"));
, 3000);

【问题讨论】:

【参考方案1】:

如果您想在多个组件之间共享数据,您必须使用主题。 Subject 将更改发送给所有连接的订阅者。

BehaviorSubject 具有由 observable 执行的当前值的概念,并且可以被新订阅者检索。

也许您也必须观看 ReplaySubject。

希望对你有用。

【讨论】:

以上是关于长寿命的多播 Observable,每次订阅时都会重新订阅其源的主要内容,如果未能解决你的问题,请参考以下文章

构建你的长寿命的API第1部分:规范驱动的API开发

Aeron - 跨交换机的多播问题

Spring Boot 中的多播 Websocket

RXJS Observable的冷,热和Subject

每次代码更改时都会重新生成 DataBindingInfo.java

RxJS主题(Subject)