深入理解并运用Node中的IO模型流

Posted Failarmy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入理解并运用Node中的IO模型流相关的知识,希望对你有一定的参考价值。

在 NodeJs 中,流随处可见,读/写文件流,HTTP请求/返回流,stdin/stdout流。理解并运用好流会让你的Node更具力量。

Stream

  • lib/_stream_readable.js

  • lib/_stream_writable.js

  • lib/_stream_tranform.js

  • lib/_stream_duplex.js

流主要有可读 Readable,可写流 Writable,双工可读可写流 Duplex, Transform 流就是继承 Duplex 的。 通过pipe管道,可读流可以pipe到一个或多个可写流。 看源码能发现里面涉及了一堆状态控制的代码,什么时候读,什么时候写,什么时候暂停读。 大部分情况下程序面对的问题。通常都可以抽象成一个输入/输出的问题,中间可能会包含转换。

实际问题怎么运用流呢。

读取大文件

小文件可以一次性读到内存,但如果一个 10G 的文件呢?ReadLine 模块很好用但是你知道背后怎么实现的吗。 试试怎么用 stream.Transform 来自己实现一个readLine

 
   
   
 
  1. const stream = require("stream");

  2. const fs = require("fs");


  3. function ReadLineStream(FILE_PATH) {

  4. var transformer = new stream.Transform({ objectMode: true });

  5. // 实现Transform主要的_transform方法

  6. transformer._transform = function(chunk, encoding, done) {

  7. // buffer 需要toString

  8. var data = chunk.toString();


  9. // 累加读取出来的数据

  10. if (this._lastLineData) data = this._lastLineData + data;


  11. // 按照换行符分割

  12. var lines = data.split("\n");


  13. // 取出数组末尾剩余不是一行的内容和下一次拼接

  14. this._lastLineData = lines.splice(lines.length - 1, 1)[0];


  15. // 通过this.push 对外输出剩余累计的行

  16. lines.forEach(this.push.bind(this));

  17. done();

  18. };


  19. transformer._flush = function(done) {

  20. // 末尾处理

  21. if (this._lastLineData) this.push(this._lastLineData);

  22. this._lastLineData = null;

  23. done();

  24. };


  25. // 创建文件读取流

  26. var source = fs.createReadStream(FILE_PATH);

  27. source.pipe(transformer);

  28. return transformer;

  29. }


  30. var fileReader = ReadLineStream("xx.log");

  31. // 创建一个可写流

  32. var writable = Stream.Writable({

  33. objectMode: true,

  34. write: function(line, _, next) {

  35. async () => {

  36. // 逐行写入到mysql

  37. var parsed = JSON.parse(line);

  38. await mysql.insert("insert into log set ?", parsed);

  39. process.nextTick(next);

  40. };

  41. return true;

  42. }

  43. });


  44. fileReader.pipe(writable);

通过实现一个消费/可写流我们就可以来对大文件进行处理,比如说实现一个 word count 计数器, 从文件导入到数据库。

而不用担心需要一次性读取整个文件到内存里out of memory这种问题。

通过继承 strean 模块我们也不需要过多的去考虑什么时候该读,什么时候因为写的压力大(背压),该停止读,让整个读写流有序的运行。 你只需要专注于实现你自己的 write read transform Spark, Strom 的实时计算流也是这样的,大任务分解成小任务,只需要专注于自己业务逻辑的 map,reduce

单机爬虫

再举个栗子, 它的输入可能是一堆 URL、输出是结构化的数据。需要写入到关系型数据库。

  • 可以把 URL 数据获取抽象成一个可读流,

  • 爬取过程,数据提取抽象成一个 transform 流

  • 写入数据库抽象成一个可写流,

只需要约定好每个过程输出的数据模型,就可以在每个过程实现各种目的不一样的流。

  • 如数据源,可以是读取文件,MYQL,分布式列队

  • 抓取转换流,可以是普通的 HTTP爬虫,Puppeteer 可渲染性爬虫

  • 数据存储流,可以写文件,MYQL,或者HDFS

read > transform > write

这样程序看起来是不是特别简洁?

URL 读取流

 
   
   
 
  1. const Stream = require('stream');

  2. const fs = require('fs');

  3. const Sequelize = require('sequelize');

  4. const Op = Sequelize.Op


  5. const sequelize = new Sequelize('database', 'username', 'password', {

  6. host: 'localhost',

  7. dialect: 'mysql',

  8. pool: {

  9. max: 5,

  10. min: 0,

  11. acquire: 30000,

  12. idle: 10000

  13. },

  14. });


  15. // mysql表模型

  16. const Url = sequelize.define('urls', {

  17. id: {

  18. type: Sequelize.INTEGER,

  19. primaryKey: true,

  20. autoIncrement: true,

  21. },

  22. url: Sequelize.STRING,

  23. });



  24. class SpiderMysqlSourceReadStream extends Stream.Readable {


  25. constructor(opts){

  26. super();

  27. this.opts = opts || {};

  28. this.filePath = opts.file || '';

  29. this.fileStream = null;

  30. this.cursor = 0;

  31. this.connection = null;

  32. Stream.Readable.call(this, {

  33. objectMode: true,

  34. highWaterMark: opts.highWaterMark || 1000

  35. });

  36. }


  37. // 覆盖并实现read方法

  38. _read(){


  39. // 查大于当前id的10条

  40. var urls = await Model.findAll({

  41. attributes: ['url', 'id'],

  42. where: {

  43. id: {

  44. [Op.gt]: this.cursor

  45. }

  46. },

  47. limit: 10

  48. });


  49. urls.forEach((row) => {

  50. this.cursor = row.id;

  51. this.push(row);

  52. });


  53. }

  54. }

把 URL 的变成结构化数据的转换流

 
   
   
 
  1. const cheerio = require('cheerio')


  2. class SpiderFetch extends Stream.Transform {

  3. _tranform(row, encoding, done){

  4. var url = row.url;

  5. (async () => {

  6. var parsed = null;

  7. try{

  8. var contentResp = await fetch(url);

  9. var contexthtml = await contentResp.text();

  10. parsed = this.parse(contextHtml);

  11. }catch(e){

  12. console.log('fetch error', e)

  13. }


  14. this.push(parsed);

  15. done();

  16. })();

  17. }


  18. parse(contextHtml){

  19. var $ = cheerio.load(contextHtml);

  20. ....

  21. ....

  22. }

  23. }

mysql 入库写入流

 
   
   
 
  1. // mysql表模型

  2. const Document = sequelize.define('document', {

  3. id: {

  4. type: Sequelize.INTEGER,

  5. primaryKey: true,

  6. autoIncrement: true,

  7. },

  8. title: Sequelize.STRING,

  9. });


  10. const MysqlWriteStream extends Stream.Writable {


  11. _write(chunk, encoding, done){

  12. var data = JSON.parse(chunk.toString());

  13. (async () => {

  14. await Document.create(data);

  15. done();

  16. })();

  17. }


  18. }


  19. var readStream = new SpiderMysqlSourceReadStream();

  20. var fetchStream = new SpiderFetch();

  21. var writeStream = new MysqlWriteStream();


  22. readStream.pipe(fetchStream).pipe(writeStream);

是不是简洁明了? 在Node中异步流随处可见,谁让它基因就是这样呢。

参考:

  • https://nodejs.org/api/stream.html

  • https://www.barretlee.com/blog/2017/06/06/dive-to-nodejs-at-stream-module/

  • https://nodejs.org/en/docs/guides/backpressuring-in-streams/


以上是关于深入理解并运用Node中的IO模型流的主要内容,如果未能解决你的问题,请参考以下文章

深入理解node.js异步编程

深入理解node.js异步编程:基础篇

JAVA IO流深入理解

深入理解 Node.js Stream 内部机制

《深入理解计算机系统》Tiny服务器4——epoll类型IO复用版Tiny

根据IO流源码深入理解装饰设计模式使用