如何在 RxJS 中“等待”两个 observables
Posted
技术标签:
【中文标题】如何在 RxJS 中“等待”两个 observables【英文标题】:How to 'wait' for two observables in RxJS 【发布时间】:2017-10-15 16:39:02 【问题描述】:在我的应用中,我有类似的东西:
this._personService.getName(id)
.concat(this._documentService.getDocument())
.subscribe((response) =>
console.log(response)
this.showForm()
);
//Output:
// [getnameResult]
// [getDocumentResult]
// I want:
// [getnameResult][getDocumentResult]
然后我得到两个分开的结果,第一个是_personService
,然后是_documentService
。如何在调用 this.showForm()
之前等待两个结果来完成然后操作每个结果。
【问题讨论】:
forkJoin github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/… 据我所知,你已经是,凭借concat
@user3743222 在这种情况下,这些值是单独发出的。一个接一个。
in forkJoin' subscribe 会得到一个结果 - 带有第一个和第二个响应的元组 - 这正是你问的?
forkjoin 并不总是有效,因为它需要两个可观察对象都“完成”。有时你想同时解雇“下一个”但不一定“完成”
【参考方案1】:
最后更新:2021 年 6 月。
RxJS v7: combineLatestWith
来自 reactiveX documentation:
每当任何输入 Observable 发出一个值时,它都会使用所有输入的最新值计算一个公式,然后发出该公式的输出。
// Observables to combine
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
name$.pipe(
combineLatestWith((name, document) => name, document)
)
.subscribe(pair =>
this.name = pair.name;
this.document = pair.document;
this.showForm();
)
(已弃用)RxJS v6 combineLatest()
来自 reactiveX documentation:
每当任何输入 Observable 发出一个值时,它都会使用所有输入的最新值计算一个公式,然后发出该公式的输出。
(更新:2021 年 2 月):
// Deprecated (RxJS v6)
// Observables to combine
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
name$.combineLatest(document$, (name, document) => name, document)
.subscribe(pair =>
this.name = pair.name;
this.document = pair.document;
this.showForm();
)
(替代语法):combineLatest(observables)
// Deprecated (RxJS v6)
// Observables to combine
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
combineLatest(name$, document$, (name, document) => (name, document))
.subscribe(pair =>
this.name = pair.name;
this.document = pair.document;
this.showForm();
)
zip 与 combineLatest
(更新:2018 年 10 月)
我之前建议使用zip
方法。但是,对于某些用例,combineLatest
比 zip
有一些优势。因此,了解这些差异很重要。
CombineLatest
从 observables 发出最新的发射值。而zip
方法按sequence 顺序发出发射的项目。
例如,如果 observable #1 发出了它的 3rd 项,而 observable #2 发出了它的 5th 项。使用zip
方法的结果将是observables
的第3 个 发射值。
在这种情况下,使用 combineLatest
的结果将是 5th 和 3rd。感觉更自然。
Observable.zip(observables)
(原答案:2017 年 7 月) Observable.zip 方法在 reactiveX documentation 中解释:
组合多个 Observable 以创建一个 Observable,其值是按顺序根据其每个输入 Observable 的值计算得出的。
// Observables to combine
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
Observable
.zip(name$, document$, (name: string, document: string) => (name, document))
.subscribe(pair =>
this.name = pair.name;
this.document = pair.document;
this.showForm();
)
附注(适用于两种方法)
最后一个参数,我们提供了一个函数,(name: string, document: string) => (name, document)
是可选的。你可以跳过它,或者做更复杂的操作:
如果最新参数是一个函数,则该函数用于根据输入值计算创建值。否则,返回一个输入值数组。
所以如果你跳过最后一部分,你会得到一个数组:
// Observables to combine
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
Observable
.zip(name$, document$)
.subscribe(pair =>
this.name = pair['0'];
this.document = pair['1'];
this.showForm();
)
【讨论】:
是否可以用这个等待一个可观察对象的完成?,我的可观察对象有另一个内部可观察对象,而后者又依赖于 http.get ? 在 2020 年 1 月的更新中,为什么要将数组映射到对象?当您可以在 subscribe 方法中破坏数组时,这似乎是一个不必要的步骤。这将是我唯一的评论,其余的看起来不错。 OOOOHHHH,我正在寻找 combineLatest 运算符一个小时......非常感谢 如何退订组合的 observables ?只需取消订阅从 combineLatest().subscribe() 返回的 observable 即可?combineLatest
已被 combineLatestWith
弃用,请参阅此处:rxjs.dev/api/operators/combineLatest【参考方案2】:
使用 forkJoin()
方法的 observables。 Check this link for reference
来自 RXJS docs
当您有一组可观察对象并且只关心每个对象的最终发出值时,最好使用此运算符。一个常见的用例是,如果您希望在页面加载(或其他事件)时发出多个请求,并且只想在收到所有人的响应时采取行动。这种方式类似于你可能使用Promise.all
forkJoin([character, characterHomeworld]).subscribe(results =>
// results[0] is our character
// results[1] is our character homeworld
results[0].homeworld = results[1];
this.loadedCharacter = results[0];
);
代码取自:https://coryrylan.com/blog/angular-multiple-http-requests-with-rxjs
【讨论】:
是否可以等待一个可观察对象的完成?,我的可观察对象有另一个内部可观察对象,而后者又依赖于 http.get ? @HDJEMAI 如果你想做某事。完成一个 observable 后,可以使用嵌套订阅吗? @YuweiHE 非常糟糕的建议,因为这里有 switchmap/flatmap 操作符。避免嵌套订阅【参考方案3】:RxJS Operators for Dummies: forkJoin, zip, combineLatest, withLatestFrom 帮助了我很多。顾名思义,它描述了以下组合运算符:
ForkJoin zip combineLatest withLatestFrom其中任何一个都可能是您正在寻找的东西,具体取决于具体情况。查看文章了解更多信息。
【讨论】:
感谢您提供的第一个链接,即“RxJs Operators ...”是必读的,也是您能得到的最好和最简单的解释【参考方案4】:改进Hamid Asghari answer 使用直接参数分解并自动添加类型(当您使用打字稿时)
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
combineLatest([name$, document$]).subscribe(([name, document]) =>
this.name = name;
this.document = document;
this.showForm();
);
奖励:您还可以使用上述方法处理错误,如下所示
import combineLatest, of from 'rxjs';
//...
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
combineLatest([
name$.pipe( catchError( () => of(null as string ) ) ),
document$.pipe( catchError( () => of(null as Document) ) ), // 'Document' is arbitrary type
]).subscribe(([name, document]) =>
this.name = name; // or null if error
this.document = document; // or null if error
this.showForm();
);
【讨论】:
【参考方案5】:看看 'combineLatest' 方法,这里可能比较合适。 http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-combineLatest
const Observable = Rx
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
Observable
.combineLatest(name$, document$, (name, document) => ( name, document ))
.first() // or not, implementation detail
.subscribe(( name, document ) =>
// here we have both name and document
this.showForm()
)
【讨论】:
【参考方案6】:对我来说,sample 是最好的解决方案。
const source = Observable.interval(500);
const example = source.sample(Observable.interval(2000));
const subscribe = example.subscribe(val => console.log('sample', val));
所以.. 仅当第二个(示例)发射时 - 您会看到第一个(源)的最后发射值。
在我的任务中,我等待表单验证和其他 DOM 事件。
【讨论】:
【参考方案7】:2021 年 6 月
使用 rxjs 6.6.7
像这样使用 combineLatest 否则不推荐使用
combineLatest([a$ , b$]).pipe(
map(([a, b]) => (a, b)) //change to [a , b] if you want an array
)
另见@nyxz 帖子
zip - 爱情小鸟,总是作为一个团队工作,只有在所有的时候才会触发 可观察对象返回新值
combineLatest - go dutch,一旦所有可观察对象开始触发 返回新值,然后等待无人,每次触发 任一 observable 返回新值。
withLatestFrom - 主从,主先等待从,后 也就是说,每次只有当主人返回新的时候才会触发动作 价值。
forkJoin - 最终目的地,当所有可观察对象触发一次 已完成。
发件人:https://scotch.io/tutorials/rxjs-operators-for-dummies-forkjoin-zip-combinelatest-withlatestfrom/amp
【讨论】:
【参考方案8】:你可以像下面这样使用'zip'或'buffer'。
function getName()
return Observable.of('some name').delay(100);
function getDocument()
return Observable.of('some document').delay(200);
// CASE1 : concurrent requests
Observable.zip(getName(), getDocument(), (name, document) =>
return `$name-$document`;
)
.subscribe(value => console.log(`concurrent: $value`));
// CASE2 : sequential requests
getName().concat(getDocument())
.bufferCount(2)
.map(values => `$values[0]-$values[1]`)
.subscribe(value => console.log(`sequential: $value`));
【讨论】:
以上是关于如何在 RxJS 中“等待”两个 observables的主要内容,如果未能解决你的问题,请参考以下文章