使用 RXJS 继续以角度触发 http 调用,直到满足条件

Posted

技术标签:

【中文标题】使用 RXJS 继续以角度触发 http 调用,直到满足条件【英文标题】:use RXJS to keep triggering http calls in angular until a condition is met 【发布时间】:2021-10-11 15:38:36 【问题描述】:

我正在调用 spotify api,它返回一个像这样的对象:


  next: 'https://api.spotify.com/v1/me/tracks?offset=100&limit=50'
  items: [...]

其中 items 是刚刚进行的网络调用的结果,next 是用于获取下一个“页面”结果的 url。我想做的是在没有偏移的情况下进行初始调用,然后继续进行网络调用,直到下一个为空,这意味着用户没有更多的项目可以获取。如果没有更多项目,他们将返回 null 以供下一个。

这似乎是可能的,但似乎无法弄清楚如何正确地做到这一点。我正在寻找的是这样的:

  readonly baseSpotifyUrl: string = 'https://api.spotify.com/v1';

constructor(private http: HttpClient) 


public getTracks(url?: string): Observable<any> 
    return this.http.get(url && url?.length > 0 ? url : `$this.baseSpotifyUrl/me/tracks?limit=50`);
  

public getAllTracks(): Observable<any> 
    const tracks$: Subject<any> = new Subject();
    let next: string = '';
    this.getTracks(next)
      .subscribe(
        next: (res: any): void => 
          tracks$.next(res.items);
          next = res.next;
          // if res.next !== null, do this again now that we have set next to res.next
        ,
        error: (err: any): void => 
          console.error(err);
        
      );
    return tracks$;
  

这里的想法是我的组件将调用 getAllTracks() 并接收一个主题,然后新项目将不断地通过该主题推送,直到所有项目都被检索到。仅当有更多项目要获取时,前一个请求才返回时,我似乎无法弄清楚如何发出新的网络请求(res.next!== null)

编辑------------------------------ -------------

这完成了工作,但我觉得它很垃圾:

  public getAllTracksSegment(itemsSubject: Subject<any>, nextSubject: Subject<string>, url?: string): void 
    this.http.get(url && url?.length > 0 ? url : `$this.baseSpotifyUrl/me/tracks?limit=50`).subscribe(
      next: (res: any): void => 
        itemsSubject.next(res.items);
        nextSubject.next(res.next);
      
    );
  

  public getAllTracks(): Observable<any> 
    const tracks$: Subject<any> = new Subject();
    const next$: Subject<string> = new Subject();
    next$.subscribe(
      next: (next: any): void => 
        if (next !== null) 
          this.getAllTracksSegment(tracks$, next$, next);
        
      
    );
    next$.next('');
    return tracks$;
  

【问题讨论】:

试过 takeWhile 操作符吗? ***.com/questions/67487848/… @AakashGarg http 请求 observables 在一次发射后关闭,所以 takeWhile 似乎没有为他们工作 @eko 我似乎越来越接近该问题的公认答案。我会尽快研究它,但似乎很有希望。似乎它没有返回第一组结果,但它正在进行正确数量的调用 【参考方案1】:

如果我理解正确的问题,我会使用expand 运算符来构建解决方案。

这里是我要使用的代码。评论是内联的

public getTracks(url?: string): Observable<any> 
    return this.http.get(url && url?.length > 0 ? url : `$this.baseSpotifyUrl/me/tracks?limit=50`);


public getAllTracks(): Observable<any[]> 
  // the first call is with no parameter so that the default url with no offset is used
  return getTracks().pipe(
     // expand is used to call recursively getTracks until next is null
     expand(data => data.next === null ? EMPTY : getTracks(data.next)),
     // with tap you can see the result returned by each call
     tap(data => console.log(data)),
     // if you want you can use the reduce operator to eventually emit the 
     // accumulated array with all items
     reduce((acc, val) => 
       acc = [...acc, ...val.items]
       return acc
     , [])
  )


// now you can fire the execution of the recursive calls by subscribing
// to the observable returned by getAllTracks
getAllTracks().subscribe(
   // allItems is an array containing all the items returned by the various calls
   allItems => console.log(allItems)
)

@skyleguy 之后的补充说明

tap 运算符用于实现副作用。换句话说,它接收来自上游的所有通知,对通知的数据做任何它需要做的事情,然后将相同的通知传递给下游。传递给tap 运算符的函数不需要返回任何内容。应用 副作用 后,上游只是传递到下游。在此示例中,副作用只是在控制台上打印随通知传递的数据。

pipe 中使用的reduce 是 RxJs 的 reduce 运算符,而不是 Arrayreduce 方法。 reduceRxJs 操作符会累积所有上游通知的数据,并在上游completes 时发出一个值。所以,在这个例子中,每次调用远程函数返回一些东西,这个东西进入reduce操作符,并有助于累加逻辑。当expand 返回EMPTY Observable 时,在递归结束时,EMPTY Observable 只是completes 而不通知任何东西,这意味着上游completes 和因此reduce 可以发出它的第一个也是唯一的通知,即所有项目累积的数组,然后complete

This stackblitz 通过模拟远程调用复制此逻辑。

【讨论】:

是的,这很干净。不过,我可能会从对其工作原理的轻微解释中受益。我删除了 tap 因为它实际上对我不起作用,直到它被删除之前什么都没有发生(可能是因为我没有返回数据)。我知道 expand 是如何进行所有网络调用的,但它是否返回一个要与 reduce 函数一起使用的数组?看起来很奇怪,因为当我悬停它时它说它返回:OperatorFunction<...>。还是每次expand的响应返回时都会触发reduce函数 似乎让水龙头返回数据使它工作,所以有道理。 另外,EMPTY 是如何工作的?我起初预计它会在 EMPTY observable 击中那个块时在 ...val.items 上中断,但它似乎并没有真正击中那个块。当 EMPTY 从扩展返回时,它是否有效地关闭了 observable? 我在回复中添加了一些额外的解释。我希望这些能解答你的疑惑。【参考方案2】:

虽然其他答案提到了takeWhile(),但我想将它与更简单/声明性的解决方案一起使用。还有一件事,虽然我使用了 Observable&lt;any[]&gt;(我假设 items 是一个数组),但我强烈建议为从 API 返回的对象添加类型声明。

public getAllTracks$: Observable<any[]>;
private nextUrl = new BehaviorSubject<string>('https://api.spotify.com/v1/me/tracks?limit=50');


constructor(private http: HttpClient) 
    getAllTracks$ = this.nextUrl.pipe(
        mergeMap(url => this.http.get(url)),
        takeWhile(res => !!res.next),
        tap(res => this.nextUrl.next(res.next)),
        scan((items, res) => [...items, ...res.items], [] as any[])
    );

首先,我们定义一个 BehaviorSubject,它将为每个请求发出要调用的 URL,从初始值开始。然后我们在构造函数中定义我们的 observable。不需要额外的属性或方法。

getAllTracks$ 订阅 BehaviorSubject 并立即收到第一个 url(我们在其声明中包含的字符串)。

我们使用mergeMap(),因此它将调用所有连续的 API 请求。 (如果我们使用switchMap(),从我们的 BehaviorSubject 接收到的任何新 URL 都会取消任何正在进行的 API 调用并“切换”到新的 URL 值。)我还发现这比使用 expand() 更直接。

然后我们的takeWhile() 检查我们的响应是否具有next 属性。如果是这样,我们继续使用最后两个运算符。如果没有,它会发出complete 信号,导致所有订阅者取消订阅。

tap() 最适合用于外部状态更新。因为我们有了下一个 URL,我们可以将它发送到我们的 BehaviorSubject,排队下一个 API 调用。

最后,我们使用scan() 来组合这些数据的呈现方式。 scan() 运算符类似于reduce(),但用于可观察到的排放。在第一次发出时,它声明一个新数组,并将任何后续 API 发出添加到其列表中。

【讨论】:

【参考方案3】:

虽然我会喜欢@picci's 认可的答案,但我想展示一个稍微调整的版本。这突出表明通常有多种方法可以使用 RxJS 实现相同的目标:)。

public getTracks(url?: string): Observable<any> 
    return this.http
      .get(url?.length ? url : `$this.baseSpotifyUrl/me/tracks?limit=50`)
      .pipe(
        takeWhile((data: any) => data.next !== null), // Crucial, to stop the recursive call when `next` is null
      );
  

  public getAllTracks(): Observable<any[]> 
    // the first call is with no parameter so that the default url with no offset is used
    return getTracks().pipe(
      // expand is used to call recursively getTracks until next is null
      expand(data => this.getTracks(data.next)),
      // with tap you can see the result returned by each call
      tap(data => console.log(data)),
      // if you want you can use the reduce operator to eventually emit the
      // accumulated array with all items
      reduce((acc, val) => [...acc, ...val.items], []),
    );
  

// now you can fire the execution of the recursive calls by subscribing
// to the observable returned by getAllTracks
this.getAllTracks().subscribe(
   // allItems is an array containing all the items returned by the various calls
   allItems => console.log(allItems)
)

密钥是takeWhile。一旦条件满足,它将complete。不过,这两个答案都很好用:)。

这里是改编的例子https://stackblitz.com/edit/rxjs-tcbdue?file=index.ts

【讨论】:

【参考方案4】:

我使用 RxJs 提出了这个解决方案。希望对您有帮助,如果可行,请给我反馈。

expand 执行递归搜索,takeWhile 仅在 Next 不为 null 时才控制请求。将所有曲目归为一组。订阅后,您将拥有所有可用的曲目。

  getAllTracks(): Observable<any> 
    return this.getAllTracksSegment().pipe(
      expand((res: any) => this.getAllTracksSegment(res.next)),
      takeWhile((res) => !!res.next),
      reduce((total, current) => total.concat(current.tracks), [])
    ); // All tracks available when you subscribe
  

  getAllTracksSegment(url?: string): Observable<any> 
    return this.http.get(url && url?.length > 0 ? url : `$this.baseSpotifyUrl/me/tracks?limit=50`);
  

PS:如果你想发出部分结果,只需删除 reduce 运算符即可

【讨论】:

我猜它不会以当前的形状工作,因为 1. 使用EMPTY 没有发射,所以Array 将是空的。 2. 它不能编译:你在两个函数中都缺少return 语句(+ getAllTracksSegment 不应该是无效的)并且res/current 回调被推断为unknown,所以 Typescript 不满意res.next/current.tracks。但是,是的,如果你解决了这些问题,这是解决这个问题的好方法:) 感谢您的反馈。我刚刚修好了。在这些情况下,您还可以建议对答案进行编辑。我用我正在使用的另一个代码编译,使用另一个端点。重要的是尝试使用这种逻辑来解决问题,这可能是最好的方法。 很遗憾,我不能再提出修改建议了,我的修改是自动应用的,所以我尽量避免使用它们(除非它是一个小错字、损坏的 url 等),因为有些人喜欢更正他们自己的代码。 嗯,不确定我是否做错了什么,但似乎什么也没发生。我肯定订阅了这个函数返回的 observable,但我什至没有看到任何网络请求 @skyleguy 是的,我在上面的第一条评论中提到了这个原因(关于EMPTY)。

以上是关于使用 RXJS 继续以角度触发 http 调用,直到满足条件的主要内容,如果未能解决你的问题,请参考以下文章

rxjs中多次调用的角度处理错误

RxJS有角度

Angular-RxJS- 在长 API 调用中间隐藏微调器并继续 API 调用

RxJS:使用 NestJS HttpService 对分页 API 进行并行 http 调用

Angular/RxJS 6:如何防止重复的 HTTP 请求?

RxJS - BehaviorSubject,onComplete 未调用