如何使用 RxJS 以时间间隔调用多个依赖 api 调用

Posted

技术标签:

【中文标题】如何使用 RxJS 以时间间隔调用多个依赖 api 调用【英文标题】:How to call multiple dependent api calls with time intervals using RxJS 【发布时间】:2021-12-30 07:05:01 【问题描述】:

我正在尝试为这样的场景编写 angular 11 的代码 -

我有文件列表,对于我点击 api(比如 api1)的每个文件,我从响应中获取一个 fileId,然后将它传递给另一个 api(比如 api2),我想继续每 3 次点击 api2秒,除非我在响应中没有得到 status="available"。一旦我获得了可用状态,我不再需要为该 fileId 访问 api2,我们可以开始处理循环中的下一个文件。

我拥有的每个文件的整个过程。

我知道我们可以使用 rxjs 操作符(如 mergeMap 或 switchMap)来实现这一点(因为序列现在对我来说并不重要)。但我对 rxjs 很陌生,不知道如何组合起来。

这就是我现在正在做的-

this.filesToUpload.forEach((fileItem) => 
      if (!fileItem.uploaded) 
        if (fileItem.file.size < this.maxSize) 
          self.fileService.translateFile(fileItem.file).then( //hit api1
            (response) => 
              if (response && get(response, 'status') == 'processing') 
               //do some processing here 
               this.getDocumentStatus(response.fileId);
               
            ,
            (error) => 
              //show error
            
          );
        
      
   ); 
getDocumentStatus(fileId:string)
    this.docStatusSubscription = interval(3000)   //hitting api2 for every 3 seconds 
    .pipe(takeWhile(() => !this.statusProcessing))
    .subscribe(() => 
      this.statusProcessing = false;
      this.fileService.getDocumentStatus(fileId).then((response)=>
        if(response.results.status=="available")
          this.statusProcessing = true;
          //action complete for this fileId
        
      ,(error)=>

      );
      
    )
    
  

【问题讨论】:

【参考方案1】:

根据您所追求的描述,我可能会这样做。

    为您要进行的所有调用创建一个可观察对象列表。 将列表连接在一起 订阅

这项工作的原因是我们只订阅一次(不是每个文件一次),我们让操作员处理其他所有内容的订阅和取消订阅。

然后在我们订阅之前什么都不会发生。这样concat 可以为我们完成繁重的工作。没有必要使用this.statusProessing 或类似的变量来跟踪任何东西。这一切都为我们处理好了!这样更不容易出错。

// Create callList. This is an array of observables that each hit the APIs and only
// complete when status == "available".
const callList = this.filesToUpload
  .filter(fileItem => !fileItem.uploaded && fileItem.file.size < this.maxSize)
  .map(fileItem => this.createCall(fileItem));

// concatenate the array of observables by running each one after the previous one
// completes.
concat(...callList).subscribe(
  complete: () => console.log("All files have completed"),
  error: err => console.log("Aborted call list due to error,", err)
);
createCall(fileItem: FileItemType): Observable<never>
  // Use defer to turn a promise into an observable 
  return defer(() => this.fileService.translateFile(fileItem.file)).pipe(

    // If processing, then wait untill available, otherwise just complete
    switchMap(translateFileResponse => 
      if (translateFileResponse && get(translateFileResponse, 'status') == 'processing') 
        //do some processing here 
        return this.delayByDocumentStatus(translateFileResponse.fileId);
       else 
        return EMPTY;
      
    ),
    // Catch and then rethrow error. Right now this doesn't do anything, but If 
    // you handle this error here, you won't abort the entire call list below on 
    // an error. Depends on the behaviour you're after.
    catchError(error => 
      // show error
      return throwError(() => error);
    )

  );

delayByDocumentStatus(fileId:string): Observable<never>
  // Hit getDocumentStatus every 3 seconds, unless it takes more
  // than 3 seconds for api to return response, then wait 6 or 9 (etc)
  // seconds.
  return interval(3000).pipe(
    exhaustMap(_ => this.fileService.getDocumentStatus(fileId)),
    takeWhile(res => res.results.status != "available"),
    ignoreElements(),
    tap(
      complete: () => console.log("action complete for this fileId: ", fileId)
    )
  );

【讨论】:

这正是我正在寻找的行为。谢谢! @Mrk Sef 如果我的第一个文件需要更多时间来回复响应,它不会移动到下一个文件进行翻译,因此列表中的所有其他文件都会被阻止,直到前一个文件没有'完成了。我们怎样才能防止这种情况发生? @Abhijeeta 听起来您不想集中您的通话清单。 (尽管现在您的问题非常令人困惑,因为这听起来很像您想要连接您的 callList)。无论如何,您可以使用merge 而不是concat 是的合并工作!

以上是关于如何使用 RxJS 以时间间隔调用多个依赖 api 调用的主要内容,如果未能解决你的问题,请参考以下文章

如何在 angular2 中进行连续的 http api 调用?

使用 RXJS 继续以角度触发 http 调用,直到满足条件

如何在 RxJS 中停止 Observable 的间隔

如何在 Angular 和 RxJs 中管理多个顺序订阅方法?

在rxjs6中以随机间隔生成值流?

在多个函数中重用一个rxjs'主题