反应式编程 - Node.js 中的 RxJS 与 EventEmitter

Posted

技术标签:

【中文标题】反应式编程 - Node.js 中的 RxJS 与 EventEmitter【英文标题】:Reactive Programming - RxJS vs EventEmitter in Node.js 【发布时间】:2014-10-09 22:15:51 【问题描述】:

最近我开始研究RxJS 和 RxJava(来自 Netflix)库,它们致力于响应式编程的概念。

Node.js 在事件循环的基础上工作,它为您提供异步编程的所有工具,而后续的节点库(如“集群”)可帮助您充分利用多核机器。 Node.js 还为您提供了 EventEmitter 功能,您可以在其中订阅事件并对其进行异步操作。

另一方面,如果我理解正确的话,RxJS(以及一般的反应式编程)的工作原理是事件流,订阅事件流,异步转换事件流数据。

所以,问题是在 Node.js 中使用 Rx 包意味着什么。 Node 的事件循环、事件发射器以及对 Rx 的流和订阅的订阅有多么不同。

【问题讨论】:

我最喜欢做的就是用 Observable 包裹一个事件发射器! @richardpringle - 那么你还必须将它包装在 bacon(js) 中 【参考方案1】:

Observable 与 EventEmitters 不同。在某些情况下,它们可能表现得像 EventEmitters,即当它们使用 RxJS Subjects 进行多播时,但通常它们不像 EventEmitters。

简而言之,RxJS Subject 类似于 EventEmitter,但 RxJS Observable 是更通用的接口。 Observable 更类似于零参数的函数。

考虑以下几点:


function foo() 
  console.log('Hello');
  return 42;


var x = foo.call(); // same as foo()
console.log(x);
var y = foo.call(); // same as foo()
console.log(y);

当然,我们都希望看到输出:

"Hello"
42
"Hello"
42

您可以编写与上述相同的行为,但使用 Observables:

var foo = Rx.Observable.create(function (observer) 
  console.log('Hello');
  observer.next(42);
);

foo.subscribe(function (x) 
  console.log(x);
);
foo.subscribe(function (y) 
  console.log(y);
);

而且输出是一样的:

"Hello"
42
"Hello"
42

这是因为函数和 Observable 都是惰性计算。如果您不调用该函数,console.log('Hello') 将不会发生。同样对于 Observables,如果您不“调用”(subscribe),console.log('Hello') 将不会发生。另外,“调用”或“订阅”是一个独立的操作:两个函数调用触发两个单独的副作用,两个 Observable 订阅触发两个单独的副作用。与 EventEmitters 共享副作用并不管订阅者是否存在都急切执行相反,Observables 没有共享执行并且是惰性的。


到目前为止,函数和 Observable 的行为没有区别。这个 *** 问题可以更好地表述为“RxJS Observables 与函数?”。

有些人声称 Observables 是异步的。那不是真的。如果你用日志包围一个函数调用,像这样:

console.log('before');
console.log(foo.call());
console.log('after');

你显然会看到输出:

"before"
"Hello"
42
"after"

这与 Observables 的行为相同:

console.log('before');
foo.subscribe(function (x) 
  console.log(x);
);
console.log('after');

还有输出:

"before"
"Hello"
42
"after"

这证明foo的订阅是完全同步的,就像一个函数一样。


那么 Observable 和函数之间的真正区别是什么?

Observables 可以随着时间的推移“返回”多个值,而函数却不能。你不能这样做:

function foo() 
  console.log('Hello');
  return 42;
  return 100; // dead code. will never happen

函数只能返回一个值。然而,Observables 可以做到这一点:

var foo = Rx.Observable.create(function (observer) 
  console.log('Hello');
  observer.next(42);
  observer.next(100); // "return" another value
  observer.next(200);
);

console.log('before');
foo.subscribe(function (x) 
  console.log(x);
);
console.log('after');

同步输出:

"before"
"Hello"
42
100
200
"after"

但您也可以异步“返回”值:

var foo = Rx.Observable.create(function (observer) 
  console.log('Hello');
  observer.next(42);
  observer.next(100);
  observer.next(200);
  setTimeout(function () 
    observer.next(300);
  , 1000);
);

有输出:

"before"
"Hello"
42
100
200
"after"
300

总结一下,

func.call() 表示“立即(同步)给我一个值obsv.subscribe() 的意思是“给我一些值。也许很多,也许是同步的,也许是异步的

这就是 Observables 是函数的泛化(没有参数)。

【讨论】:

很好的解释,但与问题无关【参考方案2】:

监听器何时连接到 Emitter ?

使用事件发射器,只要有他们感兴趣的事件发生,侦听器就会收到通知。当事件发生后添加新的侦听器时,他将不知道过去的事件。新的监听器也不会知道之前发生的事件的历史。当然,我们可以手动编程我们的发射器和监听器来处理这个自定义逻辑。

使用响应式流,订阅者可以获取从一开始就发生的事件流。所以他订阅的时间并不严格。现在他可以对流进行各种操作,得到他感兴趣的事件子流。

这样的好处就出来了:

当我们需要处理随时间发生的事件时 它们发生的顺序 事件发生的模式(假设每次 Google 股票的买入事件之后,Microsoft 股票的卖出事件都会在 5 分钟内发生)

高阶流:

高阶流是“流的流”:其事件值本身就是流的流。

对于事件发射器,一种方法是将相同的侦听器附加到多个事件发射器。当我们需要关联发生在不同发射器上的事件时,它变得很复杂。

使用响应式流,这很容易。来自mostjs 的示例(这是一个响应式编程库,类似于 RxJS,但性能更高)

const firstClick = most.fromEvent('click', document).take(1);
const mousemovesAfterFirstClick = firstClick.map(() =>
    most.fromEvent('mousemove', document)
        .takeUntil(most.of().delay(5000)))

在上面的示例中,我们将点击事件与鼠标移动事件相关联。当事件作为流可用时,跨事件推断模式变得更容易完成。

话虽如此,使用 EventEmitter,我们可以通过过度设计我们的发射器和侦听器来完成这一切。它需要过度工程,因为它不首先用于此类场景。而响应式流之所以如此流畅,是因为它旨在解决此类问题。

【讨论】:

以上是关于反应式编程 - Node.js 中的 RxJS 与 EventEmitter的主要内容,如果未能解决你的问题,请参考以下文章

相当于 rxjs 中可观察到的 redux 'getState'

一起来看 rxjs

node.js和npm相关问题

Java和Node.js实战 MongoDB 4.x 新特性:Change Streams 变化流

SQL - node.js express - 反应加载行为

Node.js Streams 与 Observables