Angular/RxJS:带有可观察对象的嵌套服务调用

Posted

技术标签:

【中文标题】Angular/RxJS:带有可观察对象的嵌套服务调用【英文标题】:Angular/ RxJS: nested service calls with observables 【发布时间】:2019-02-19 11:13:56 【问题描述】:

我是 RxJS 可观察对象的新手,我正在尝试解决一个相当简单的用例。

在服务中,我首先进行一个返回项目(作为可观察对象)的 http 调用。该项目包含一个 id 数组,其中一些是重复的。对于每个不同的 id,我需要调用另一个 http 服务(再次返回一个 observable),并将其返回值添加到原始项目以代替相应的 id。这些调用应该并行发生。最后,每次调用完成后,我的服务应该返回原始项目的可观察对象,现在其子项目已就位。

为了给出一个更好的主意,这是使用 promises 而不是 observables 的样子:

MyService() 
    return HttpGetMainItem()
        .then(item => 
            var promises = _.uniq(item.subItems)
                 .map(sid => HttpGetSubItem(sid)
                            .then(subItem => 
                                // add to matching item.subItems
                             ));
            // wait for all promises to complete and return main item
            return Promise.all(promises).then(() => item);
        );

使用 observables 完成这项工作的最佳方法是什么?

编辑:从答案看来我不是很清楚。带有 Promise 的示例只是为了清楚起见,在我的情况下,http 调用实际上是 Angular 的 HttpClient.get,所以它们返回 observables——我希望用 observables 做所有事情。

【问题讨论】:

你试过了吗?您可以从定义用例中涉及的接口开始,例如您提到的 2 个请求的响应。 换句话说,你想链接可观察对象,对吧?检查这个:learnrxjs.io/operators/transformation/mergemap.html您可能还想搜索switchMap 嘿@udik,因为你是 Rxjs 的新手,你想开始使用 rxjs6 的新语法。一开始感觉有点笨拙,但导入更容易,而且摇树也得到了很大改善。所以不是调用 Observable.map,而是调用 Observable.pipe(map)。为了确保您没有使用旧语法,请从 package.json 中的依赖项中删除“rxjs-compat”。 【参考方案1】:

这是使用 rxjs 完成上述任务的一种方法。

    使用 from 将 promise 转换为 observable。 在外部管道中调用switchMap,以便您可以调用另一个可观察对象并将其作为外部可观察对象的结果返回。您正在切换执行上下文。 在 switchMap 中,像以前一样执行 subItems 的映射,然后使用forkJoin 创建一个包含所有元素承诺的 Observable。一旦所有承诺完成,ForkJoin 将发出一个包含所有结果的数组。就像promise.all。 按照您之前的计划添加项目,然后退回原始项目。

代码

from(HttpGetMainItem()).pipe(
    switchMap(item => 
        forkJoin(_.uniq(item.subItems).map(sid => HttpGetSubItem(sid)))
            .pipe(
                map(results =>  
                    /* add results to matching items.subItems and */
                    return item;
                )
            )
     )
);

我觉得这看起来有点笨拙,因为需要保留根项目及其所需的嵌套。您可以使用 switchMap 的 selector 参数来组合外部和内部 observable。您可以使用该参数代替 map() 中的逻辑,并且由于两个 observables 的结果都已传递,因此您不需要任何进一步的嵌套。

from(HttpGetMainItem()).pipe(
    switchMap(
        item => forkJoin(_.uniq(item.subItems).map(sid => HttpGetSubItem(sid))),
        (item, results) =>  
             /* add results to matching items.subItems and */
             return item;
        
    )
);

【讨论】:

感谢您的代码和解释。我也尝试使用 from(item.subItems).pipe(distinct(), ..)- 你认为值得吗?在这种情况下,代码会是什么样子? 这取决于子项的性质。我不知道 lodash 的 uniq 是如何工作的,但除非你将选择器传递给 distinct ,否则它将是一个简单的相等检查。因此将保留具有相同属性值的不同引用。如果它只是原语或者你提供了一个键选择器,那么是的,distinct 会起作用。 其实我对 _.uniq 也一无所知,只是在示例中使用它来澄清:)。 subItems 是一个简单的字符串数组,我只需要删除重复项。 我想的越多,我认为 distinct 的作用就越小。 Rxjs 的 distinct 是来自结果流的不同值。使用它的唯一方法是用 of 包装每个 subItem,合并它们,然后调用 distinct,除非你想在 HttpGetSubItem 之后调用 distinct,否则效率会很低。【参考方案2】:

我相信你需要这样的东西:

import  from, forkJoin  from 'rxjs';
import  switchMap  from 'rxjs/operators';

itemWithSubItems$ = from(HttpGetMainItem()).pipe(
    switchMap(item => 
        const promises = _.uniq(item.subItems).map(item => HttpGetSubItem(item.sid));
        return forkJoin(promises)
            .map(resultArray => item.subItems = resultArray)
            .map(_ => item);
    )
);

首先获取主要项目。然后使用 forkJoin 解析所有子查询,丰富主项。之后只需返回主要项目。

【讨论】:

【参考方案3】:

也许你可以使用像 async.each 这样的库 -> https://caolan.github.io/async/docs.html#each(可能是每个系列)。

会是这样的:

async.each(array, (obj, cb) => 
  observable with obj in parameter and with subscriber result : 
  cb(err, subscriberRes);
, (err, res) => 
  console.log(res);

【讨论】:

以上是关于Angular/RxJS:带有可观察对象的嵌套服务调用的主要内容,如果未能解决你的问题,请参考以下文章

Angular RxJS Subject主体

Angular2 rxjs 缺少 observable.interval 方法

Angular RxJs:针对异步数据流编程工具

RxSwift:嵌套可观察对象上的 FlatMap

Angular RXJS Observables或Subjects在内部传递数字

如何使用来自html的异步管道获取可观察对象的嵌套值