如何在 Rx Observable 上“等待”?

Posted

技术标签:

【中文标题】如何在 Rx Observable 上“等待”?【英文标题】:How can I `await` on an Rx Observable? 【发布时间】:2016-03-15 09:48:40 【问题描述】:

我希望能够等待一个可观察的,例如

const source = Rx.Observable.create(/* ... */)
//...
await source;

天真的尝试会导致等待立即解决,而不是阻塞执行

编辑: 我的完整预期用例的伪代码是:

if (condition) 
  await observable;

// a bunch of other code

我知道我可以将其他代码移动到另一个单独的函数中并将其传递给订阅回调,但我希望能够避免这种情况。

【问题讨论】:

你不能把剩下的代码(你想等待源代码)移动到.subscribe()方法调用中吗? 【参考方案1】:

你必须向await 传递一个承诺。将 observable 的下一个事件转换为一个 Promise 并等待它。

if (condition) 
  await observable.first().toPromise();

编辑说明:此答案最初使用 .take(1) 但已更改为使用 .first() 这避免了如果流在值通过之前结束,Promise 永远不会解决的问题。

从 RxJS v8 开始,toPromise 将被删除。取而代之,上面的可以换成await firstValueFrom(observable)

【讨论】:

@apricity 如果完成时没有值,first() 将导致拒绝,take(1) 将导致待处理的承诺。 @apricity @AgentME 实际上,在这种情况下,您不应该使用take(1)first()。由于您期望恰好发生 ONE 事件,因此您应该使用 single(),如果超过 1 则将引发异常,而在没有异常时不会引发异常。如果有多个,则您的代码/数据模型等可能有问题。如果您不使用单个,您最终会任意选择返回的第一个项目,而不会警告还有更多。您必须注意上游数据源的谓词以始终保持相同的顺序。 别忘了导入:import 'rxjs/add/operator/first'; 现在 toPromise() 已经被弃用了,我们应该怎么做呢? @Jus10 我找不到任何表明 toPromise() 已被弃用的信息。你在哪里读到的?【参考方案2】:

使用新的 firstValueFrom()lastValueFrom() 代替 toPromise(),正如这里指出的,从 RxJS 7 开始不推荐使用,并将在 RxJS 8 中删除。

import  firstValueFrom from 'rxjs';
import  lastValueFrom  from 'rxjs';

this.myProp = await firstValueFrom(myObservable$);
this.myProp = await lastValueFrom(myObservable$);

这在 RxJS 7+ 中可用

见:https://indepth.dev/rxjs-heads-up-topromise-is-being-deprecated/

【讨论】:

【参考方案3】:

应该是这样的

await observable.first().toPromise();

正如之前在 cmets 中所指出的,当有空的已完成 observable 时,take(1)first() 运算符之间存在很大差异。

Observable.empty().first().toPromise() 将导致 EmptyError 被拒绝,可以相应地处理,因为确实没有任何价值。

Observable.empty().take(1).toPromise() 将产生undefined 值的分辨率。

【讨论】:

实际上take(1)不会产生待处理的承诺。它将产生一个用undefined 解决的承诺。 感谢您的关注,这是正确的。我不确定为什么帖子会有所不同,可能是行为在某个时候发生了变化。【参考方案4】:

你需要await 一个promise,所以你需要使用toPromise()。有关toPromise() 的更多详细信息,请参阅this。

【讨论】:

【参考方案5】:

编辑:

.toPromise() 现在在 RxJS 7 中已弃用(来源:https://rxjs.dev/deprecations/to-promise)

新答案:

作为已弃用的 toPromise() 方法的替代品,您应该使用 两个内置静态转换函数之一 firstValueFrom 或 最后一个值来自。

例子:

import  interval, lastValueFrom  from 'rxjs';
import  take  from 'rxjs/operators';
 
async function execute() 
  const source$ = interval(2000).pipe(take(10));
  const finalNumber = await lastValueFrom(source$);
  console.log(`The final number is $finalNumber`);

 
execute();
 
// Expected output:
// "The final number is 9"

旧答案:

如果您不推荐使用 toPromise,您可以使用 .pipe(take(1)).toPromise,但您可以看到 here 不推荐使用。

所以请使用toPromise (RxJs 6) 如前所述:

//return basic observable
const sample = val => Rx.Observable.of(val).delay(5000);
//convert basic observable to promise
const example = sample('First Example')
  .toPromise()
  //output: 'First Example'
  .then(result => 
    console.log('From Promise:', result);
  );

异步/等待示例:

//return basic observable
const sample = val => Rx.Observable.of(val).delay(5000);
//convert basic observable to promise
const example = await sample('First Example').toPromise()
// output: 'First Example'
console.log('From Promise:', result);

阅读更多here。

【讨论】:

toPromise 已弃用,并且已在 RxJS 8 中删除。您链接中的文档已过时。 Rx 是什么意思?【参考方案6】:

不推荐使用toPromise(),因为它在 RxJs 7 之后会贬值。您可以使用 RxJs 7 lastValueFrom()firstValueFrom() 中的两个新运算符。更多详情可查看here

const result = await lastValueFrom(myObservable$);

Beta 版的实现可在此处获得:

firstValueFrom lastValueFrom

【讨论】:

【参考方案7】:

我正在使用 RxJS V 6.4.0,因此我应该使用 V 7.x.x toPromise() 中已弃用的一个。受其他答案的启发,这是我对 toPromise() 所做的事情

import  first, ...  from 'rxjs/operators';

...

if (condition) 
  await observable$.pipe(first()).toPromise();


...

注意我如何在pipe() 中使用last()。因为我的observable.first() 不像macil 提到的那样工作

希望这可以帮助像我一样使用 RxJS V 6.x.x 的其他人:)。

谢谢。

【讨论】:

以上是关于如何在 Rx Observable 上“等待”?的主要内容,如果未能解决你的问题,请参考以下文章

如何声明 Rx.Observable.fromPromise() 的 TypeScript 返回类型

如何在 Swift 上同步或使 Observable 等待?

rx 的 Observable.FromEventPattern 的 TPL 等价物是啥?

从其他 Observable 获取值的 Rx Observable

Rx Java:Observable 在订阅者请求之前发出项目

Rx 操作符四