ForkJoin 2 BehaviorSubjects

Posted

技术标签:

【中文标题】ForkJoin 2 BehaviorSubjects【英文标题】: 【发布时间】:2017-02-04 22:54:12 【问题描述】:

我有两个行为主题流,我试图 forkJoin 没有运气。 正如我想象的那样,它返回了它的最后两个值。这有可能以某种方式实现吗?

不在主语后面调用。

let stream1 = new BehaviorSubject(2);
let stream2 = new BehaviorSubject('two');

Observable.forkJoin(stream1, stream2)
    .subscribe(r => 
         console.log(r);
    );

【问题讨论】:

【参考方案1】:

请注意 forkJoin() 在其文档中的实际作用:

等待 Observable 完成,然后合并它们发出的最后一个值。

这意味着当所有输入 Observable 完成时,forkJoin() 会发出一个值。当使用BehaviorSubject 时,这意味着在它们两个上显式调用complete()

import  Observable, BehaviorSubject, forkJoin  from 'rxjs';

const stream1 = new BehaviorSubject(2);
const stream2 = new BehaviorSubject('two');

forkJoin(stream1, stream2)
  .subscribe(r => 
    console.log(r);
  );

stream1.complete();
stream2.complete();

观看现场演示:https://stackblitz.com/edit/rxjs-9nqtx6

2019 年 3 月:针对 RxJS 6 更新。

【讨论】:

感谢您的回答!你有没有测试过你的代码,因为我的电脑没有调用日志?【参考方案2】:

如果您不想(或不知道何时)拨打complete(),您可以使用combineLatest 而不是forkJoin

使用combineLatest,当任何可观察的源(在你的情况下,你的行为主体)发出一个值时,combineLatest 将触发:

const stream1 = new BehaviorSubject(2);
const stream2 = new BehaviorSubject('two');

combineLatest(stream1, stream2)
    .subscribe(r => 
         console.log(r);
    );

stream1.next(3);
stream2.next('three');

控制台日志:

(2) [2, "two"] // 初始状态

(2) [3, "two"] // 下一个在 stream1 上触发

(2) [3, "three"] // 下一个在 stream2 上触发

现场演示:https://stackblitz.com/edit/rxjs-qzxo3n

【讨论】:

【参考方案3】:

您可以使用上面提到的take(1) 管道或complete() 方法。

private subjectStream1 = new BehaviorSubject(null);
stream1$: Observable = this.subjectStream1.asObservable();

private subjectStream2 = new BehaviorSubject(null);
stream2$: Observable = this.subjectStream2.asObservable();

forkJoin(
  stream1: this.stream1$.pipe(take(1)),
  stream2: this.stream2$.pipe(take(1))
)
.pipe(takeUntil(this._destroyed$))
.subscribe(values) => console.log(values));

【讨论】:

以上是关于ForkJoin 2 BehaviorSubjects的主要内容,如果未能解决你的问题,请参考以下文章

ForkJoin的使用及for循环stream并行流三种方式的时间比较

java多线程ForkJoin分支合并

JUC系列ForkJoin框架设计官方说明翻译

Observable.forkJoin() 不执行

解决超过 6 个 forkJoin 参数?

如何在 Observable.forkJoin(...) 中捕获错误?