深入理解 Node.js Stream 内部机制

Posted 淘系前端团队

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入理解 Node.js Stream 内部机制相关的知识,希望对你有一定的参考价值。

相信很多人对 Node.js 的 Stream 已经不陌生了,不论是请求流、响应流、文件流还是 socket 流,这些流的底层都是使用 stream 模块封装的,甚至我们平时用的最多的 console.log 打印日志也使用了它,不信你打开 Node.js runtime 的源码,看看 lib/console.js

 
   
   
 
  1. function write(ignoreErrors, stream, string, errorhandler) {

  2.  // ...

  3.  stream.once('error', noop);

  4.  stream.write(string, errorhandler);

  5.  //...

  6. }

  7. Console.prototype.log = function log(...args) {

  8.  write(this._ignoreErrors,

  9.        this._stdout,

  10.        `${util.format.apply(null, args)}\n`,

  11.        this._stdoutErrorHandler);

  12. };

Stream 模块做了很多事情,了解了 Stream,那么 Node.js 中其他很多模块理解起来就顺畅多了。

stream 模块

如果你了解 生产者和消费者问题 的解法,那理解 stream 就基本没有压力了,它不仅仅是资料的起点和落点,还包含了一系列状态控制,可以说一个 stream 就是一个状态管理单元。了解内部机制的最佳方式除了看 Node.js 官方文档,还可以去看看 Node.js 的 源码:

  • lib/module.js

  • lib/_stream_readable.js

  • lib/_stream_writable.js

  • lib/_stream_tranform.js

  • lib/_stream_duplex.js

ReadableWritable 看明白,Tranform 和 Duplex 就不难理解了。

Readable Stream

Readable Stream 存在两种模式,一种是叫做 FlowingMode,流动模式,在 Stream 上绑定 ondata 方法就会自动触发这个模式,比如:

 
   
   
 
  1. const readable = getReadableStreamSomehow();

  2. readable.on('data', (chunk) => {

  3.  console.log(`Received ${chunk.length} bytes of data.`);

  4. });

这个模式的流程图如下:

深入理解 Node.js Stream 内部机制

资源的数据流并不是直接流向消费者,而是先 push 到缓存池,缓存池有一个水位标记 highWatermark,超过这个标记阈值,push 的时候会返回 false,什么场景下会出现这种情况呢?

  • 消费者主动执行了 .pause()

  • 消费速度比数据 push 到缓存池的生产速度慢

有个专有名词来形成这种情况,叫做「背压」,Writable Stream 也存在类似的情况。

流动模式,这个名词还是很形象的,缓存池就像一个水桶,消费者通过管口接水,同时,资源池就像一个水泵,不断地往水桶中泵水,而 highWaterMark 是水桶的浮标,达到阈值就停止蓄水。 下面是一个简单的 Demo:

 
   
   
 
  1. const Readable = require('stream').Readable;

  2. // Stream 实现

  3. class MyReadable extends Readable {

  4.  constructor(dataSource, options) {

  5.    super(options);

  6.    this.dataSource = dataSource;

  7.  }

  8.  // 继承了 Readable 的类必须实现这个函数

  9.  // 触发系统底层对流的读取

  10.  _read() {

  11.    const data = this.dataSource.makeData();

  12.    this.push(data);

  13.  }

  14. }

  15. // 模拟资源池

  16. const dataSource = {

  17.  data: new Array(10).fill('-'),

  18.  // 每次读取时 pop 一个数据

  19.  makeData() {

  20.    if (!dataSource.data.length) return null;

  21.    return dataSource.data.pop();

  22.  }

  23. };

  24. const myReadable = new MyReadable(dataSource);

  25. myReadable.setEncoding('utf8');

  26. myReadable.on('data', (chunk) => {

  27.  console.log(chunk);

  28. });

另外一种模式是 Non-FlowingMode,没流动,也就是暂停模式,这是 Stream 的预设模式,Stream 实例的 _readableState.flow 有三个状态,分别是:

  • _readableState.flow=null,暂时没有消费者过来

  • _readableState.flow=false,主动触发了 .pause()

  • _readableState.flow=true,流动模式

当我们监听了 onreadable 事件后,会进入这种模式,比如:

 
   
   
 
  1. const myReadable = new MyReadable(dataSource);

  2. myReadable.setEncoding('utf8');

  3. myReadable.on('readable', () => {});

监听 readable 的回调函数第一个参数不会传递内容,需要我们通过 myReadable.read() 主动读取,为啥呢,可以看看下面这张图:

深入理解 Node.js Stream 内部机制

资源池会不断地往缓存池输送数据,直到 highWaterMark 阈值,消费者监听了 readable 事件并不会消费数据,需要主动调用 .read([size]) 函数才会从缓存池取出,并且可以带上 size 参数,用多少就取多少:

 
   
   
 
  1. const myReadable = new MyReadable(dataSource);

  2. myReadable.setEncoding('utf8');

  3. myReadable.on('readable', () => {

  4.  let chunk;

  5.  while (null !== (chunk = myReadable.read())) {

  6.    console.log(`Received ${chunk.length} bytes of data.`);

  7.  }

  8. });

这里需要注意一点,只要数据达到缓存池都会触发一次 readable 事件,有可能出现「消费者正在消费数据的时候,又触发了一次 readable 事件,那么下次回调中 read 到的数据可能为空」的情况。我们可以通过 _readableState.buffer 来查看缓存池到底缓存了多少资源:

 
   
   
 
  1. let once = false;

  2. myReadable.on('readable', (chunk) => {

  3.  console.log(myReadable._readableState.buffer.length);

  4.  if (once) return;

  5.  once = true;

  6.  console.log(myReadable.read());

  7. });

上面的代码我们只消费一次缓存池的数据,那么在消费后,缓存池又收到了一次资源池的 push 操作,此时还会触发一次 readable 事件,我们可以看看这次存了多大的 buffer。

需要注意的是,buffer 大小也是有上限的,默认设置为 16kb,也就是 16384 个字节长度,它最大可设置为 8Mb,没记错的话,这个值好像是 Node 的 new space memory 的大小。

上面介绍了 Readable Stream 大概的机制,还有很多细节部分没有提到,比如 FlowingMode 在不同 Node 版本中的 Stream 实现不太一样,实际上,它有三个版本,上面提到的是第 2 和 第 3 个版本的实现;再比如 MixinsMode 模式,一般我们只推荐(允许)使用 ondata 和 onreadable 的一种来处理 Readable Stream,但是如果要求在 Non-FlowingMode 的情况下使用 ondata 如何实现呢?那么就可以考虑 MixinsMode 了。

Writable Stream

原理与 Readable Stream 是比较相似的,数据流过来的时候,会直接写入到资源池,当写入速度比较缓慢或者写入暂停时,数据流会进入队列池缓存起来,如下图所示:

深入理解 Node.js Stream 内部机制

当生产者写入速度过快,把队列池装满了之后,就会出现「背压」,这个时候是需要告诉生产者暂停生产的,当队列释放之后,Writable Stream 会给生产者发送一个 drain 消息,让它恢复生产。下面是一个写入一百万条数据的 Demo:

 
   
   
 
  1. function writeOneMillionTimes(writer, data, encoding, callback) {

  2.  let i = 10000;

  3.  write();

  4.  function write() {

  5.    let ok = true;

  6.    while(i-- > 0 && ok) {

  7.      // 写入结束时回调

  8.      ok = writer.write(data, encoding, i === 0 ? callback : null);

  9.    }

  10.    if (i > 0) {

  11.      // 这里提前停下了,'drain' 事件触发后才可以继续写入  

  12.      console.log('drain', i);

  13.      writer.once('drain', write);

  14.    }

  15.  }

  16. }

我们构造一个 Writable Stream,在写入到资源池的时候,我们稍作处理,让它效率低一点:

 
   
   
 
  1. const Writable = require('stream').Writable;

  2. const writer = new Writable({

  3.  write(chunk, encoding, callback) {

  4.    // 比 process.nextTick() 稍慢

  5.    setTimeout(() => {

  6.      callback && callback();

  7.    });

  8.  }

  9. });

  10. writeOneMillionTimes(writer, 'simple', 'utf8', () => {

  11.  console.log('end');

  12. });

最后执行的结果是:

 
   
   
 
  1. drain 7268

  2. drain 4536

  3. drain 1804

  4. end

说明程序遇到了三次「背压」,如果我们没有在上面绑定 writer.once('drain'),那么最后的结果就是 Stream 将第一次获取的数据消耗完变结束了程序。

pipe

了解了 Readable 和 Writable,pipe 这个常用的函数应该就很好理解了,

 
   
   
 
  1. readable.pipe(writable);

这句代码的语意性很强,readable 通过 pipe(管道)传输给 writable,pipe 的实现大致如下(伪代码):

 
   
   
 
  1. Readable.prototype.pipe = function(writable, options) {

  2.  this.on('data', (chunk) => {

  3.    let ok = writable.write(chunk);

  4.    // 背压,暂停

  5.    !ok && this.pause();

  6.  });

  7.  writable.on('drain', () => {

  8.    // 恢复

  9.    this.resume();

  10.  });

  11.  // 告诉 writable 有流要导入

  12.  writable.emit('pipe', this);

  13.  // 支持链式调用

  14.  return writable;

  15. };

上面做了五件事情:

  • emit(pipe),通知写入

  • .write(),新数据过来,写入

  • .pause(),消费者消费速度慢,暂停写入

  • .resume(),消费者完成消费,继续写入

  • returnwritable,支持链式调用

当然,上面只是最简单的逻辑,还有很多异常和临界判断没有加入,具体可以去看看 Node.js 的代码( /lib/streamreadable.js)。

Duplex Stream

Duplex,双工的意思,它的输入和输出可以没有任何关系,

 
   
   
 
  1. const util = require('util');

  2. const Readable = require('_stream_readable');

  3. const Writable = require('_stream_writable');

  4. util.inherits(Duplex, Readable);

  5. var keys = Object.keys(Writable.prototype);

  6. for (var v = 0; v < keys.length; v++) {

  7.  var method = keys[v];

  8.  if (!Duplex.prototype[method])

  9.    Duplex.prototype[method] = Writable.prototype[method];

  10. }

我们可以通过 options 参数来配置它为只可读、只可写或者半工模式,一个简单的 Demo:

 
   
   
 
  1. var Duplex = require('stream').Duplex

  2. const duplex = Duplex();

  3. // readable

  4. let i = 2;

  5. duplex._read = function () {

  6.  this.push(i-- ? 'read ' + i : null);

  7. };

  8. duplex.on('data', data => console.log(data.toString()));

  9. // writable

  10. duplex._write = function (chunk, encoding, callback) {

  11.  console.log(chunk.toString());

  12.  callback();

  13. };

  14. duplex.write('write');

输出的结果为:

 
   
   
 
  1. write

  2. read 1

  3. read 0

可以看出,两个管道是相互之间不干扰的。

Transform Stream

Transform Stream 集成了 Duplex Stream,它同样具备 Readable 和 Writable 的能力,只不过它的输入和输出是存在相互关联的,中间做了一次转换处理。常见的处理有 Gzip 压缩、解压等。

Transform 的处理就是通过 _transform 函数将 Duplex 的 Readable 连接到 Writable,由于 Readable 的生产效率与 Writable 的消费效率是一样的,所以这里 Transform 内部不存在「背压」问题,背压问题的源头是外部的生产者和消费者速度差造成的。

关于 Transfrom Stream,我写了一个简单的 Demo:

 
   
   
 
  1. const Transform = require('stream').Transform;

  2. const MAP = {

  3.  'Barret': '靖',

  4.  'Lee': '李'

  5. };

  6. class Translate extends Transform {

  7.  constructor(dataSource, options) {

  8.    super(options);

  9.  }

  10.  _transform(buf, enc, next) {

  11.    const key = buf.toString();

  12.    const data = MAP[key];

  13.    this.push(data);

  14.    next();

  15.  }

  16. }

  17. var transform = new Translate();

  18. transform.on('data', data => console.log(data.toString()));

  19. transform.write('Lee');

  20. transform.write('Barret');

  21. transform.end();

小结

本文主要参考和查阅 Node.js 官网的文档和源码,细节问题都是从源码中找到的答案,如有理解不准确之处,还请斧正。关于 Stream,这篇文章只是讲述了基础的原理,还有很多细节之处没有讲到,要真正理解它,还是需要多读读文档,写写代码。

了解了这些 Stream 的内部机制,对我们后续深入理解上层代码有很大的促进作用,特别希望初学 Node.js 的同学花点时间进来看看。

题图:https://unsplash.com/photos/pOWBHdgy1Lo By @Neven Krcmarek


以上是关于深入理解 Node.js Stream 内部机制的主要内容,如果未能解决你的问题,请参考以下文章

Node.js:深入浅出 http 与 stream

深入理解Node.js垃圾回收与内存管理

深入理解V8的垃圾回收原理

理解 Node.js 中 Stream(流)

Node.js实战对于Buffer和Stream模块系统的深入剖析

深入理解事件循环机制