如何在 RxJS 订阅方法中等待

Posted

技术标签:

【中文标题】如何在 RxJS 订阅方法中等待【英文标题】:How to await inside RxJS subscribe method 【发布时间】:2017-10-08 10:54:38 【问题描述】:

在 RxJS 主题的订阅回调中,我想在 async 函数上使用 await。以下是打字稿转译器抱怨说的代码示例:

错误:(131, 21) TS2304: 找不到名称“等待”。

async ngOnInit() 
  this.subscriber = dateSubscription.subscribe((date: Date) => 
    let dbKey = await this._someService.saveToDatabase(someObject);
    // wait for db write to finish before evaluating the next code
    // ... some other code here
  );

通常我在尝试在非 async 函数中调用 await 时会看到这一点。我是否需要以某种方式进行订阅回调async 或者我是否会出错?函数 saveToDatabaseasync 并返回一个解析为写入的数据库主键的承诺。

【问题讨论】:

【参考方案1】:

async ngOnInit() 是不正确的签名,因为这是 Angular 定义OnInit 接口的方式。它应该返回void:

 export interface OnInit  ngOnInit(): void; 

如果您有任何承诺在dateSubscription 之后进行处理,您可以使用Observable.fromPromise 喜欢

 dateSubscription
 .flatMap(x=>
      Observable.defer(Observable.fromObservable(this._someService.saveToDatabase(someObject)))
   ).subscribe()

【讨论】:

如果您使用的是flatMapdefer,则不需要fromPromise 你错了,你绝对可以在ngOnInit上使用async 你必须实现一个由 Angular 定义的接口。您可以将 async 放在 OnInit 上,但 Angular 不会这样做并且不会等待。 Angular 会等待,Angular 没有选择。生成的 .js 将接口的实现与生成的 awaiter 一起包装起来。看this,注意右手边。 它不会等待结果gist.github.com/ipassynk/f562c773c9e0051a7dbc2e2c37baf87b【参考方案2】:

您无需使用await,也无需将Promise 转换为Observable


CF 这个来自 Ben Lesh 的Tweet:


这是一个模拟函数 saveToDatabase 的示例:(以及正在工作的 Plunkr:https://plnkr.co/edit/7SDLvRS2aTw9gYWdIznS?p=preview)

const  Observable  = Rx;

const saveToDatabase = (date) =>
  new Promise(resolve =>
    setTimeout(() =>
      resolve(`$date has been saved to the database`),
      1000));

const date$ = Observable.of(new Date()).delay(1000);

date$
  .do(x => console.log(`date received, trying to save it to database ...`))
  .switchMap(date => saveToDatabase(date))
  .do(console.log)
  .subscribe();

输出:

【讨论】:

【参考方案3】:

你不能直接await Observables,但是你可以awaitPromise

更新

.toPromise() 方法在 RxJS 8 中被弃用,因此您可以使用 lastValueFrom()firstValueFrom()

import  lastValueFrom  from 'rxjs';
this.categories = await lastValueFrom(categories$);

感谢Robert Rendell's comment。

RxJS v8 之前的版本

您可以在 observables 订阅上简单地使用 .toPromise() 方法。考虑以下几点:

async ngOnInit() 
  const date = await dateSubscription.toPromise();      
  let dbKey = await this._someService.saveToDatabase(someObject);

当您 await 时,dateSubscription 的承诺将交给您一个 Date 对象。然后您可以继续执行下一行,这使您的代码读取更加有序。

有些人认为 angular 不会等待 ngOnInit 完成,它没有选择。查看从给定的TypeScript here 生成的javascript。如您所见,ngOnInit 将调用在内部管理和执行底层状态机(生成器)的 awaiter。 Angular对此没有任何控制权。它只是希望调用该方法。

【讨论】:

toPromise 在 RxJS 8 中已被弃用,因此您可以使用 lastValueFromfirstValueFrom。在这里查看用法:import lastValueFrom from 'rxjs';this.categories = await lastValueFrom(categories$); 当我们想使用firstValueFrom 时,我们对流的完成完全不感兴趣。我们想要的是它的第一个发出的值,用它解决 Promise 并取消订阅流。取自indepth.dev/posts/1287/… 看起来它是 RxJS 7,其中 toPromise 已被弃用。【参考方案4】:

您可以直接将异步签名添加到订阅中的匿名函数调用

 this.subscriber = dateSubscription.subscribe(async (date: Date) => 
    let dbKey = await this._someService.saveToDatabase(someObject);
    // wait for db write to finish before evaluating the next code
    // ... some other code here
  );

【讨论】:

为什么会这样?可观察的实现是否在subscribe 中调用await?不适合我,希望获得更多信息.. 这个答案是错误的。您可以在其中放置一个异步函数,但它的行为方式不会像您期望的那样。例如,当您使用Subject.next() 通知可观察对象时,调用将在所有订阅都执行完毕后完成。但是,如果您在其中放置异步订阅,则对 next() 的调用将立即完成,Subject.next()不会等待异步订阅。 @MgSam 是完全正确的,这是一种难以捉摸的方法。在订阅者函数中使用await 可能会带来意想不到的副作用,因为不再保证可以逐渐处理可观察对象发出的值。 @MgSam 是对的。不要使用subscribe(async - 可以选择使用switchMap 代替:learnrxjs.io/learn-rxjs/operators/transformation/switchmap ?如果您想维护多个内部订阅,请尝试mergeMap! ? switchMap 通常被认为比mergeMap 更安全! ?switchMap可以取消正在进行的网络请求!【参考方案5】:

更新: 发现一次性承诺很容易,只需在订阅者(blog) 前使用toPromise。因此对于上述情况,它将是这样的:

const title = await this.translate.get('MYBOOK-PAGE.PAGE_TITLE').toPromise();

老办法: 这是我解决这个问题的方法

const title = await new Promise<string>(resolve => 
  this.translate.get('MYBOOK-PAGE.PAGE_TITLE')
   .subscribe(translated => 
     resolve(translated)
   ));

我正在做的是将我的 Observable 更改为 Promise

注意:这里唯一的问题是这是一次表演,即。如果你 一旦您将无法再次访问它,请订阅。很适合我分享 在这里。

【讨论】:

【参考方案6】:

也许是对此问题的更新响应。我也有类似的等待响应的需要,而不是在 Observable 实现中使用 await 或 setTimeout() 方法,现在更简洁的方法是在完成订阅之前使用 RxJS 内置的 interval() 方法。

在服务中试试这个:

import  interval  from 'rxjs';
...

// inside your method
const source = interval(100);
    source.subscribe(x => 
        subject.next(CONNECTIONS_DATA);
        subject.complete();
);
return subject;

RxJS 文档: https://rxjs-dev.firebaseapp.com/guide/subject#reference-counting

如果最近有人尝试这样做,希望对您有所帮助。

【讨论】:

【参考方案7】:

2022 年的 Rxjs:使用 pipeconcatMap

我最终在这里寻找能够在收听下一个发布项目之前等待处理昂贵项目的能力。我必须在 subscribe 处理程序中将我的代码从 async 更改为按照 this other similar SO question 使用 pipeconcatMap

旧代码,不会在处理新项目之前等待昂贵的项目调用(因此在我的情况下竞争条件比比皆是):

myObservable$.subscribe(async (item) =>  await expensiveFoo(item); )

新代码,确实在处理下一项之前等待昂贵的功能。

let subscription = myObservable$.pipe(
    concatMap(async (item) =>  await expensiveFoo(item); )
).subscribe(
    (item) => simpleFoo(item),// can also handle sync code here if you want 
    (error) => handleError(error), 
    () => handleComplete()
);

在完成 observable 之前,请务必保留对 subscription(它不是订阅者)的引用。

【讨论】:

以上是关于如何在 RxJS 订阅方法中等待的主要内容,如果未能解决你的问题,请参考以下文章

如何在订阅前等待 Observable 完成

如何在 rxjs 中的订阅中重构订阅

RxJS 等待订阅 Observable 完成

如何在没有嵌套订阅的情况下使用 rxjs 处理内部订阅数组

如何在 RxJS 中“等待”两个 observables

如何在 Angular 应用程序中检测与 rxjs 相关的内存泄漏