具有多个订阅者的 Angular 2 Observable

Posted

技术标签:

【中文标题】具有多个订阅者的 Angular 2 Observable【英文标题】:Angular 2 Observable with multiple subscribers 【发布时间】:2017-01-30 08:15:27 【问题描述】:

我有一个从 API 获取数据的 Angular 2 服务 该服务有 3 个订阅者(在组件中定义),每个订阅者都对数据做其他事情(不同的图表)

我注意到我正在向 API 发出三个 GET 请求,而我想要实现的是一个请求并且订阅者将共享数据 我已经研究了 HOT 和 COLD 可观察对象并尝试了可观察对象上的 .share() 但我仍在进行 3 次单独调用

更新,添加代码

服务

import  Injectable  from '@angular/core';
import  Http, Response  from '@angular/http';

import Observable from 'rxjs/Rx';

// Import RxJs required methods
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/catch';

import  StationCompliance  from './model/StationCompliance';


@Injectable()
export class StationComplianceService 

  private url = '/api/read/stations';

  constructor(private http : Http) 
    console.log('Started Station compliance service');
   

   getStationCompliance() : Observable<StationCompliance []> 
     return this.http.get(this.url)
      .map((res:Response) => res.json())
      .catch((error:any) => Observable.throw(error.json().error || 'Server Error'));
   

组件 1

import  Component, OnInit  from '@angular/core';
import  CHART_DIRECTIVES  from 'angular2-highcharts';

import  StationComplianceService  from '../station-compliance.service';


@Component(
  selector: 'app-up-down-graph',
  templateUrl: './up-down-graph.component.html',
  styleUrls: ['./up-down-graph.component.css']
)
export class UpDownGraphComponent implements OnInit 

  graphData;

  errorMessage: string;

  options;

  constructor(private stationService : StationComplianceService)  

  ngOnInit() 
    this.getStationTypes();
  

  getStationTypes()
    this.stationService.getStationCompliance()
      .subscribe(
        graphData => 
          this.graphData = graphData;
          this.options = 
            chart : type: 'pie',
                    plotShadow: true
            ,
            plotOptions : 
              showInLegend: true
            ,
            title : text: 'Up and Down devices',
            series: [
              data: this.processStationType(this.graphData)
            ]
          
        ,
        error => this.errorMessage = <any>error
      );
  

其他两个组件几乎相同,它们只是显示其他图形

【问题讨论】:

【参考方案1】:

我遇到了类似的问题,并使用 Aran 的建议解决了它,参考 Cory Rylan 的 Angular 2 Observable Data Services 博客文章。我的关键是使用BehaviorSubject。这是最终对我有用的代码的 sn-ps。

数据服务:

数据服务在初始化服务时会创建一个内部BehaviorSubject 来缓存一次数据。消费者使用subscribeToDataService() 方法访问数据。

    import  Injectable  from '@angular/core';
    import  Http, Response  from '@angular/http';

    import  BehaviorSubject  from 'rxjs/BehaviorSubject';
    import  Observable  from 'rxjs/Observable';

    import  Data  from './data';
    import  properties  from '../../properties';

    @Injectable()
    export class DataService 
      allData: Data[] = new Array<Data>();
      allData$: BehaviorSubject<Data[]>;

      constructor(private http: Http) 
        this.initializeDataService();
      

      initializeDataService() 
        if (!this.allData$) 
          this.allData$ = <BehaviorSubject<Data[]>> new BehaviorSubject(new Array<Data>());

          this.http.get(properties.DATA_API)
            .map(this.extractData)
            .catch(this.handleError)
            .subscribe(
              allData => 
                this.allData = allData;
                this.allData$.next(allData);
              ,
              error => console.log("Error subscribing to DataService: " + error)
            );
        
      

      subscribeToDataService(): Observable<Data[]> 
        return this.allData$.asObservable();
      

      // other methods have been omitted

    
零件:

组件可以在初始化时订阅数据服务。

    export class TestComponent implements OnInit 
      allData$: Observable<Data[]>;

      constructor(private dataService: DataService) 
      

      ngOnInit() 
        this.allData$ = this.dataService.subscribeToDataService();
      

    
组件模板:

然后,模板可以根据需要使用异步管道迭代可观察对象。

    *ngFor="let data of allData$ | async" 

每次在数据服务中对BehaviorSubject 调用next() 方法时都会更新订阅者。

【讨论】:

我为这个“问题”而烦恼,这篇文章让我很开心。谢谢 请考虑更短的解决方案:***.com/questions/39627396/… 你没有解决 Zeliboba 的问题。行为主体不同 只是出于好奇,这里需要“allData:Data[]”数组吗?因为除了在 DataService 的本地范围内,我没有看到它的任何用途。 @Siddhant,这似乎是一个“重读数据提供程序”,它提供了将allData[] 公开为对已检索到的缓存数据的立即访问的选项。更精细的设计可能有一个可重用的“缓存数据提供者”扩展/装饰它。不过不要难过,就在上周,我对同样的事情感到困惑——我们都在学习。【参考方案2】:

代码中的问题是每次调用函数时都会返回一个新的 observable。这是因为http.get 每次被调用时都会创建一个新的 Observable。解决这个问题的方法可能是将可观察对象(通过闭包)存储在服务中,这将确保所有主题都订阅相同的可观察对象。这不是完美的代码,但我遇到了类似的问题,这暂时解决了我的问题。

import  Injectable  from '@angular/core';
import  Http, Response  from '@angular/http';

import Observable from 'rxjs/Rx';

// Import RxJs required methods
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/catch';

import  StationCompliance  from './model/StationCompliance';


@Injectable()
export class StationComplianceService 

  private url = '/api/read/stations';

  constructor(private http : Http) 
    console.log('Started Station compliance service');
   

   private stationComplianceObservable: Rx.Observable<StationCompliance[]>;


   getStationCompliance() : Observable<StationCompliance []> 

    if(this.stationComplianceObservable)
        return this.stationComplianceObservable;
            

      this.stationComplianceObservable = this.http.get(this.url)
      .debounce(1000)
      .share()
      .map((res:Response) => res.json())
      .finally(function ()  this.stationComplianceObservable = null) 
      .catch((error:any) => Observable.throw(error.json().error || 'Server Error'));

    return this.stationComplianceObservable;
   

【讨论】:

这个答案肯定有好处,谢谢。但是,我认为您应该解释您对.share() 的使用,因为这对于避免重复调用的问题至关重要。如果您想避免对同一个 API 端点的异步调用,答案很好。然而,接受的答案可以提供比后端调用的持续时间更长的缓存响应。 是的,这不是一个很好的解决方案。接受的答案要好得多。 @RaySuelzer 绝对不会同意你的观点,因为在服务中拥有可观察订阅是反模式。这应该是被接受的答案。【参考方案3】:

shareReplay 现在应该这样做 - "(...) valuable in situations where you know you will have late subscribers to a stream that need access to previously emitted values."

【讨论】:

【参考方案4】:

您可以创建一个响应式数据服务并定义一个本地 Observable 变量,该变量在内部更新,订阅者可以自行更新。 这篇文章正确地解释了它 data services

【讨论】:

【参考方案5】:

我知道这个帖子很旧,但接受的答案对我有很大帮助,我想使用 debounce、switchmap 和 hacky 全局通知系统添加一些可能的改进(要做到这一点,应该使用适当的 ngrx:https://ngrx.io/ )

该概念的前提是通知服务可用于将更改推送到所有其他服务,告诉他们获取数据:

export class NotifyerService 

  constructor()  

  notifyer: Subject<any> = new Subject

  notifyAll() 
    console.log("ALL NOTIFIED")
    this.notifyer.next("GO")
  


使用主题,因为在主题上调用 .next(val) 会将数据推送到所有侦听器

在特定组件的服务中(在您的情况下为“DataService”),您可以管理数据采集和缓存活动:

export class GetDataService 

  // cache the incoming data
  cache: Subject<any> = new Subject

  constructor(private http: HttpClient,
    private notifyerService: NotifyerService) 

    // subscribe to any notification from central message broker
    this.notifyerService.notifyer.pipe(

      // filtering can be used to perform different actions based on different notifications
      filter(val => val == "GO"),

      // prevent notification spam by debouncing
      debounceTime(300),

      // SUBSCRIBE to the output of getData, cancelling previous subscription
      switchMap(() => this.getData())

    ).subscribe(res => 

      console.log("expensive server request")

      // save data in cache, notify listeners
      this.cache.next(res)
    )

  

  // expensive function which gets data
  getData(): Observable<any> 
    return this.http.get<any>(BASE_URL);
  

 

上面代码中的关键概念是设置一个缓存对象,并在有通知时更新它。在构造函数中,我们希望通过一系列操作符来传递所有未来的通知:

如果您只想更新缓存,则可以使用过滤,如果 通知服务输出某些内容(在本例中为“GO”)。如果 你开始这样做,几乎肯定会使用 ngrx。 debounceTime 可防止您的服务器收到许多请求的垃圾邮件(如果说,通知是基于用户输入的) switchMap 是一个非常重要的缓存和状态改变算子。 switchMap 在内部运行函数(在本例中为 getData 函数)并订阅其输出。这意味着 switchMap 的输出将是对服务器数据的订阅。 switchMap 的另一个重要功能是取消以前的订阅。这意味着如果您的服务器请求在另一个通知到达之前没有返回,它将取消旧的请求并再次 ping 服务器。

现在,服务器数据将通过 .next(res) 方法放入缓存中。

现在所有最终组件需要做的就是监听缓存的更新,并适当地处理:

export class ButtonclickerComponent implements OnInit 

  value: any;

  constructor(private getDataService: GetDataService,
    private notifyerService: NotifyerService)  

  ngOnInit() 

    // listen to cache for updates
    this.getDataService.cache.pipe(

      // can do something specific to this component if have multiple subscriptions off same cache
      map(x => x)

      // subsc
    ).subscribe(x =>  console.log(x); this.value = x.value )

  

  onClick() 

    // notify all components of state changes
    this.notifyerService.notifyAll()
  


实践中的概念:

Angular App on button click

Server response

【讨论】:

【参考方案6】:

解决方案在创建后保存可观察并使其可共享(默认情况下不是)。所以你的服务看起来像:

@Injectable()
export class StationComplianceService 

  private stationCompliance: StationCompliance;
  private stream: Observable<StationCompliance []>;
  private url = '/api/read/stations';

  constructor(private http : Http) 
    console.log('Started Station compliance service');
   

   getStationCompliance() : Observable<StationCompliance []> 
     /** is remote value is already fetched, just return it as Observable */
     if (this.stationComliance) 
       return Observable.of(this.stationComliance);
     
     /** otherwise if stream already created, prevent another stream creation (exactly your question */
     if (this.stream) 
       return this.stream;
     
     /** otherwise run remote data fetching */
     this.stream = this.http.get(this.url)
      .map((res:Response) => res.json())
      .catch((error:any) => Observable.throw(error.json().error || 'Server Error'))
      .share(); /** and make the stream shareable (by default it is not) */
     return this.stream;
   

【讨论】:

在第二种情况下,流已经创建。如果我再次返回它,我会得到以前发出的 http 请求吗?我认为它不会返回任何东西 当然,它不会再次返回已经发出的事件。如果您需要它们,我建议使用累加器减少流。

以上是关于具有多个订阅者的 Angular 2 Observable的主要内容,如果未能解决你的问题,请参考以下文章

订阅多个字段的 valueChanges 导致反应形式 angular 2 的性能问题

Angular / RxJs对一个observable的多个订阅

Angular 2:许多异步管道与一个订阅

Angular 2 从 RxJs 订阅返回数据

Angular - 竞争条件 - 一次多个订阅

如何在 Angular 和 RxJs 中管理多个顺序订阅方法?