RxJS 等待订阅 Observable 完成

Posted

技术标签:

【中文标题】RxJS 等待订阅 Observable 完成【英文标题】:RxJS Waits until Subscribe Observable Finish 【发布时间】:2021-11-16 14:15:51 【问题描述】:

我正在尝试使用 NestJS 实现 gRPC,并使用 RxJS Observerable 来处理服务器流。

在下面的代码中,我尝试将值从 observable 放入 results 数组。函数findAllRepos 返回空列表subscribe 中的console.log(value) 正确打印所有结果

  findAllRepos(): Repository[] 
    const results: Repository[] = [];

    const observable = this.mainService.findAllRepos();
    observable.subscribe(
      next: (value) => 
        console.log(value);
        results.push(value);
      ,
      error: (err) => console.log(err),
      complete: () => console.log(results)
    );

    return results;
  

我认为问题在于函数在subscribe 完成之前返回值。有什么解决方案可以解决这个问题吗?谢谢!

【问题讨论】:

看看这个:***.com/questions/38291783/… 您需要返回 observable 并在需要响应的地方订阅。您不能像您尝试的那样从对象同步返回值。有关异步数据的更多信息,请参见此处:***.com/q/14220321/6513921 这能回答你的问题吗? How to return the response from an asynchronous call 【参考方案1】:

问题

我认为问题在于函数在订阅完成之前返回值。

这些函数(nexterror、&complete)都将在未来某个时间被可观察的(不是你)调用。当您深入了解 RxJS 的核心内容时,您可以对这些函数的调用方式进行一些控制,但为了简单起见,最好想象一下这是由 RxJS 库不透明地处理的。

因此,没有办法做你想做的事。

问题比这更糟糕。您遇到的问题存在于同步和异步代码之间的任何交互。您不能同步运行异步代码。在单线程环境(如 javascript)中,试图让一段同步的代码等待会立即使整个程序死锁。

考虑以下代码:您可能希望它输出 "a equals 0" 1000 毫秒,然后开始输出 "a equals 1" 永远。然而,实际发生的情况是,setTimeout 中的代码将永远没有机会运行,因为线程将陷入无限循环打印 "a equals 0"

// Start with a = 0
let a = 0;
// After 1 second, set a = 1
setTimeout(() => a = 1, 1000);

// loop forever and print something based on value of a
while(true) 
  if(a == 0) console.log("a equals 0");
  else console.log("a equals 1");

解决方案

管理异步代码的两种最流行的方法是通过 Promise 或 observables。如果你想让一个函数异步返回一些东西,那么让它返回一个 promise 或 observable。

在你的情况下:

findAllRepos(): Observable<Repository[]> 
  
  const observable = this.mainService.findAllRepos();
  return observable.pipe(
    tap(value => console.log("Repository value being added to array: ", value)),
    toArray(),
    tap(
      next: value => console.log("Result Array (Repository[]) : ", value),
      error: console.log,
      complete: () => console.log("findAllRepos observable complete")
    )
  );


然后在别处获取实际值:

findAllRepos().subscribe(repositories => 
  /* Do something with your array of repositories */
);

【讨论】:

您的解决方案的一大重要优势:您可以处理订阅。目前,每次调用 findAllRepos 时都会完成一个新的订阅。如果用户没有取消订阅或 observable 完成,则会造成内存泄漏。【参考方案2】:

拥有带有订阅的方法不是很好的做法,您将使响应式代码完全不响应式。为什么不使用类似的东西

findAllRepos(): Observable<Repository[]> 
    return this.mainService.findAllRepos().pipe(catchError((e) => 
        console.log('e ', e);
    ), finalize(() => 
        console.log('completed');
    ))

然后findAllRepos().subscribe 在客户端?您将在此方法中拥有更大的灵活性,您可以从多个位置调用此方法并根据用例映射您想要的结果。

【讨论】:

以上是关于RxJS 等待订阅 Observable 完成的主要内容,如果未能解决你的问题,请参考以下文章

RxJS - 只订阅一次但不完成 Observable

取消订阅 Rxjs finalize 操作符

任何需要先取消订阅 RxJS()

RxJS 5,将 observable 转换为 BehaviorSubject(?)

Rxjs:将中间订阅和完整的可观察对象合并,并整体完成

Rxjs 如何知道 observable 有多少订阅者?