Rxjs 可观察到承诺永远不会在 nodejs 中解决
Posted
技术标签:
【中文标题】Rxjs 可观察到承诺永远不会在 nodejs 中解决【英文标题】:Rxjs observable to promise never resolved in nodejs 【发布时间】:2022-01-18 16:12:33 【问题描述】:我制作了一个我正在工作的代码的简化示例。
import * as rx from "rxjs";
import * as op from "rxjs/operators";
async function main(): Promise<void>
const blocker = new rx.ReplaySubject<0>();
const subscription = rx.timer(0, 1000)
.pipe(
op.take(3),
op.observeOn(rx.queueScheduler),
op.subscribeOn(rx.queueScheduler)
)
.subscribe(
next: x => console.log(`timer: next: [$x]`),
error: err => console.log(`timer: error: [$err]`),
complete: () =>
console.log("timer: complete");
blocker.next(0);
);
const promise = rx.lastValueFrom(blocker.asObservable()
.pipe(
op.single(),
op.observeOn(rx.queueScheduler),
op.subscribeOn(rx.queueScheduler)
));
console.log("prepared to await");
await promise;
console.log("awaited!");
subscription.unsubscribe();
main()
.then(
() => console.log("all right"),
reason => console.log(`rejected: [$reason]`))
.catch(err => console.log(`error! : $err`))
.finally(() => console.log("done done done"));
它工作(排序)除了“等待!”时的部分!永远不会打印到控制台,以及main
函数返回的承诺之后的任何行。
实际的控制台输出是:
prepared to await
timer: next: [0]
timer: next: [1]
timer: next: [2]
timer: complete
我期望的是:
prepared to await
timer: next: [0]
timer: next: [1]
timer: next: [2]
timer: complete
awaited!
all right
问题:
-
为什么会这样?这里涉及的nodejs“魔法”(我假设是调度程序)是什么?你能推荐任何关于nodejs内部的书籍吗?
如何更改代码以实现预期输出?
谢谢。
【问题讨论】:
【参考方案1】:我认为这与调度程序或 nodejs 无关。 lastValueFrom
要求它所处理的 observable 在它解析返回的 Promise 之前完成。在这种情况下,它永远不会完成。
一个解决方案
试试这个:
blocker.next(0);
blocker.complete();
然后你应该得到你预期的输出。否则阻塞器永远不会完成,这意味着它永远不会发出最后一个值。
另一种解决方案
替换
single()
与
take(1)
然后,即使阻止程序本身没有完成,你交给lastValueFrom
的 observable 也会完成。这将解决您的承诺。
另一种解决方案
试试这个:
blocker.next(0);
blocker.next(1);
然后你应该得到你预期的输出。虽然这一次将是 single
用 SequenceError
解析您的承诺(而不是前面的示例,其中承诺使用值 0
解析)。
【讨论】:
完全有道理,谢谢!以上是关于Rxjs 可观察到承诺永远不会在 nodejs 中解决的主要内容,如果未能解决你的问题,请参考以下文章
Angular2 beta.12 和 RxJs 5 beta.3 的可观察到的错误