Node.js Streams 与 Observables

Posted

技术标签:

【中文标题】Node.js Streams 与 Observables【英文标题】:Node.js Streams vs. Observables 【发布时间】:2015-08-06 01:16:54 【问题描述】:

在了解Observables之后,我发现它们与Node.js streams非常相似。两者都具有在新数据到达、发生错误或没有更多数据 (EOF) 时通知消费者的机制。

我很想了解两者之间的概念/功能差异。谢谢!

【问题讨论】:

@BenjaminGruenbaum 我想知道你为什么用 rxjs 和 bacon 标记这个? OP 似乎指的是来自ecmascript-harmony 的观察值 @Bergi 关于 OP 和问题的先验知识。基本上。 大声恭喜获得赞成票,但我不知道为什么这个问题没有结束。这是一个真正的问题/如何适合 SO。 @AlexanderMills 这对 SO 来说不是一个合适的问题吗?这不是一个“你最喜欢哪个”的问题。它询问 JS/Node 中两种常用的响应式模式之间的差异。 【参考方案1】:

Observables 和 node.js 的 Streams 都允许您解决相同的基本问题:异步处理一系列值。我认为,两者之间的主要区别与促使其出现的背景有关。该上下文反映在术语和 API 中。

Observables 方面,您有一个 EcmaScript 扩展,它引入了反应式编程模型。它试图用ObserverObservable 的简约和可组合的概念来填补价值生成和异步之间的空白。

在 node.js 和 Streams 方面,您希望创建一个接口,用于网络流和本地文件的异步和高性能处理。术语源自该初始上下文,您会得到pipechunkencodingflushDuplexBuffer 等。通过采用为特定用例提供明确支持的实用方法,您失去一些组合事物的能力,因为它不那么统一。例如,您在Readable 流上使用push,在Writable 上使用write,尽管从概念上讲,您在做同样的事情:发布一个值。

因此,在实践中,如果您查看概念并使用选项 objectMode: true ,则可以将ObservableReadable 流匹配,将ObserverWritable 流匹配。您甚至可以在两个模型之间创建一些简单的适配器。

var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');

var Observable = function(subscriber) 
    this.subscribe = subscriber;


var Subscription = function(unsubscribe) 
    this.unsubscribe = unsubscribe;


Observable.fromReadable = function(readable) 
    return new Observable(function(observer) 
        function nop() ;

        var nextFn = observer.next ? observer.next.bind(observer) : nop;
        var returnFn = observer.return ? observer.return.bind(observer) : nop;
        var throwFn = observer.throw ? observer.throw.bind(observer) : nop;

        readable.on('data', nextFn);
        readable.on('end', returnFn);
        readable.on('error', throwFn);

        return new Subscription(function() 
            readable.removeListener('data', nextFn);
            readable.removeListener('end', returnFn);
            readable.removeListener('error', throwFn);
        );
    );


var Observer = function(handlers) 
    function nop() ;

    this.next = handlers.next || nop;
    this.return = handlers.return || nop;
    this.throw = handlers.throw || nop;


Observer.fromWritable = function(writable, shouldEnd, throwFn) 
    return new Observer(
        next: writable.write.bind(writable), 
        return: shouldEnd ? writable.end.bind(writable) : function() , 
        throw: throwFn
    );

您可能已经注意到我更改了一些名称并使用了此处介绍的ObserverSubscription 的更简单概念,以避免Generator 中的Observables 完成的职责超载.基本上,Subscription 允许您取消订阅Observable。无论如何,使用上面的代码你可以有一个pipe

Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout));

process.stdin.pipe(process.stdout) 相比,您所拥有的是一种组合、过滤和转换流的方法,该方法也适用于任何其他数据序列。您可以使用ReadableTransformWritable 流来实现它,但API 更喜欢子类化而不是链接Readables 和应用函数。例如,在Observable 模型上,转换值对应于将转换器函数应用于流。它不需要Transform 的新子类型。

Observable.just = function(/*... arguments*/) 
    var values = arguments;
    return new Observable(function(observer) 
        [].forEach.call(values, function(value) 
            observer.next(value);
        );
        observer.return();
        return new Subscription(function() );
    );
;

Observable.prototype.transform = function(transformer) 
    var source = this;
    return new Observable(function(observer) 
        return source.subscribe(
            next: function(v) 
                observer.next(transformer(v));
            ,
            return: observer.return.bind(observer),
            throw: observer.throw.bind(observer)
        );
    );
;

Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify)
  .subscribe(Observer.fromWritable(process.stdout))

结论?很容易在任何地方引入响应式模型和Observable 概念。围绕该概念实现整个库更加困难。所有这些小功能都需要始终如一地协同工作。毕竟,ReactiveX 项目仍在进行中。但是如果你真的需要将文件内容发送到客户端,处理编码,然后压缩它,那么它就在那里,在 NodeJS 中,它工作得很好。

【讨论】:

我真的不确定这整个“对 Ecmascript 的扩展”。 RxJS 只是一个库,与 RxJava 等一样。最终,在 ES7 或 ES8 中,ES/JS 中可能会有一些与 Observables 相关的关键字,但它们肯定不是语言的一部分,当你回答问题时肯定不是2015 年。 RX 实现是否支持无损背压?例如,如果 nodejs 在暂停模式下读取流,那么我们可以使用read() 方法按需从流中读取。而drain event 可以表示可写流可以接收更多数据。

以上是关于Node.js Streams 与 Observables的主要内容,如果未能解决你的问题,请参考以下文章

Java和Node.js实战 MongoDB 4.x 新特性:Change Streams 变化流

一篇入门reactive streams背压响应流编程

一篇入门reactive streams背压响应流编程

一篇入门reactive streams背压响应流编程

一篇入门reactive streams背压响应流编程

拓展阅读|理解Node.js事件驱动架构