在执行下一个代码块之前等待多个 RxJS 调用

Posted

技术标签:

【中文标题】在执行下一个代码块之前等待多个 RxJS 调用【英文标题】:Waiting for multiple RxJS call before executing the next code block 【发布时间】:2022-01-22 21:29:43 【问题描述】:

我有一个街道列表并遍历它们。在 for 循环中,我将为每条街道调用一个端点。端点为我提供了有关所请求街道的信息。我想将每个响应存储到一个对象数组中,在所有请求完成后,我想执行下一个代码块。 这是我的问题: 我进行所有调用,将所有数据存储到对象数组中,但如果我要在下一个代码块中使用我预填充的对象数组,长度 = 0... 这是我的代码:

export class MyComponent
  addressInfoArray: AddressInfo[] = [];

  // some code ...

  prepareStreetInformations(): void 
    // some code ....

    this.fillArray(streets, url);
    this.doSomethingWithArray(this.addressInfoArray); // <--- length = 0 and doesn't waits for finishing the fillArray() method
  


fillArray(streets: Street[], url: string): void  // streets has a length of 150
  for (const street of streets) 
    this.http.get<AddressInfo>(`$url/street.name`).subscribe(response => 
      this.addressInfoArray.push(response);
    );
  

所以我的问题是:如何等待 doSomethingWithArray() 方法完全完成 fillArray() 方法,为什么 doSomethingWithArray() 方法看不到我的对象数组已全部填充?

【问题讨论】:

这能回答你的问题吗? How to 'wait' for two observables in RxJS 【参考方案1】:

使用 RxJS,您不会真的试图强制函数调用“等待”前一个函数;相反,您可以构建一个 observable,根据其他 observables 发出您需要的数据。

在您的情况下,您似乎需要一个可发出 AddressInfo 数组的 observable。

我们可以定义一个返回Observable&lt;Street[]&gt;getStreets() 方法,以及一个接受street 参数并返回Observable&lt;AddressInfo&gt;getAddressInfo() 方法。

现在,我们可以使用 switchMapforkJoin 创建一个可发出 AddressInfo[] 的 observable:

1  export class MyComponent 
2  
3    getStreets(): Observable<Street[]> 
4      this.http.get<Street[]>(...);
5    
6  
7    getAddressInfo(street: Street): Observable<AddressInfo> 
8      this.http.get<AddressInfo>(`$this.url/$street.name`); // don't subscribe here
9    
10
11   addressInfos$: Observable<AddressInfo[]> = this.getStreets().pipe(
12     switchMap(streets => forkJoin(
13       streets.map(s => this.getAddressInfo(s))
14     ))
15   );
16 
17 

我们使用forkJoin 创建一个单独的可观察对象,该可观察对象发出一个包含所有输入可观察对象结果的数组。因此,我们将一个 observable 数组作为输入传递给它。

因为您有一系列街道。在第 13 行,我们简单地将其映射到获取地址信息的 observable 数组。现在,当订阅了 forkJoined observable 时,它​​将发出一个 AddressInfo 数组(所有单个 http 调用的结果

我们使用switchMap 为我们处理订阅这个“forkjoined observable”。结果是一个发出AddressInfo[]addressInfos$ observable。

请注意,我们尚未订阅。要完成您的工作,您只需订阅:

addressInfos$.subscibe(
  infos => doSomethingWithArray(infos)
);

但是,在 Angular 中,一种典型的方式是将数据的发射进一步转换为模板所需的形状:

templateData$ = this.addressInfos$.pipe(
  map(infos => 
    //  do something will array :-)
  )
);

然后,在您的模板中,您可以利用 async 管道:

<div *ngIf="templateData$ | async as data">
  <ul>
    <li *ngFor="item of data"> item </li>
  </ul>
</div>

【讨论】:

【参考方案2】:

尝试使用 forkJoin ;

const streetsObservs = this.streets.map(street => this.http.get<AddressInfo>(`$url/street.name`));
forkJoin(streetsObservs).subscribe((response: any) => 
  this.addressInfoArray = response; // this is an array
  this.doSomethingWithArray(this.addressInfoArray); // <--- length = 0 and doesn't waits for finishing the fillArray() method

);

【讨论】:

我的问题是我想首先将所有数据存储到我的数组中。 AddressInfo 有一些用于计算的数据。所以我首先需要所有数据,然后我可以进行下一步...【参考方案3】:

我认为您对 Observables 的工作原理有一个错误的想法。在您的示例代码中,执行 fillArray 只是调度 http 调用,然后函数结束。代码中的下一步是调用doSomethingWithArray,当然此时addressInfoArray 未被填充。然后,一段时间后,当 http 调用完成时,您在 subscribe 中提供的回调函数将被执行,调用 addressInfoArray.push。使用subscribe 时总是如此。当 Observable 完成时,它只会执行你传递给它的函数。

如果你想等待许多 Observable 完成,你可以使用combineLatest。这将创建一个 Observable,它发出一个包含原始 Observable 中所有值的数组。它是这样工作的:

prepareStreetInformations(): void 
  this.fillArray(streets, url).subscribe(addressInfoArray => 
    this.doSomethingWithArray(this.addressInfoArray)
  );

  
fillArray(streets: Street[], url: string): Observable<AddressInfo[]> 
  return combineLatest(
    streets.map(street => this.http.get<AddressInfo>(`url/$street.name`))
  );

请注意,fillArray 现在返回一个您可以订阅的 Observable,并且不再对类属性进行任何更改。如果需要类属性,则必须在传递给subscribe 的回调函数中定义它。

【讨论】:

我的问题是我想首先将所有数据存储到我的数组中。 AddressInfo 有一些用于计算的数据。所以我首先需要所有数据,然后我可以进行下一步... 这正是这段代码的作用

以上是关于在执行下一个代码块之前等待多个 RxJS 调用的主要内容,如果未能解决你的问题,请参考以下文章

jQuery:强制执行 document.ready() 调用的顺序

在注册之前验证可观察的响应以执行多个请求(iff)-RxJs

并发编程:锁对象同步代码块

在synchronize代码块中调用wait后还会继续执行后续的代码吗

在执行下一行代码之前等待超时完成

多线程---线程间的通信