事件 vs 流 vs Observables vs 异步迭代器
Posted
技术标签:
【中文标题】事件 vs 流 vs Observables vs 异步迭代器【英文标题】:Events vs Streams vs Observables vs Async Iterators 【发布时间】:2017-01-19 06:17:18 【问题描述】:目前,在 javascript 中处理一系列异步结果的唯一稳定方法是使用事件系统。但是,正在开发三种替代方案:
流:https://streams.spec.whatwg.org 可观察:https://tc39.github.io/proposal-observable 异步迭代器:https://tc39.github.io/proposal-async-iteration
每个事件和其他事件的区别和好处是什么?
这些是否打算替换事件?
【问题讨论】:
顺便说一句,仔细看看这篇文章:A General Theory of Reactivity 很难想象一个更好的例子来说明一个引人入胜、有用的问题,但根据 SO 的荒谬、狭窄的规则,应该以“过于宽泛”或“观点问题”来结束。跨度> 【参考方案1】:我对异步迭代器的理解有点有限,但据我了解,WHATWG Streams 是异步迭代器的一个特例。有关这方面的更多信息,请参阅Streams API FAQ。它简要说明了differs from Observables.
Async Iterators 和 Observables 都是操作多个异步值的通用方法。目前它们不互操作,但似乎正在考虑创建 Observables from Async Iterators。 Observables 的基于 push 的性质更像是当前的事件系统,AsyncIterables 是基于 pull 的。一个简化的视图是:
-------------------------------------------------------------------------
| | Singular | Plural |
-------------------------------------------------------------------------
| Spatial (pull based) | Value | Iterable<Value> |
-------------------------------------------------------------------------
| Temporal (push based) | Promise<Value> | Observable<Value> |
-------------------------------------------------------------------------
| Temporal (pull based) | await on Promise | await on Iterable<Promise> |
-------------------------------------------------------------------------
我将AsyncIterables
表示为Iterable<Promise>
,以使类比更容易推理。请注意,await Iterable<Promise>
没有意义,因为它应该在for await...of AsyncIterator
循环中使用。
你可以找到更完整的解释Kriskowal: A General Theory of Reactivity。
【讨论】:
我觉得你的回答有助于高层次的比较,但我不同意AsyncIterables
是Iterable<Promise>
的说法。 Iterable<Promise>
是一个 同步 可迭代的 Promise,并且没有背压的概念。您可以随心所欲地使用它,没问题。 AsyncIterables
有背压,这意味着在前一次迭代结束之前在迭代器上调用 next()
是非法的。它产生一个Promise< value, done >
,它不像promise的同步迭代器那样产生一个 Promise<value>, done
。
啊,有趣的区别。我以前没有考虑过这个。我想知道应该如何处理再次调用 next 。回报同样的承诺?抛出错误?
由于 Observables 是基于推送的,因此它们很容易不断地从 AsyncIterator 中拉取并尽快发出。【参考方案2】:
这里的 API 大致分为两类:拉取和推送。
拉动
异步拉取 API 非常适合从源中提取数据的情况。这个源可能是一个文件,或者一个网络套接字,或者一个目录列表,或者其他任何东西。关键是在被询问时从源中提取或生成数据。
异步迭代器是这里的基本原语,意味着是基于拉取的异步源概念的通用表现形式。在这样的来源中,您:
通过执行const promise = ai.next()
从异步迭代器中拉取
使用const result = await promise
(或使用.then()
)等待结果
检查结果以确定它是异常(抛出)、中间值( value, done: false )
,还是完成信号( value: undefined, done: true
)。
这类似于同步迭代器是基于拉取同步值源概念的一般表现形式。同步迭代器的步骤与上面完全相同,省略了“等待结果”步骤。
可读流是异步迭代器的一种特殊情况,旨在专门封装 I/O 源,如套接字/文件/等。他们有专门的 API 用于将它们传送到可写流(代表 I/O 生态系统的另一半,接收器)并处理由此产生的背压。它们还可以专门用于以有效的“自带缓冲区”方式处理字节。这有点让人想起数组是同步迭代器的一种特殊情况,针对 O(1) 索引访问进行了优化。
pull API 的另一个特点是它们通常是单一消费者。谁提取了值,现在就拥有它,并且它不存在于源异步迭代器/流/等中。了。已经被消费者拉走了。
一般来说,拉取 API 提供了一个与一些底层数据源通信的接口,允许消费者表达对它的兴趣。这与……形成鲜明对比。
推
Push API 非常适合生成数据的情况,而生成的数据并不关心是否有人想要它。例如,不管有人是否感兴趣,你的鼠标移动了,然后你点击了某个地方,这仍然是真的。您希望使用推送 API 来显示这些事实。然后,消费者(可能是多个消费者)可能会订阅,以收到有关此类事情发生的推送通知。
API 本身并不关心订阅者是零个、一个还是多个。它只是展示了一个关于宇宙中发生的事情的事实。
事件是这一点的简单体现。您可以在浏览器中订阅 EventTarget,或在 Node.js 中订阅 EventEmitter,并在已调度的事件时收到通知。 (通常,但不总是,由 EventTarget 的创建者。)
Observables 是 EventTarget 的一个更精致的版本。他们的主要创新是订阅本身由一个一流的对象 Observable 表示,然后您可以在其上应用组合器(例如过滤器、映射等)。他们还选择将三个信号(通常命名为 next、complete 和 error)捆绑在一起,并赋予这些信号特殊的语义,以便组合器尊重它们。这与 EventTarget 不同,EventTarget 中的事件名称没有特殊语义(EventTarget 的任何方法都不会关心您的事件是被命名为“完成”还是“asdf”)。 Node 中的 EventEmitter 具有这种特殊语义方法的某些版本,其中“错误”事件可能会使进程崩溃,但这是相当原始的。
observables 对事件的另一个很好的特性是,通常只有 observable 的创建者可以使它生成那些 next/error/complete 信号。而在 EventTarget 上,任何人都可以调用 dispatchEvent()。根据我的经验,这种职责分离有助于编写更好的代码。
但归根结底,事件和可观察对象都是很好的 API,可以将事件推送到世界上,让订阅者可以随时收听和收听。我想说 observables 是更现代的方式来做到这一点,并且在某些方面更好,但事件更广泛和更容易理解。因此,如果有任何东西旨在替换事件,那它就是 observables。
推 拉
值得注意的是,您可以在紧要关头构建任何一种方法:
要在 pull 之上构建 push,请不断从 pull API 拉取,然后将块推送给任何消费者。 要在 push 之上构建 pull,请立即订阅 push API,创建一个累积所有结果的缓冲区,当有人拉取时,从该缓冲区中抓取它。 (或者等到缓冲区变为非空,如果您的消费者拉动的速度快于包装的推送 API 的推送速度。)后者通常比前者要编写更多的代码。
尝试在两者之间进行调整的另一个方面是,只有拉式 API 才能轻松传达背压。您可以添加一个侧通道来推送 API,以允许它们将背压传回源;我认为 Dart 做到了这一点,有些人试图创造具有这种能力的可观察物的演变。但这比一开始就正确选择拉式 API 更尴尬。不利的一面是,如果您使用推送 API 来公开基本上基于拉的源,您将无法传达背压。顺便说一下,这是使用 WebSocket 和 XMLHttpRequest API 所犯的错误。
一般来说,我发现试图通过包装其他 API 来将所有内容统一到一个 API 中是错误的。 Push 和 pull 有不同的,不是很重叠的区域,它们每个都运行良好,并且说我们应该选择你提到的四个 API 中的一个并像某些人那样坚持使用它是短视的,会导致笨拙的代码。
【讨论】:
您能否详细说明背压的含义? Daniel:例如参见 www.reactivestreams.org。 @Domenic "顺便说一句,这是使用 XMLHttpRequest API 犯的错误",您能否描述得更详细一些,谢谢! 因为它使用事件向您推送数据,而不是等待您读取一大块数据。因此它没有背压的概念,因为它不知道您消耗数据的速度。 优秀的答案 Domenic - 您可能想要添加一些来自 gtor 的示例或类似资源的拉/推示例。值得一提的是,Node 目前打算与异步迭代器(但不是 observables)进行互操作——因为这些在规范中更进一步。以上是关于事件 vs 流 vs Observables vs 异步迭代器的主要内容,如果未能解决你的问题,请参考以下文章
消息总线 vs. 服务总线 vs. 事件中心 vs. 事件网格
Rxjs:Observable.combineLatest vs Observable.forkJoin