RxJS:使用 NestJS HttpService 对分页 API 进行并行 http 调用

Posted

技术标签:

【中文标题】RxJS:使用 NestJS HttpService 对分页 API 进行并行 http 调用【英文标题】:RxJS: Parallel http call to paginated API using NestJS HttpService 【发布时间】:2021-02-06 07:06:57 【问题描述】:

我正在使用 NestJS,这是我当前的实现并行 http 请求:

@Injectable()
export class AppService 
  constructor(private readonly http: HttpService) 

  private fetchData(index: number) 
    Logger.log(`Collect index: $index`);

    return this.http
      .get<Passenger>(
        `https://api.instantwebtools.net/v1/passenger?page=$index&size=100`,
         validateStatus: null ,
      )
      .pipe(concatMap(response => of(response.data)));
  

  async getAllData() 
    let a = 0;
    const collect: Passenger[] = [];
    const $subject = new BehaviorSubject(a);

    await $subject
      .pipe(
        flatMap(index =>
          forkJoin([
            this.fetchData(index),
            this.fetchData(index + 1),
            this.fetchData(index + 2),
          ]).pipe(mergeAll()),
        ),
        tap(data => 
          collect.push(data);

          if (data?.data?.length === 0) 
            $subject.complete();     // stop the stream
           else 
            a += 3;     // increment by 3, because the request is 3 times at a time
            $subject.next(a);
          
        ),
      )
      .toPromise();

    return collect;
  


此服务用于收集第三方数据。至于现在,fetchData()函数根据我一次想要多少并行请求被多次调用。我使用虚拟 API 进行测试,但在实际场景中,API 端点大小限制为 100,并且它不返回有关 totalPage 多少的元信息。它只是在到达最后一页时返回空数据。

目标是发出并行请求并在最后合并结果。我这样做是为了尽可能缩短请求时间,并且因为 API 本身的速率限制为每秒 50 个请求。如何优化这段代码?

【问题讨论】:

如果您不知道有多少页,那么如何使请求并行? API端点返回的数据为空时停止请求 【参考方案1】:

要一次性获取所有页面,您可以使用expand 递归订阅获取某些页面的可观察对象。当您收到的最后一页为空时,通过返回EMPTY 来结束递归。

function fetchAllPages(batchSize: number = 3): Observable<any[][]> 
  let index = 0;
  return fetchPages(index, batchSize).pipe(
    // if the last page isn't empty fetch the next pages, otherwise end the recursion
    expand(pages => pages[pages.length - 1].length > 0 
      ? fetchPages((index += batchSize), batchSize) 
      : EMPTY
    ),
    // accumulate all pages in one array, filter out any trailing empty pages
    reduce((acc, curr) => acc.concat(curr.filter(page => page.length)), [])
  );


// fetch a given number of pages starting from 'index' as parallel requests
function fetchPages(index: number, numberOfPages: number): Observable<any[][]> 
  const requests = Array.from( length: numberOfPages , (_, i) =>
    fetchData(index + i)
  );
  return forkJoin(requests);

https://stackblitz.com/edit/rxjs-vkad5h?file=index.ts

这显然会在最后一批 if(totalNumberOfPages + 1) % batchSize != 0 中发送一些不必要的请求。

【讨论】:

以上是关于RxJS:使用 NestJS HttpService 对分页 API 进行并行 http 调用的主要内容,如果未能解决你的问题,请参考以下文章

TypeError: rxjs_1.lastValueFrom 不是函数

Nestjs 使用 axios

RxJS 等待订阅 Observable 完成

如何使用 Jest 测试 Rxjs 空操作符

Angular/Rxjs 管道异步不适用于 s-s-r?

Nest.js 全局守卫