深入node4 可写流的实现 转化流

Posted lin-fighting

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入node4 可写流的实现 转化流相关的知识,希望对你有一定的参考价值。

可写流的使用


可写流的highWaterMark表示期望这个文件接受多少个值。



end不仅会写入,而且会触发close事件。



一个true,一个false,是因为我们的highWaterMark设置了3,希望只用3个内存来写,但是返回的值与我们是否写入无关,返回false也会写入。

  • 但是有个问题,我们写多个wirte的时候,是并发异步操作,所以不能确定哪个快哪个慢。

  • 可以将并发异步操作变为串行异步。

  • 除了第一次的write,接下来的write排队,第一个完成后,将队列中的每一个write拿出来执行。有点像eventsloop,直到清空队列,但是队列缓存可能会过大,所以需要一个预期,也就是highWaterMark来控制,达到预期后,就不要调用write方法。虽然再调用也会写入进去。

  • 结合fs.createStream。

    当我们第一次写入的时候,当文件吃不下了,也就是到highWaterMark的值了,就应该停止写入了,等文件吃完了,触发了drain方法,这时候再去恢复rs.resume(),此时后面的写入就是往真实的文件中去写了,而不是一直在排队了。读取默认的highWaterMark是64k,而写入的默认highWaterMark是16k。

  • 这跟第一种并发异步写入有很大区别。

可写流的实现

可写流是基于链表实现的,因为可写流涉及队列排序的问题,比较多的使用了头部的增删。而链表在头部和尾部的增删的时候效率比较高。
链表的实现可以观看:单链表,双链表

  • 可读流内部是基于steam和stream的Readable(跟我们实现的MyReadSteream类相似,内部也是调用fs.read)以及events来实现的。fs自己实现了ReadStream继承stream的Readable,并且实现自己的_read方法。
  • 可写流内部是基于stream和Writable以及events来实现的,fs实现的WrtieStream继承了stream的Writable。自己实现的_wirte方法供父类的wirte方法调用。

实现十个数,希望使用三个内存来处理


原生的。

实现自己的WriteStream

思路:跟可读流一样,将open和wirte操作分离,通过事件发布的模式。然后通过变量判断当前write是否是第一次调用,保证每次只有一个write在执行,其他的全部扔进缓存,当write执行完毕之后,再从缓存中一个一个拿出来执行write。直到缓存清空完毕,触发drain事件,通知用户缓存清完了,变量置为初始化。
这样就能使并发异步操作变成串行异步操作。
初始化变量,这里跟ReadStream有点区别,比如没有end,encoding默认写入是utf8等等。

len用来判断当前缓存的值的长度,needDrain用来判断缓存是否过多是否达到期望值。cahce是缓存队列。writing用来标识是否第一次写入。

  • open方法
  • 执行write方法

    因为数据可能有中文,有英文,所以第一步默认转为buffer。然后判断当前write写入的buffer的长度+this.len(缓存队列的长度)与期望值highWaterMark相比较,是否超过期望值。
    再重写cb函数,这样每次写入成功调用cb的时候,就会执行clearBuffer方法。
    然后通过writing判断是不是第一次写入,第一次写入就调用_write方法,真正的执行fs.write,否则就放入缓存之中。
  • _write方法

    write方法可能用户执行的时候open还没打开,因为是异步,所以需要做处理,然后执行write方法,写入内容,每次写完之后都要维护偏移量,并且减少len。执行cb()。cb是重写过的,会调用clearBuffer方法。
  • clearBuffer方法

    这个方法主要将缓存中的write任务一个一个拿出并执行,注意这里调用的是_write方法,是要真正写入文件的。然后清空缓存之后,需要出发drain事件,告诉用户已经清空,当前可以继续write。并且重置一些变量。


    触发四次drain事件,成功写入四个。

总结:

通过缓存以及全局变量,和事件系统,只有第一次write才会真正执行fs.write,其他的write方法会放入缓存,只有当fs.write执行完毕之后,才会继续从缓存取出进行fs.wrtie。直到缓存为空,再触发drain事件,重置变量,让用户可以继续调用write方法写入。就好比人吃饭,喂一口大的,它总要先吃一点小的,剩下部分在口腔里嚼,并且返回false告诉你,先别喂,等全部吃完。当它慢慢嚼完并吞下当前口腔有的事物之后,再告诉你,可以继续喂了,依次类推,反复循环,直到这碗饭吃饭(全部写入)

优化 使用链表代替数组cache


链表的实现:在单链表这里。
实验:


成功删除了第一个3,链表此时是6=>10。



改造完成,
期望是3,所以第一次会调用三次write,两个write存入queue中,如

首先queue中有两个,然后取出49之后,有剩一个50,再取出来之后就为空。完成。
全部代码:

// WriteStream
const LinkedList = require("./linklist");

//基于链表实现队列
class Queue 
  constructor() 
    this.link = new LinkedList();
  
  //在最后一个中加入
  offer(element) 
    this.link.append(element);
  
  //移除链表第一位并返回
  shift() 
    return this.link.removeAt(0);
  


class WriteStream extends EventMitter 
  constructor(path, options) 
    super();
    this.path = path;
    this.flags = options.flags || "w";
    this.encoding = options.encoding || "utf8";
    this.autoClose = options.autoClose || true;
    this.mode = options.mode || 0o666;
    this.start = options.start || 0;
    this.highWaterMark = options.highWaterMark || 16 * 1024;
    this.emitClose = options.emitClose || true;
    this.offset = this.start; // 每次写入文件的位移数

    this.fd = undefined;

    this.len = 0; //判断的缓存
    this.needDrain = false; //是否需要触发drain
    this.cache = []; //第二次开始的写入缓存
    this.writing = false; //标识是否正在写入
    this.queue = new Queue();
    this.open();
  

  open() 
    fs.open(this.path, this.flags, this.mode, (err, fd) => 
      this.fd = fd;
      this.emit("open", fd);
    );
  

  _write(chunk, encoding, cb) 
    if (typeof this.fd !== "number") 
      this.once("open", () => 
        this._write(chunk, encoding, cb);
      );
      return;
    
    fs.write(this.fd, chunk, 0, chunk.length, this.offset, (err, written) => 
      this.offset += written; //维护偏移量
      this.len -= written; //把缓存的个数减少
      cb(); //回调
    );
  

  //依次将缓存队列等待任务拿出来一个个写入。
  clearBuffer() 
    const a = JSON.stringify(this.queue);
    console.log(a);
    const data = this.queue.shift();
    if (data) 
      this._write(data.chunk, data.encoding, data.cb);
     else 
      //缓存读取完了,需要触发drain事件
      this.writing = false; //当前写入已经完成了。
      if (this.needDrain) 
        //如果存入过多导致期望值过大
        this.needDrain = false;
        this.emit("drain");
      
    
  


//模拟Stream的Writable的实例上的write方法
EventMitter.prototype.write = function (
  chunk,
  encoding = this.encoding,
  cb = () => 
) 
  // 异步方法
  // 注意写入的中文跟英文比较。 将数据全部转换为Buffer
  chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
  this.len += chunk.length;

  //判断到达期望了吗
  const returnValue = this.len < this.highWaterMark ? true : false;

  //当数据写入后,需要触发drain并且将Len减减
  this.needDrain = !returnValue;

  //  清空缓存队列的逻辑
  const userCb = cb;
  cb = () => 
    userCb();
    this.clearBuffer();
  ;

  //判断是否第一次给入的数据,因为后面的write都是直接放入缓存中
  if (!this.writing) 
    //第一次写入,真正执行写入操作
    this._write(chunk, encoding, cb);
    this.writing = true;
   else 
    //第二次写入了,存入缓存期
    this.queue.offer(
      chunk,
      encoding,
      cb,
    );
  
  return returnValue;
;

const ws = new WriteStream("b.txt", 
  flags: "w",
  encoding: null,
  autoClose: true,
  start: 0, //没有end属性,只有start
  highWaterMark: 3, //与可读流不一样,可写流的highWaterMark表示期望这个文件只接受3个内存
);

ws.on("open", () => 
  console.log("文件打开");
);

let i = 0;
function write() 
  let flag = true;
  while (i < 4 && flag) 
    flag = ws.write(i++ + "");
    console.log('flag', flag);
  


ws.on("drain", () => 
  //只有当吸入的数据达到了预期,并且数据已经被写入文件之后才会触发drain事件。
  console.log("写完了");
  write();
);

write();

// 链表
class Node 
  constructor(data) 
    this.data = data;
    this.next = null;
  

module.exports = class LinkedList 
  constructor() 
    //头指针
    this.head = null;
    //链表的长度
    this.length = 0;
  

  append(data) 
    const element = new Node(data);
    if (!this.head) 
      this.head = element;
     else 
      let current = this.head;

      //遍历找到最后的节点
      while (current.next) 
        current = current.next;
      
      //插入
      current.next = element;
    
    this.length++;
  

  // //特定位置插入
  insert(position, data) 
    if (
      typeof position !== "number" ||
      position < 0 ||
      position > this.length
    ) 
      return false;
    
    const element = new Node(data);
    //插入首位
    if (position === 0) 
      element.next = this.head;
      this.head = element;
     else 
      //插入中间的一位
      let current = this.head;
      let index = 1;
      //采用插入位置前一位进行操作,比如插入到第四个,就将element.next指向第三个的下一个,再将第三个的下一个重新向element
      while (index++ < position) 
        // 如position = 4, index = 4的时候, current指向第三个,因为执行了两遍
        current = current.next;
      
      // 让element变成第四个
      element.next = current.next;
      current.next = element;

      // for (let i = 1; i < position; i++) 
      //     if (i === position - 1) 
      //         element.next = current.next
      //         current.next = element
      //         break;
      //     
      //     current = current.next
      // 
    
    this.length++;
  

  // //获取对应位置的元素
  get(position) 
    if (
      typeof position !== "number" ||
      position < 0 ||
      position >= this.length
    ) 
      return undefined;
    
    let current = this.head;
    let index = 0;
    while (index++ < position) 
      current = current.next;
    
    return current.data;
  

  // //返回元素在列表中的索引
  indexOf(data) 
    let current = this.head;
    let index = 0;
    while (current) 
      if (current.data === data) 
        return index;
      
      current = current.next;
      index++;
    
    return -1;
  

  // //修改某个位置的元素
  update(position, data) 
    if (
      typeof position !== "number" ||
      position < 0 ||
      position >= this.length
    ) 
      return false;
    
    let current = this.head;
    let index = 0;
    while (index++ < position) 
      current = current.next;
    
    current.data = data;
  

  // //从列表的特定位置移除一项
  removeAt(position) 
    if (
      typeof position !== "number" ||
      position < 0 ||
      position >= this.length
    ) 
      return undefined;
    
    let current = this.head;
    if (position === 0) 
      const headElData = this.head.data;
      this.head = this.head.next;
      this.length--;
      return headElData;
    
    let index = 0;
    // 找到应该删除的节点的前一个,比如删除2,此时的current 为1所指向的节点。
    while (index++ < position - 1) 
      current = current.next;
    
    const currentElement = current.next;
    let element = current.next.next; //保存第三个节点
    current.next.next = null; //让第二个节点与第三个节点断掉关系
    current.next = element; // 第一个节点指向第三个节点
    this.length--;
    return currentElement.data;
  

  // 从列表移除一项
  remove(element) 
    let current = this.head;
    let nextElement;
    let isSuccess;

    //如果第一个就是
    if (current.data === element) 
      nextElement = this.head.next;
      this.head = element;
      element.next = nextElement;
      return true;
    

    // 找到应该删除的节点的前一个,比如删除2,此时的current 为1所指向的节点。
    while (current) 
      if (current.next.data === element) 
        nextElement = current.next.next; //存粗下下个节点
        current.next.next = null; //断开下个节点与下下个节点的联系
        current.next = nextElement; //连接下下个节点
        isSuccess = true;
        break;
       else 
        current = current.next;
      
    

    //是否删除成功
    if (isSuccess) 
      this.length--;
      return true;
    

    return false;
  

  isEmpty() 
    return !!this.length;
  

  size() 
    return this.length;
  

  toString() 
    let current = this.head;
    let str = "";
    while (current) 
      str += `,$current.data.toString()`;
      current = current.next;
    
    return str.slice(1);
  
;

pipe的实现


将rs的监听和ws的监听内聚到一起,可以读取一点写入一点,监听可读流的’data’事件,将获取到的数据写入可写流中,如果返回false,则暂停读取,写入完毕后触发drain事件。继续读取,直到最终完毕。
pipe的中文是管道,也是数据从可读流 流向了可写流。

流的分类

可读流,可写流,双工流,转化流。

双工流


实现读写的功能,读写无关联。

转化流

读写有关联。如实现输入小写字母转为大写字母,


可以通过pipe。

输入后经过process.stdout输出,中间可以继续
加一个转化。如
输入后转为特定符号。
特定符号输出。
输入输出有关联。
流的玩法都相似,都只需要继承Stream的然后实现相对应的方法就可以。


输入abc留给transfrom转化,再留给process.stdout输出。

以上是关于深入node4 可写流的实现 转化流的主要内容,如果未能解决你的问题,请参考以下文章

将数据管道传输到尚未准备好接收数据的可写流

深入理解并运用Node中的IO模型流

nodejs的流总结

Node.js 可写流创建错误文件(更大且不可读)

Node.js——Stream

Node.js 可写流:write vs _write