可观察到的 forkJoin 没有触发
Posted
技术标签:
【中文标题】可观察到的 forkJoin 没有触发【英文标题】:Observable forkJoin not firing 【发布时间】:2017-08-06 04:27:44 【问题描述】:我正在尝试在两个 Observable 上使用 forkJoin
。其中一个以流的形式开始...如果我直接订阅它们,我会得到响应,forkJoin
不会触发。有什么想法吗?
private data$: Observable<any[]>;
private statuses$: Observable<any[]>;
private queryStream = new Subject<string>();
....
this.data$ = this.queryStream
.startWith('')
.flatMap(queryInput =>
this.query = queryInput
return this._companyService.getCompanies(this.queryRequired + ' ' + this.query, this.page, this.sort);
)
.share();
...
Observable.forkJoin(this.statuses$, this.companies$)
.subscribe(res =>
console.log('forkjoin');
this._countStatus(res[0], res[1]);
);
// This shows arrays in the console...
this.statuses$.subscribe(res => console.log(res));
this.companies$.subscribe(res => console.log(res));
// In the console
Array[9]
Array[6]
【问题讨论】:
【参考方案1】:forkJoin
的一个非常常见的问题是它要求所有源 Observable 至少发出一个项目,并且所有项目都必须完成。
换句话说,如果this.statuses$
或this.companies$
没有发出任何项目,并且在他们都完成之前forkJoin
不会发出任何东西。
this.statuses$.subscribe(
res => console.log(res),
undefined,
() => console.log('completed'),
);
【讨论】:
它们都发出项目,如果我订阅和控制台日志,我可以看到它们发出的数组。我已经更新了问题以反映这一点。谢谢 @nick 他们两个都完成了吗? 我不确定,最好的方法是什么?如果没有,我该如何使它们完整?问题似乎出在queryStream: Subject
上,我只是创建了一个直的Observable
它可以工作...
Subject
在您告诉它这样做之前不会完成。所以你需要手动拨打queryStream.complete()
。但这取决于您的应用程序逻辑,也许您可以只使用take(1)
而无需完成Subject
。查看更新如何检查 Observable 完成【参考方案2】:
Observable.forkJoin([
_someService.getUsers(),
_someService.getCustomers(),
])
.subscribe((data: [Array<User>, Array<Customer>]) =>
let users: Array<User> = data[0];
let customer: Array<Customer> = data[1];
, err =>
);
//someService
getUsers():Observable<User>
let url = '/users';
return this._http.get(url, headers)
.map(res => res.json());
getCustomers():Observable<Customer>
let url = '/customers';
return this._http.get(url, headers)
.map(res => res.json());
【讨论】:
【参考方案3】:forkJoin
仅在所有内部可观察对象都完成时才发出。
如果您需要一个等效的 forkJoin
来监听每个源的单个发射,请使用 combineLatest
+ take(1)
combineLatest(
this.statuses$,
this.companies$,
)
.pipe(
take(1),
)
.subscribe(([statuses, companies]) =>
console.log('forkjoin');
this._countStatus(statuses, companies);
);
一旦两个源都发出,combineLatest
将发出,take(1)
将在此之后立即取消订阅。
【讨论】:
combineLatest 的问题在于它会返回最先触发的 observable 的数据。我不得不更改为 forkjoin,因为我需要等待两个可观察对象相互等待。 谢谢。他妈的。你!我不知道大卫在说什么;它对我来说就像一个魅力!【参考方案4】:forkJoin
不起作用,所以我使用下面的代码来解决我的问题。使用mergeMap
,您可以将外部订阅的结果映射到内部订阅,并根据需要进行订阅
this.statuses$.pipe(
mergeMap(source => this.companies$.pipe(
map(inner => [source , inner])
)
)
).subscribe(([e , r]) =>
console.log(e , r);
)
【讨论】:
【参考方案5】:对我来说,combineLatest
运算符是解决方案!
【讨论】:
这里也一样...我不知道为什么 forkJoin 不起作用...但是 combineLatest 确实【参考方案6】:将.pipe(take(1))
附加为asObservable()
的管道就可以完成这项工作。
forkJoin(
l0: this._svc.data$.pipe(take(1)),
l1: this._api.getLogman1(),
l2: this._api.getLogman2(),
l3: this._api.getLogman3(),
)
.pipe(
takeUntil(this._unsubscribeAll),
)
.subscribe(x =>
console.log(x);
);
【讨论】:
哇,这解决了我的问题。非常感谢!以上是关于可观察到的 forkJoin 没有触发的主要内容,如果未能解决你的问题,请参考以下文章