解决超过 6 个 forkJoin 参数?

Posted

技术标签:

【中文标题】解决超过 6 个 forkJoin 参数?【英文标题】:Work around for more than 6 forkJoin parameters? 【发布时间】:2018-09-07 15:02:15 【问题描述】:

我需要/希望在 forkJoin 中使用六个以上的参数。目前,根据对另一个 related question 的回答,似乎不可能向 forkJoin 发送超过 6 个参数。

然而,根据官方文档,它说“forkJoin 是一个运算符,它接受任意数量的 Observable,可以作为数组或直接作为参数传递。”

forkJoin - 官方文档

好吧,我正在这样做,但收到错误 TS2322:类型“foo”不可分配给类型“bar[]”。

在我的研究中,我还发现,如果您有返回不同类型的承诺,最好不要将参数作为数组发送,因为这会将它们类型转换为所有相同的类型。 - Source

这是我的代码。我正在使用最新版本的 Typescript 和 Angular 4。

ngOnInit() 
    this.spinner.show();
    Observable.forkJoin(
        this.loadParams(),              // Returns an Observable<Object>
        this.service.getPromiseType1(), // The rest return Observable<Array>
        this.service.getPromiseType2(),
        this.service.getPromiseType3(),
        this.service.getPromiseType4(),
        this.service.getPromiseType5(),
        this.service.getPromiseType6(),
    ).finally(() => this.spinner.hide())
        .subscribe(
            ([param, promise1, promise2, promise3, promise4, promise5, promise6]) => 
                this.job = job;
                this.value1 = promise1;
                this.value2 = promise2;
                this.value3 = promise3;
                this.value4 = promise4;
                this.value5 = promise5;
                this.value6 = promise6;
            , (error) => errorHandlingFunction
   );

如果我删除任何单个参数,以便将六个参数传递给 forkJoin,它可以正常工作。所以我的问题是,在我想在一次调用中加载对象可观察对象和后续数组可观察对象的情况下,是否有另一种方法可以做到这一点?这是 forkJoin 的一个错误,因为官方文档说它应该能够接受任意数量的 Observables?

我尝试创建一个 Observable 类型的数组并在 forkJoin 中使用 array.forEach(),但它抱怨返回类型为 void。无论如何,这似乎是一种笨拙的方式。

【问题讨论】:

为什么需要同时发出这么多请求? 每个请求都访问不同的 API 端点,返回用于此特定组件的表单所需的不同数据。当页面加载时,表单中的下拉菜单需要填充该数据,因此需要同时完成。 【参考方案1】:

从 rxJs 6.5 版开始,您可以使用值字典将任意数量的 observables 放入 forkJoin see the first example here

    forkJoin(first: of(1), second: of(2), third: of(3))
             .subscribe(result => console.log(result.first));

【讨论】:

【参考方案2】:

分叉连接可以嵌套到六个或更少参数的逻辑组中。以下应该可以工作(但尚未测试):

ngOnInit() 
    this.spinner.show();
    Observable.forkJoin(
        this.loadParams(),
        Observable.forkJoin(
            this.service.getPromiseType1(),
            this.service.getPromiseType2(),
            this.service.getPromiseType3(),
        ),
        Observable.forkJoin(
            this.service.getPromiseType4(),
            this.service.getPromiseType5(),
            this.service.getPromiseType6(),
        )
    )
    .finally(() => this.spinner.hide())
    .subscribe(payloads => 
        [
            this.job,
            [
                this.value1,
                this.value2,
                this.value3,
            ],
            [
                this.value4,
                this.value5,
                this.value6,
            ],
        ] = payloads
    , (error) => 
        errorHandlingFunction
    );

【讨论】:

【参考方案3】:

感谢@MikeHill 让我指出了正确的方向。最终为我工作的解决方案是将 typings.d.ts 添加到 angular-cli 生成的 src 文件夹中。我相信您也可以在同一位置使用 typings 文件夹,但您需要更新您的 tsconfig.app.json 文件,请参阅本文了解更多信息。 https://github.com/angular/angular-cli/blob/6449a753641340d8fc19a752e1a1ced75f974efa/docs/documentation/1-x/stories/third-party-lib.md

typings.d.ts 用于 forkJoin 的 7 个参数

import  ObservableInput, Observable  from 'rxjs';
import  forkJoin  from 'rxjs/internal/observable/forkJoin';

declare module 'rxjs/internal/observable/forkJoin' 
  export function forkJoin<T, T2, T3, T4, T5, T6, T7>(
    sources: [
      ObservableInput<T>,
      ObservableInput<T2>,
      ObservableInput<T3>,
      ObservableInput<T4>,
      ObservableInput<T5>,
      ObservableInput<T6>,
      ObservableInput<T7>
    ],
  ): Observable<[T, T2, T3, T4, T5, T6, T7]>;
  export function forkJoin<T, T2, T3, T4, T5, T6, T7>(
    v1: ObservableInput<T>,
    v2: ObservableInput<T2>,
    v3: ObservableInput<T3>,
    v4: ObservableInput<T4>,
    v5: ObservableInput<T5>,
    v6: ObservableInput<T6>,
    v7: ObservableInput<T7>,
  ): Observable<[T, T2, T3, T4, T5, T6, T7]>;

【讨论】:

【参考方案4】:

正如answer 在您链接的问题中解释的那样,参数的最大数量仅受类型定义的限制 - 而不是运行时源本身。类型定义很有用,因为它们声明了将为可观察流的下一步生成的数组元素类型(在您的情况下为 [param, promise1, promise2, ...] 的类型)。

听起来您订阅处理程序中分配的严格类型安全是导致您出现问题的原因。由于您有 6 个以上的 observable,它会将结果参数默认为共享类型,该类型可能与您尝试分配的字段类型不匹配。

有几种方法可以解决这个问题。您可以在订阅处理程序中转换参数,也可以自己添加自己的类型。强制转换参数是一种快速而肮脏的解决方案,但它会导致您失去类型安全性。自己添加类型可以让您保持类型安全,但这也可能意味着您最终会得到任意数量的工厂方法声明。将以下内容放在项目中任何位置的类型定义文件 (*.d.ts) 中。我喜欢将这样的类型定义放在 typings/ 目录中,与我的 app/ 目录同级。

import  Observable, SubscribableOrPromise  from 'rxjs/Observable';

declare module 'rxjs/observable/ForkJoinObservable' 
    namespace ForkJoinObservable 
        export function create<T, T2, T3, T4, T5, T6, T7>(v1: SubscribableOrPromise<T>, v2: SubscribableOrPromise<T2>, v3: SubscribableOrPromise<T3>, v4: SubscribableOrPromise<T4>, v5: SubscribableOrPromise<T5>, v6: SubscribableOrPromise<T6>, v7: SubscribableOrPromise<T7>): Observable<[T, T2, T3, T4, T5, T6, T7]>;
        export function create<T, T2, T3, T4, T5, T6, T7, R>(v1: SubscribableOrPromise<T>, v2: SubscribableOrPromise<T2>, v3: SubscribableOrPromise<T3>, v4: SubscribableOrPromise<T4>, v5: SubscribableOrPromise<T5>, v6: SubscribableOrPromise<T6>, v7: SubscribableOrPromise<T7>, project: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6, v7: T7) => R): Observable<R>;
    

Declaration Merging 的 TypeScript 文档页面中更详细地解释了此过程。


编辑:看起来我使用的是旧版本的 RxJS,并且结构从那以后发生了一些变化。以下应该更接近于适用于当前结构的类型声明:

declare module 'rxjs/Observable' 
    namespace Observable 
        export function forkJoin<T, T2, T3, T4, T5, T6, T7>(sources: [ObservableInput<T>, ObservableInput<T2>, ObservableInput<T3>, ObservableInput<T4>, ObservableInput<T5>, ObservableInput<T6>, ObservableInput<T7>]): Observable<[T, T2, T3, T4, T5, T6, T7]>;
        export function forkJoin<T, T2, T3, T4, T5, T6, T7>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, v7: ObservableInput<T7>): Observable<[T, T2, T3, T4, T5, T6, T7]>;
    

我将这些基于当前的 forkJoin 类型声明。

就模块扩充而言,上面的代码修改了由绝对路径'rxjs/Observable' 定义的模块的类型声明。这与您在导入 Observable 类时使用的导入路径相同。 RxJS 定义的模块导出Observable 类及其字段。我们对该模块的扩充通过使用namespace 块对其进行了修改。这允许我们在 Observable 命名空间下添加类型声明(例如,Observable.myFunctionOrField 的类型声明),这看起来与在其上调用静态函数相同。实际上,这声明了额外的Observable.forkJoin 函数可能性。

【讨论】:

感谢您的帮助。我通读了声明合并的文档,无法将您的代码与它们为模块扩充显示的示例完全放在一起。您提供的代码究竟是如何与 forkJoinObservable 模块挂钩的?当我在新的 .d.ts 文件中使用您的代码时,两个创建函数出现重复的函数错误,所以我想知道这如何与项目挂钩以解决这些冲突。我还以为我知道如何在订阅处理程序中转换参数,但是在我自己尝试并进行了一些搜索之后,我显然不知道。 @MikeHill 是否可以提供一个在订阅处理程序中转换参数的示例?我试图做这样的事情。 .subscribe(([param&lt;Param&gt;, promise1&lt;Promise1&gt; ]) =&gt; 您将有三个选项进行转换:原始 observable、订阅参数和赋值操作本身。可观察的将是这样的:(Observable.forkJoin(...) as Observable&lt;[T, T2, T3, ...]&gt;).subscribe(...)。订阅参数为:.subscribe(([param, promise1, ...]: [T, T2, ...]) =&gt; ...)。分配将是:this.job = param1 as T; this.value1 = promise1 as T2; ...。尝试每一个,看看哪种最适合你的风格偏好。您的编译严格性设置也可能会限制您的这些选项。 看起来我使用的是旧版本的 RxJS。我为最新版本添加了类型声明,并试图解释这种增强是如何工作的。请让我知道这是否可以为您解决问题。 太棒了!再次感谢您提供详尽的解释、资源和示例。这就是我喜欢编程的地方。能够分享大量信息以帮助他人学习新事物的人。【参考方案5】:

你确定 6 的限制吗?

这个在 forkJoins 中使用了 10 个 Observable 的例子似乎可以工作

const oArray = 
[ ... new Array(10).keys() ]
.map(n => Observable.of(n))
.reduce((obsArray, obs) => 
    obsArray.push(obs);
    return obsArray
, new Array<Observable<number>>())

Observable
.forkJoin(oArray)
.subscribe(console.log, console.error, () => console.log('DONE'))

另外,我不明白您使用的 finally 运算符。我在 RxJs 5.5.2 中没有它

【讨论】:

这似乎只是一个问题,因为参数的最大数量仅受类型定义的限制,正如@MikeHill 在另一个响应中指出的那样。正如他解释的那样,我将尝试为此创建自己的类型定义,以便我可以接受更多类型。至于 finally 运算符,它是一个回调函数,它返回一个 Observable,当源在完成或错误时优雅地终止时调用一个函数。 RxJs finally Documentation

以上是关于解决超过 6 个 forkJoin 参数?的主要内容,如果未能解决你的问题,请参考以下文章

ForkJoin框架详解

ForkJoin 任务分解

如何在RXJS 6.3.3中使用ForkJoin导入Observer?

ForkJoin框架

Java多线程Java的MapReduce框架ForkJoin

使用ForkJoin