如何使用NodeJs中的rxjs中的forkjoin进行多次调用,使用NPM请求

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用NodeJs中的rxjs中的forkjoin进行多次调用,使用NPM请求相关的知识,希望对你有一定的参考价值。

我必须使用request module和RxJs Observable在NodeJs中在服务器端进行多个序列化调用

我有像''www.google.com','facebook.com'这样的Url地址数组。当我发出请求呼叫时,服务器在事件循环中忙碌。我想知道我的最后一个url调用何时获得响应,以便我可以向前端发送响应,这是我的代码到目前为止

 const foo = Observable.create(function (observer) {
         res.writeHeader(200, { "Content-Type": "text/html" });
         var str = '<html><head></head><body><h1> Following are the responses: </h1><ul>'
         res.write(str);
         let count = 0;
         addressesArray.forEach(function (element, i) {

         observer.next(element);
         if (some Condition) {
            console.log('here i want to call complete when I get last response')     
       // observer.complete();   
         }


         })
     }) 

这是我的订阅功能

     const subscription = foo.subscribe({
         next : addres =>
         {
             request(addres , function (error, response, body) {
                if (!error && response.statusCode == 200) {

                     console.log(response)


                 }
                if (typeof response === "undefined" || typeof body === "undefined") {
                     console.log('No resp')
                 }
            })

     },
     complete:()=>{console.log('done')}


 })

我怎么能用Forkjoin

答案

您可以将每个URL转换为Observable,然后使用forkJoin等待所有URL解析:

import * as request from 'request';
import { bindNodeCallback, forkJoin } from 'rxjs';

const requestAsObservable = bindNodeCallback(request.get);
const observables = addressesArray.map(url => requestAsObservable(url));

forkJoin(...observables)
  .subscribe(console.log);

你也可以request-promise包只返回Promises你不必使用bindNodeCallback来包装request

以上是关于如何使用NodeJs中的rxjs中的forkjoin进行多次调用,使用NPM请求的主要内容,如果未能解决你的问题,请参考以下文章

如何展平 rxjs 中的嵌套数组

角度服务中的 EventEmitter 或 RxJS 主题

在从 HttpClient 请求获得的对象转换新类型之前,如何使用 rxjs 映射来改变对象中的数据?

如何提供一个Rxjs observable作为使用Jasmine中的combineLatest的方法的数据

如何在 rxjs 中的订阅中重构订阅

如何使用jasmine-marbles测试rxjs管道中的timeout()