可观察到的 forkJoin 没有触发

Posted

技术标签:

【中文标题】可观察到的 forkJoin 没有触发【英文标题】:Observable forkJoin not firing 【发布时间】:2017-08-06 04:27:44 【问题描述】:

我正在尝试在两个 Observable 上使用 forkJoin。其中一个以流的形式开始...如果我直接订阅它们,我会得到响应,forkJoin 不会触发。有什么想法吗?

private data$: Observable<any[]>;
private statuses$: Observable<any[]>;
private queryStream = new Subject<string>();    

....

this.data$ = this.queryStream
    .startWith('')
     .flatMap(queryInput => 
            this.query = queryInput
            return this._companyService.getCompanies(this.queryRequired + ' ' + this.query, this.page, this.sort);
                )
            .share();
    
...

Observable.forkJoin(this.statuses$, this.companies$)
            .subscribe(res => 
                console.log('forkjoin');
                this._countStatus(res[0], res[1]);
            );


// This shows arrays in the console...

this.statuses$.subscribe(res => console.log(res));
this.companies$.subscribe(res => console.log(res));

// In the console
Array[9]
Array[6]

【问题讨论】:

【参考方案1】:

forkJoin 的一个非常常见的问题是它要求所有源 Observable 至少发出一个项目,并且所有项目都必须完成。

换句话说,如果this.statuses$this.companies$ 没有发出任何项目,并且在他们都完成之前forkJoin 不会发出任何东西。

this.statuses$.subscribe(
    res => console.log(res),
    undefined,
    () => console.log('completed'),
);

【讨论】:

它们都发出项目,如果我订阅和控制台日志,我可以看到它们发出的数组。我已经更新了问题以反映这一点。谢谢 @nick 他们两个都完成了吗? 我不确定,最好的方法是什么?如果没有,我该如何使它们完整?问题似乎出在queryStream: Subject 上,我只是创建了一个直的Observable 它可以工作... Subject 在您告诉它这样做之前不会完成。所以你需要手动拨打queryStream.complete()。但这取决于您的应用程序逻辑,也许您可​​以只使用take(1) 而无需完成Subject。查看更新如何检查 Observable 完成【参考方案2】:
Observable.forkJoin([
      _someService.getUsers(),
      _someService.getCustomers(),
    ])
      .subscribe((data: [Array<User>, Array<Customer>]) => 
        let users: Array<User> = data[0];
        let customer: Array<Customer> = data[1];
      , err => 
      );





      //someService
        getUsers():Observable<User> 
          let url = '/users';
          return this._http.get(url, headers)
            .map(res => res.json());
        

        getCustomers():Observable<Customer> 
          let url = '/customers';
          return this._http.get(url, headers)
            .map(res => res.json());
        

【讨论】:

【参考方案3】:

forkJoin 仅在所有内部可观察对象都完成时才发出。 如果您需要一个等效的 forkJoin 来监听每个源的单个发射,请使用 combineLatest + take(1)

combineLatest(
  this.statuses$,
  this.companies$,
)
.pipe(
  take(1),
)
.subscribe(([statuses, companies]) => 
  console.log('forkjoin');
  this._countStatus(statuses, companies);
);

一旦两个源都发出,combineLatest 将发出,take(1) 将在此之后立即取消订阅。

【讨论】:

combineLatest 的问题在于它会返回最先触发的 observable 的数据。我不得不更改为 forkjoin,因为我需要等待两个可观察对象相互等待。 谢谢。他妈的。你!我不知道大卫在说什么;它对我来说就像一个魅力!【参考方案4】:

forkJoin 不起作用,所以我使用下面的代码来解决我的问题。使用mergeMap,您可以将外部订阅的结果映射到内部订阅,并根据需要进行订阅

this.statuses$.pipe(
    mergeMap(source => this.companies$.pipe(
        map(inner => [source , inner])
        )
    )
).subscribe(([e , r]) => 
    console.log(e , r);
)

【讨论】:

【参考方案5】:

对我来说,combineLatest 运算符是解决方案!

【讨论】:

这里也一样...我不知道为什么 forkJoin 不起作用...但是 combineLatest 确实【参考方案6】:

.pipe(take(1)) 附加为asObservable() 的管道就可以完成这项工作。

forkJoin(
    l0: this._svc.data$.pipe(take(1)),
    l1: this._api.getLogman1(),
    l2: this._api.getLogman2(),
    l3: this._api.getLogman3(),
)
    .pipe(
        takeUntil(this._unsubscribeAll),
    )
    .subscribe(x => 
        console.log(x);
    );

【讨论】:

哇,这解决了我的问题。非常感谢!

以上是关于可观察到的 forkJoin 没有触发的主要内容,如果未能解决你的问题,请参考以下文章

解决超过 6 个 forkJoin 参数?

尽管可观察到的变化,但计算的可观察到的不会触发

如何等待所有可观察对象按顺序执行?

带有引导事件的 Angular2 不会触发可观察到的变化

使用 forkJoin 的条件 observables

使用加速框架,没有可观察到的加速