RxJS:使用 NestJS HttpService 对分页 API 进行并行 http 调用
Posted
技术标签:
【中文标题】RxJS:使用 NestJS HttpService 对分页 API 进行并行 http 调用【英文标题】:RxJS: Parallel http call to paginated API using NestJS HttpService 【发布时间】:2021-02-06 07:06:57 【问题描述】:我正在使用 NestJS,这是我当前的实现并行 http 请求:
@Injectable()
export class AppService
constructor(private readonly http: HttpService)
private fetchData(index: number)
Logger.log(`Collect index: $index`);
return this.http
.get<Passenger>(
`https://api.instantwebtools.net/v1/passenger?page=$index&size=100`,
validateStatus: null ,
)
.pipe(concatMap(response => of(response.data)));
async getAllData()
let a = 0;
const collect: Passenger[] = [];
const $subject = new BehaviorSubject(a);
await $subject
.pipe(
flatMap(index =>
forkJoin([
this.fetchData(index),
this.fetchData(index + 1),
this.fetchData(index + 2),
]).pipe(mergeAll()),
),
tap(data =>
collect.push(data);
if (data?.data?.length === 0)
$subject.complete(); // stop the stream
else
a += 3; // increment by 3, because the request is 3 times at a time
$subject.next(a);
),
)
.toPromise();
return collect;
此服务用于收集第三方数据。至于现在,fetchData()
函数根据我一次想要多少并行请求被多次调用。我使用虚拟 API 进行测试,但在实际场景中,API 端点大小限制为 100,并且它不返回有关 totalPage 多少的元信息。它只是在到达最后一页时返回空数据。
目标是发出并行请求并在最后合并结果。我这样做是为了尽可能缩短请求时间,并且因为 API 本身的速率限制为每秒 50 个请求。如何优化这段代码?
【问题讨论】:
如果您不知道有多少页,那么如何使请求并行? API端点返回的数据为空时停止请求 【参考方案1】:要一次性获取所有页面,您可以使用expand
递归订阅获取某些页面的可观察对象。当您收到的最后一页为空时,通过返回EMPTY
来结束递归。
function fetchAllPages(batchSize: number = 3): Observable<any[][]>
let index = 0;
return fetchPages(index, batchSize).pipe(
// if the last page isn't empty fetch the next pages, otherwise end the recursion
expand(pages => pages[pages.length - 1].length > 0
? fetchPages((index += batchSize), batchSize)
: EMPTY
),
// accumulate all pages in one array, filter out any trailing empty pages
reduce((acc, curr) => acc.concat(curr.filter(page => page.length)), [])
);
// fetch a given number of pages starting from 'index' as parallel requests
function fetchPages(index: number, numberOfPages: number): Observable<any[][]>
const requests = Array.from( length: numberOfPages , (_, i) =>
fetchData(index + i)
);
return forkJoin(requests);
https://stackblitz.com/edit/rxjs-vkad5h?file=index.ts
这显然会在最后一批 if(totalNumberOfPages + 1) % batchSize != 0
中发送一些不必要的请求。
【讨论】:
以上是关于RxJS:使用 NestJS HttpService 对分页 API 进行并行 http 调用的主要内容,如果未能解决你的问题,请参考以下文章