如何在 RxJS 中“等待”两个 observables

Posted

技术标签:

【中文标题】如何在 RxJS 中“等待”两个 observables【英文标题】:How to 'wait' for two observables in RxJS 【发布时间】:2017-10-15 16:39:02 【问题描述】:

在我的应用中,我有类似的东西:

this._personService.getName(id)
      .concat(this._documentService.getDocument())
      .subscribe((response) => 
                  console.log(response)
                  this.showForm()
       );

 //Output: 
 // [getnameResult]
 // [getDocumentResult]

 // I want:
 // [getnameResult][getDocumentResult]

然后我得到两个分开的结果,第一个是_personService,然后是_documentService。如何在调用 this.showForm() 之前等待两个结果来完成然后操作每个结果。

【问题讨论】:

forkJoin github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/… 据我所知,你已经是,凭借concat @user3743222 在这种情况下,这些值是单独发出的。一个接一个。 in forkJoin' subscribe 会得到一个结果 - 带有第一个和第二个响应的元组 - 这正是你问的? forkjoin 并不总是有效,因为它需要两个可观察对象都“完成”。有时你想同时解雇“下一个”但不一定“完成” 【参考方案1】:

最后更新:2021 年 6 月。

RxJS v7: combineLatestWith

来自 reactiveX documentation:

每当任何输入 Observable 发出一个值时,它都会使用所有输入的最新值计算一个公式,然后发出该公式的输出。

// Observables to combine
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
    
name$.pipe(
        combineLatestWith((name, document) => name, document)
      )
      .subscribe(pair => 
           this.name = pair.name;
           this.document = pair.document;
           this.showForm();
       )

(已弃用)RxJS v6 combineLatest()

来自 reactiveX documentation:

每当任何输入 Observable 发出一个值时,它都会使用所有输入的最新值计算一个公式,然后发出该公式的输出。

(更新:2021 年 2 月)

// Deprecated (RxJS v6)
// Observables to combine
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
    
name$.combineLatest(document$, (name, document) => name, document)
    .subscribe(pair => 
           this.name = pair.name;
           this.document = pair.document;
           this.showForm();
       )

(替代语法):combineLatest(observables)

// Deprecated (RxJS v6)
// Observables to combine
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
    
combineLatest(name$, document$, (name, document) => (name, document))
    .subscribe(pair => 
           this.name = pair.name;
           this.document = pair.document;
           this.showForm();
       )

zip 与 combineLatest

(更新:2018 年 10 月) 我之前建议使用zip 方法。但是,对于某些用例,combineLatestzip 有一些优势。因此,了解这些差异很重要。

CombineLatest 从 observables 发出最新的发射值。而zip 方法按sequence 顺序发出发射的项目。

例如,如果 observable #1 发出了它的 3rd 项,而 observable #2 发出了它的 5th 项。使用zip 方法的结果将是observables第3 个 发射值。

在这种情况下,使用 combineLatest 的结果将是 5th3rd。感觉更自然。


Observable.zip(observables)

(原答案:2017 年 7 月) Observable.zip 方法在 reactiveX documentation 中解释:

组合多个 Observable 以创建一个 Observable,其值是按顺序根据其每个输入 Observable 的值计算得出的。

// Observables to combine
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
    
Observable
    .zip(name$, document$, (name: string, document: string) => (name, document))
    .subscribe(pair => 
           this.name = pair.name;
           this.document = pair.document;
           this.showForm();
       )

附注(适用于两种方法)

最后一个参数,我们提供了一个函数,(name: string, document: string) => (name, document) 是可选的。你可以跳过它,或者做更复杂的操作:

如果最新参数是一个函数,则该函数用于根据输入值计算创建值。否则,返回一个输入值数组。

所以如果你跳过最后一部分,你会得到一个数组:

// Observables to combine
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
    
Observable
    .zip(name$, document$)
    .subscribe(pair => 
           this.name = pair['0'];
           this.document = pair['1'];
           this.showForm();
       )

【讨论】:

是否可以用这个等待一个可观察对象的完成?,我的可观察对象有另一个内部可观察对象,而后者又依赖于 http.get ? 在 2020 年 1 月的更新中,为什么要将数组映射到对象?当您可以在 subscribe 方法中破坏数组时,这似乎是一个不必要的步骤。这将是我唯一的评论,其余的看起来不错。 OOOOHHHH,我正在寻找 combineLatest 运算符一个小时......非常感谢 如何退订组合的 observables ?只需取消订阅从 combineLatest().subscribe() 返回的 observable 即可? combineLatest 已被 combineLatestWith 弃用,请参阅此处:rxjs.dev/api/operators/combineLatest【参考方案2】:

使用 forkJoin() 方法的 observables。 Check this link for reference

来自 RXJS docs

当您有一组可观察对象并且只关心每个对象的最终发出值时,最好使用此运算符。一个常见的用例是,如果您希望在页面加载(或其他事件)时发出多个请求,并且只想在收到所有人的响应时采取行动。这种方式类似于你可能使用Promise.all

forkJoin([character, characterHomeworld]).subscribe(results => 
  // results[0] is our character
  // results[1] is our character homeworld
  results[0].homeworld = results[1];
  this.loadedCharacter = results[0];
);

代码取自:https://coryrylan.com/blog/angular-multiple-http-requests-with-rxjs

【讨论】:

是否可以等待一个可观察对象的完成?,我的可观察对象有另一个内部可观察对象,而后者又依赖于 http.get ? @HDJEMAI 如果你想做某事。完成一个 observable 后,可以使用嵌套订阅吗? @YuweiHE 非常糟糕的建议,因为这里有 switchmap/flatmap 操作符。避免嵌套订阅【参考方案3】:

RxJS Operators for Dummies: forkJoin, zip, combineLatest, withLatestFrom 帮助了我很多。顾名思义,它描述了以下组合运算符:

ForkJoin zip combineLatest withLatestFrom

其中任何一个都可能是您正在寻找的东西,具体取决于具体情况。查看文章了解更多信息。

【讨论】:

感谢您提供的第一个链接,即“RxJs Operators ...”是必读的,也是您能得到的最好和最简单的解释【参考方案4】:

改进Hamid Asghari answer 使用直接参数分解并自动添加类型(当您使用打字稿时)

const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();

combineLatest([name$, document$]).subscribe(([name, document]) => 
    this.name = name;
    this.document = document;
    this.showForm();
);

奖励:您还可以使用上述方法处理错误,如下所示

import  combineLatest, of  from 'rxjs';
//...

const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();

combineLatest([
  name$.pipe(     catchError( () => of(null as string  ) ) ), 
  document$.pipe( catchError( () => of(null as Document) ) ), // 'Document' is arbitrary type
]).subscribe(([name, document]) => 
    this.name = name;          // or null if error
    this.document = document;  // or null if error
    this.showForm();
);

【讨论】:

【参考方案5】:

看看 'combineLatest' 方法,这里可能比较合适。 http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-combineLatest

const  Observable  = Rx

const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();

Observable
    .combineLatest(name$, document$, (name, document) => ( name, document ))
    .first() // or not, implementation detail
    .subscribe(( name, document ) => 
        // here we have both name and document
        this.showForm()
    )

【讨论】:

【参考方案6】:

对我来说,sample 是最好的解决方案。

const source = Observable.interval(500);
const example = source.sample(Observable.interval(2000));
const subscribe = example.subscribe(val => console.log('sample', val));

所以.. 仅当第二个(示例)发射时 - 您会看到第一个(源)的最后发射值。

在我的任务中,我等待表单验证和其他 DOM 事件。

【讨论】:

【参考方案7】:

2021 年 6 月

使用 rxjs 6.6.7

像这样使用 combineLatest 否则不推荐使用

combineLatest([a$ , b$]).pipe(
      map(([a, b]) => (a, b)) //change to [a , b] if you want an array
    )

另见@nyxz 帖子

zip - 爱情小鸟,总是作为一个团队工作,只有在所有的时候才会触发 可观察对象返回新值

combineLatest - go dutch,一旦所有可观察对象开始触发 返回新值,然后等待无人,每次触发 任一 observable 返回新值。

withLatestFrom - 主从,主先等待从,后 也就是说,每次只有当主人返回新的时候才会触发动作 价值。

forkJoin - 最终目的地,当所有可观察对象触发一次 已完成。

发件人:https://scotch.io/tutorials/rxjs-operators-for-dummies-forkjoin-zip-combinelatest-withlatestfrom/amp

【讨论】:

【参考方案8】:

你可以像下面这样使用'zip'或'buffer'。

function getName() 
    return Observable.of('some name').delay(100);


function getDocument() 
    return Observable.of('some document').delay(200);


// CASE1 : concurrent requests
Observable.zip(getName(), getDocument(), (name, document) => 
    return `$name-$document`;
)
    .subscribe(value => console.log(`concurrent: $value`));

// CASE2 : sequential requests
getName().concat(getDocument())
    .bufferCount(2)
    .map(values => `$values[0]-$values[1]`)
    .subscribe(value => console.log(`sequential: $value`));

【讨论】:

以上是关于如何在 RxJS 中“等待”两个 observables的主要内容,如果未能解决你的问题,请参考以下文章

如何在 RxJS 订阅方法中等待

如何等待操作直到使用RXJS完成工作?

如何在 Angular/RxJS 中合并两个 observable?

如何在 Vuex 模块中使用 RxJS

在执行下一个代码块之前等待多个 RxJS 调用

在 foreach 中等待几个可观察的 rxjs 直到完成执行另一个