如何在 RxJS 订阅方法中等待
Posted
技术标签:
【中文标题】如何在 RxJS 订阅方法中等待【英文标题】:How to await inside RxJS subscribe method 【发布时间】:2017-10-08 10:54:38 【问题描述】:在 RxJS 主题的订阅回调中,我想在 async
函数上使用 await
。以下是打字稿转译器抱怨说的代码示例:
错误:(131, 21) TS2304: 找不到名称“等待”。
async ngOnInit()
this.subscriber = dateSubscription.subscribe((date: Date) =>
let dbKey = await this._someService.saveToDatabase(someObject);
// wait for db write to finish before evaluating the next code
// ... some other code here
);
通常我在尝试在非 async
函数中调用 await 时会看到这一点。我是否需要以某种方式进行订阅回调async
或者我是否会出错?函数 saveToDatabase
是 async
并返回一个解析为写入的数据库主键的承诺。
【问题讨论】:
【参考方案1】:async ngOnInit()
是不正确的签名,因为这是 Angular 定义OnInit
接口的方式。它应该返回void
:
export interface OnInit ngOnInit(): void;
如果您有任何承诺在dateSubscription
之后进行处理,您可以使用Observable.fromPromise
喜欢
dateSubscription
.flatMap(x=>
Observable.defer(Observable.fromObservable(this._someService.saveToDatabase(someObject)))
).subscribe()
【讨论】:
如果您使用的是flatMap
或defer
,则不需要fromPromise
。
你错了,你绝对可以在ngOnInit
上使用async
。
你必须实现一个由 Angular 定义的接口。您可以将 async 放在 OnInit 上,但 Angular 不会这样做并且不会等待。
Angular 会等待,Angular 没有选择。生成的 .js
将接口的实现与生成的 awaiter 一起包装起来。看this,注意右手边。
它不会等待结果gist.github.com/ipassynk/f562c773c9e0051a7dbc2e2c37baf87b【参考方案2】:
您无需使用await
,也无需将Promise
转换为Observable
。
CF 这个来自 Ben Lesh 的Tweet:
这是一个模拟函数 saveToDatabase
的示例:(以及正在工作的 Plunkr:https://plnkr.co/edit/7SDLvRS2aTw9gYWdIznS?p=preview)
const Observable = Rx;
const saveToDatabase = (date) =>
new Promise(resolve =>
setTimeout(() =>
resolve(`$date has been saved to the database`),
1000));
const date$ = Observable.of(new Date()).delay(1000);
date$
.do(x => console.log(`date received, trying to save it to database ...`))
.switchMap(date => saveToDatabase(date))
.do(console.log)
.subscribe();
输出:
【讨论】:
【参考方案3】:你不能直接await
Observables,但是你可以await
Promise
。
更新
.toPromise()
方法在 RxJS 8 中被弃用,因此您可以使用 lastValueFrom()
或 firstValueFrom()
。
import lastValueFrom from 'rxjs';
this.categories = await lastValueFrom(categories$);
感谢Robert Rendell's comment。
RxJS v8 之前的版本
您可以在 observables 订阅上简单地使用 .toPromise()
方法。考虑以下几点:
async ngOnInit()
const date = await dateSubscription.toPromise();
let dbKey = await this._someService.saveToDatabase(someObject);
当您 await
时,dateSubscription
的承诺将交给您一个 Date
对象。然后您可以继续执行下一行,这使您的代码读取更加有序。
有些人认为 angular 不会等待 ngOnInit
完成,它没有选择。查看从给定的TypeScript
here 生成的javascript
。如您所见,ngOnInit
将调用在内部管理和执行底层状态机(生成器)的 awaiter。 Angular对此没有任何控制权。它只是希望调用该方法。
【讨论】:
toPromise 在 RxJS 8 中已被弃用,因此您可以使用lastValueFrom
或 firstValueFrom
。在这里查看用法:import lastValueFrom from 'rxjs';
this.categories = await lastValueFrom(categories$);
当我们想使用firstValueFrom
时,我们对流的完成完全不感兴趣。我们想要的是它的第一个发出的值,用它解决 Promise 并取消订阅流。取自indepth.dev/posts/1287/…
看起来它是 RxJS 7,其中 toPromise 已被弃用。【参考方案4】:
您可以直接将异步签名添加到订阅中的匿名函数调用
this.subscriber = dateSubscription.subscribe(async (date: Date) =>
let dbKey = await this._someService.saveToDatabase(someObject);
// wait for db write to finish before evaluating the next code
// ... some other code here
);
【讨论】:
为什么会这样?可观察的实现是否在subscribe
中调用await
?不适合我,希望获得更多信息..
这个答案是错误的。您可以在其中放置一个异步函数,但它的行为方式不会像您期望的那样。例如,当您使用Subject.next()
通知可观察对象时,调用将在所有订阅都执行完毕后完成。但是,如果您在其中放置异步订阅,则对 next()
的调用将立即完成,Subject.next()
将不会等待异步订阅。
@MgSam 是完全正确的,这是一种难以捉摸的方法。在订阅者函数中使用await
可能会带来意想不到的副作用,因为不再保证可以逐渐处理可观察对象发出的值。
@MgSam 是对的。不要使用subscribe(async
- 可以选择使用switchMap
代替:learnrxjs.io/learn-rxjs/operators/transformation/switchmap ?如果您想维护多个内部订阅,请尝试mergeMap
! ? switchMap
通常被认为比mergeMap
更安全! ?switchMap
可以取消正在进行的网络请求!【参考方案5】:
更新:
发现一次性承诺很容易,只需在订阅者(blog) 前使用toPromise
。因此对于上述情况,它将是这样的:
const title = await this.translate.get('MYBOOK-PAGE.PAGE_TITLE').toPromise();
老办法: 这是我解决这个问题的方法
const title = await new Promise<string>(resolve =>
this.translate.get('MYBOOK-PAGE.PAGE_TITLE')
.subscribe(translated =>
resolve(translated)
));
我正在做的是将我的 Observable 更改为 Promise
注意:这里唯一的问题是这是一次表演,即。如果你 一旦您将无法再次访问它,请订阅。很适合我分享 在这里。
【讨论】:
【参考方案6】:也许是对此问题的更新响应。我也有类似的等待响应的需要,而不是在 Observable 实现中使用 await
或 setTimeout() 方法,现在更简洁的方法是在完成订阅之前使用 RxJS 内置的 interval()
方法。
在服务中试试这个:
import interval from 'rxjs';
...
// inside your method
const source = interval(100);
source.subscribe(x =>
subject.next(CONNECTIONS_DATA);
subject.complete();
);
return subject;
RxJS 文档: https://rxjs-dev.firebaseapp.com/guide/subject#reference-counting
如果最近有人尝试这样做,希望对您有所帮助。
【讨论】:
【参考方案7】:2022 年的 Rxjs:使用 pipe
和 concatMap
我最终在这里寻找能够在收听下一个发布项目之前等待处理昂贵项目的能力。我必须在 subscribe
处理程序中将我的代码从 async
更改为按照 this other similar SO question 使用 pipe
和 concatMap
。
旧代码,不会在处理新项目之前等待昂贵的项目调用(因此在我的情况下竞争条件比比皆是):
myObservable$.subscribe(async (item) => await expensiveFoo(item); )
新代码,确实在处理下一项之前等待昂贵的功能。
let subscription = myObservable$.pipe(
concatMap(async (item) => await expensiveFoo(item); )
).subscribe(
(item) => simpleFoo(item),// can also handle sync code here if you want
(error) => handleError(error),
() => handleComplete()
);
在完成 observable 之前,请务必保留对 subscription
(它不是订阅者)的引用。
【讨论】:
以上是关于如何在 RxJS 订阅方法中等待的主要内容,如果未能解决你的问题,请参考以下文章