Observable.forkJoin() 不执行

Posted

技术标签:

【中文标题】Observable.forkJoin() 不执行【英文标题】:Observable.forkJoin() doesn't execute 【发布时间】:2017-03-11 05:22:49 【问题描述】:

我有以下代码:

//Loop: For each user ID/Role ID, get the data
userMeta.forEach((businessRole) => 
  Observable.forkJoin(
    af.database.object('/roles/'+businessRole.$value),
    af.database.object('/users/'+businessRole.$key)
  ).subscribe(
    data => 
      console.log("Data received");
      data[1].role = data[0];
      this.users.push(data[1]);
    ,
    err => console.error(err)
  );

我正在尝试使用forkJoin 订阅 2 个 observables 的结果。

由于某些原因,未显示“已收到数据”消息。

我的 userMeta 变量在 console.log 中看起来不错:

怎么了?

更新:以下代码也不返回任何内容

let source = Observable.forkJoin(
        af.database.object('/roles/'+businessRole.$value),
        af.database.object('/users/'+businessRole.$key)
    );
    let subscription = source.subscribe(
      function (x) 
    console.log("GOT: " + x);
  ,
  function (err) 
    console.log('Error: %s', err);
  ,
  function () 
    console.log('Completed');
  );

我真正想做的是提高以下代码的性能:

//Subscription 3: role ID to role Name
        af.database.object('/roles/'+businessRole.$value)
        .subscribe((roleData) => 
        //Subscription 4: Get user info
        af.database.object('/users/'+businessRole.$key).subscribe(user => 

【问题讨论】:

forkJoin() 在两个 Observables 完成后发出一个值,所以你确定他们会这样做吗?也许其中一个以错误结束...... 也没有错误(请参阅问题更新)。它们也必须工作,因为以前的代码是订阅中的订阅,并且它有效 forkJoin() 不会从源 Observables 传递错误,因此即使抛出错误也不会打印任何内容。如果你想确保它不会发出错误,你需要订阅每个源 Observables。 【参考方案1】:

forkJoin() 要求所有源 Observable 至少发射一次并完成。

以下演示按预期完成:

const source = forkJoin(
  from([1,2,3]),
  from([9,8,7,6])
).subscribe(
  x => console.log('GOT:', x),
  err => console.log('Error:', err),
  () => console.log('Completed')
);

现场演示:https://stackblitz.com/edit/rxjs-urhkni

GOT: 3,6
Completed

2019 年 1 月:针对 RxJS 6 更新

【讨论】:

你在问题​​的最后检查了我更新的代码吗?因为它适用于相同的 observables,只是不适用于 forkJoin()。真是奇怪…… @TheUnreal 但这与您想要对 forkJoin() 执行的操作不同。 .subscribe((roleData) => ... 在每次 next 调用时发出一个值。 forkJoin()forkJoin() 需要两个 Observables 才能完成。 否则它不会发出任何东西。 好吧,我好像错过了forkJoin()。因为你的回答是正确的,所以我会批准它 - 并就我的情况提出一个新问题 Observable.combineLatest() 可能就是您正在寻找的@TheUnreal。 @JakubBarczyk 谢谢。如果我可以两次投票给你,我会的。【参考方案2】:

我在使用 Angular 2 / Angularfire 2 时遇到了类似的问题,特别是在我通过电子邮件查找用户是否存在的地方。在一种情况下,用户存在并且我从 Observable 接收到一个对象的数组。在另一种情况下,用户不存在,我收到一个空数组。

当我将 forkJoin 与 resultSelector 和 subscribe 一起使用时,resultSelector 和 subscribe 函数都没有运行过。但是,当我尝试

Observable.zip(
  FirebaseListObservable,
  FirebaseListObservable,
  (...results) => 
    return results.map(some code here)
  
).subscribe(res => console.log(res));

选择器和订阅都有效。我认为这与@martin 的回答有关,其中 forkJoin 需要完成 observables,因为根据定义它返回 最后一个 排放。如果一个 observable 永远不会完成,我想它永远不会有 last 发射。

也许 angularfire 列表可观察对象(或您的情况下的对象可观察对象)永远不会完成,从而无法使用 forkJoin。幸运的是 zip 具有类似的行为并且仍然有效,不同之处在于如果 Firebase 中的数据发生变化,它可以重复多次,而 forkJoin 只结合最后一个响应。

就我而言,我正在考虑 1) 使用 zip 并接受如果在 .zip 仍在运行时用户数据发生更改,我的代码可能会运行多次,2) 在第一组数据之后手动禁用 zip返回,或者 3) 放弃 Angularfire 并直接试用 Firebase api,使用类似 .once 的东西来查看我是否可以获得一个完成并触发 forkJoin 的 observable。

【讨论】:

我遇到了完全相同的问题,可以通过使用Observable.combineLatest 而不是Observable.forkJoin 非常简单地解决这个问题。学分转到github.com/angular/angularfire2/issues/617 感谢分享,其实很有用。 谢谢,这非常有帮助。我正在对 Firestore 进行查询,该查询可能会或可能不会返回一个值,并且当它们没有值时,forkJoin 不会返回任何内容。我只是用 combineLatest 替换了 forkJoin,它工作得很好。感谢您节省了另一个小时的研究时间。【参考方案3】:

我遇到过类似的问题:我正在动态创建一个可观察对象列表,我注意到如果可观察对象列表为空,forkjoin() 永远不会发出或完成,而 Promise.all() 会以空列表解析:

Observable.forkJoin([])
    .subscribe(() => console.log('do something here')); // This is never called

我找到的解决方法是检查列表的长度,当它为空时不使用此运算符。

return jobList.length ? Observable.forkJoin(jobList) : Observable.of([]);

【讨论】:

我有用于动态触发的子组件的事件发射器。我将它绑定到一个输出参数。我已将其附加到父级,需要等到所有可观察项完成。【参考方案4】:

只需添加observer.complete();

不起作用:

observer.next(...)

有效:

observer.next(...);
observer.complete();

希望对你有帮助。

【讨论】:

【参考方案5】:

我遇到了同样的问题,我无法真正得到 forkJoin 运算符的工作,所以我只使用了 combineLatest,它已经工作了!

【讨论】:

以上是关于Observable.forkJoin() 不执行的主要内容,如果未能解决你的问题,请参考以下文章

Rxjs:Observable.combineLatest vs Observable.forkJoin

如何在 Observable.forkJoin(...) 中捕获错误?

函数返回后 Observable.forkJoin 调用延迟

如何在RXJS 6.3.3中使用ForkJoin导入Observer?

ForkJoin 2 BehaviorSubjects

显示错误并恢复流