# Node.js Streams Demystified


A quick overview of the Node.js streams interface, with basic examples.

This is based on [@brycebaril's](https://github.com/brycebaril) presentation, [Node.js Streams2 Demystified](http://brycebaril.github.io/streams2-presentation)


## Overview

Streams are a first-class construct in Node.js for handling data. 

Think of them as **lazy evaluation applied to data**.

There are essentially three major concepts:

* **source** - where the data comes from
* **pipeline** - where you filter or transform your data as it passes through
* **sink** - where your data ultimately goes

Benefits in using streams:

* **lazily** produce or consume data in buffered chunks
* **evented** and **non-blocking**
* low memory footprint
* automatically handle **back-pressure**
* buffers allow you to work around the v8 heap memory limit
* most core Node.js content sources/sinks are streams already!

Five classes of streams:

* `Readable` - sources
* `Writable` - sinks
* `Duplex` - both source and sink
* `Transform` - in-flight stream operations
* `PassThrough` - stream spy

Below is a quick overview of **Readable**, **Writable**, and **Transform** streams.

See also:

* [API](http://nodejs.org/docs/latest/api/stream.html)
* [Handbook](https://github.com/substack/stream-handbook)
* [Workshop](https://github.com/joyrexus/nodeschool/tree/master/stream-adventure#stream-adventure)
* [5 min guide](http://dailyjs.com/2013/04/01/streams-streams-streams/)


---


# Readable

Use a **Readable** stream when supplying data as a stream.

Think: spigot/faucet.


## How to implement

1. Subclass [stream.Readable](http://nodejs.org/api/stream.html#stream_class_stream_readable).

2. Implement a `_read(size)` method.


## Methods

### `_read(size)`

* `size` is in bytes, but can be ignored (especially for objectMode streams)
* `_read(size)` must call `this.push(chunk)` to send a chunk to the consumer


## Options

* `highWaterMark` number: maximum number of bytes to store in the internal
buffer before ceasing to read from the underlying resource (default: 16KB)

* `encoding` string: if set, buffers will be decoded to strings instead of
passing buffers (default: `null`)

* `objectMode` boolean: instead of using buffers/strings, use javascript objects
(default: `false`)


## How to use

* `readable.pipe(target)`
* `readable.read(size)`
* `readable.on("data", ... )`


## See also

* [stream-spigot](http://npm.im/stream-spigot) - creates readable streams from Arrays or simple functions


---


# Writable

Use a **Writable** stream when collecting data from a stream. 

Think: drain/collect.


## How to implement

1. Subclass [stream.Writable](http://nodejs.org/api/stream.html#stream_class_stream_writable).

2. Implement a `_write(chunk, encoding, callback)` method.


## Methods

### `_write(chunk, encoding, callback)` 

* `chunk` is the content to write
* Call `callback()` when you're done with this chunk


## Options

* `highWaterMark` number: number of bytes to store in the internal buffer
before `write()` starts returning `false` (default: 16KB)

* `decodeStrings` boolean: whether to decode strings to Buffers before passing
them to `_write()` (default: true)


## How to use

* `source.pipe(sink)`
* `writable.write(chunk [,encoding] [,callback])`


## See also

* [concat-stream](http://npm.im/concat-stream) - writable stream that
  concatenates strings or binary data and calls a callback with the result


---


# Transform

Use a **Transform** stream when you want to operate on a stream in transit. This is a special kind of Duplex stream in which the output is derived from the input.

Think: filter/map.


## How to implement

1. Subclass [stream.Transform](http://nodejs.org/api/stream.html#stream_class_stream_transform).
2. Implement a `_transform(chunk, encoding, callback)` method.
3. Optionally implement a `_flush(callback)` method.


## Methods

### `_transform(chunk, encoding, callback)`

Call `this.push(something)` to forward data to the next consumer.
You don't have to push anything; skipping the push drops the chunk.
You *must* call `callback` exactly once per `_transform` call.

### `_flush(callback)`

When the stream ends, this is your chance to do any cleanup or last-minute `this.push()` calls to clear any buffers or work. Call `callback()` when done.


## Options

Superset of Readable and Writable options.


## How to use

* `source.pipe(transform).pipe(drain)`
* `transform.on("data", ... )`


## See also

* [through2](http://npm.im/through2) - makes it easy to generate Transforms without all the subclassing boilerplate
* [through2-map](https://github.com/brycebaril/through2-map) - 
  Array.prototype.map analog for streams
* [through2-filter](https://github.com/brycebaril/through2-filter) - 
  Array.prototype.filter analog for streams
* [through2-reduce](https://github.com/brycebaril/through2-reduce) - 
  Array.prototype.reduce analog for streams
* [stream reducer demo](http://bl.ocks.org/joyrexus/9074340) - showing how
  to extend a Transform stream to create reducers/accumulators for streamed objects
* [sculpt](https://github.com/Medium/sculpt) - a collection of transform stream
  utilities (all operating in `objectMode`)
* [pipe-iterators](https://github.com/mixu/pipe-iterators) - another collection
  of functions for iterating over object mode streams

---


# Examples

## Readable

The `Source` stream below supplies a string in chunks. First in CoffeeScript:

```coffee
Readable = require("stream").Readable

class Source extends Readable

  constructor: (@content, options) ->
    super options

  _read: (size) ->
    if not @content
      @push null
    else
      @push(@content.slice(0, size))
      @content = @content.slice(size)


s = new Source("The quick brown fox jumps over the lazy dog.")
console.log(chunk.toString()) while chunk = s.read(10)  # print in 10 byte chunks

# The quick
# brown fox
# jumps over
#  the lazy
# dog.


s = new Source("How now brown cow?")
s.pipe(process.stdout)

# How now brown cow?
```
And the same `Source` in JavaScript:

```js
var Readable = require("stream").Readable
var inherits = require("util").inherits

function Source(content, options) {
  Readable.call(this, options)
  this.content = content
}

inherits(Source, Readable)

Source.prototype._read = function (size) {
  if (!this.content) this.push(null)
  else {
    this.push(this.content.slice(0, size))
    this.content = this.content.slice(size)
  }
}

var s = new Source("The quick brown fox jumps over the lazy dog.")
console.log(s.read(10).toString())
console.log(s.read(10).toString())
console.log(s.read(10).toString())
console.log(s.read(10).toString())
console.log(s.read(10).toString())

// The quick
// brown fox
// jumps over
//  the lazy
// dog.


var q = new Source("How now brown cow?")
q.pipe(process.stdout)

// How now brown cow?
```
## Transform

An uppercasing `ToUpper` transform, in CoffeeScript:

```coffee
{Readable, Transform} = require("stream")

class ToUpper extends Transform

  _transform: (data, enc, next) ->
    @push data.toString().toUpperCase()
    next()


# a simple transform stream
tx = new ToUpper

# a simple source stream
rs = new Readable
rs.push 'the quick brown fox jumps over the lazy dog!\n'
rs.push null

rs.pipe(tx)
  .pipe(process.stdout)   # THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG!
```

And the same in JavaScript:

```js
var Transform = require("stream").Transform
var inherits = require("util").inherits

function ToUpper (options) {
  Transform.call(this, options)
}

inherits(ToUpper, Transform)

ToUpper.prototype._transform = function (chunk, encoding, callback) {
  var str = chunk.toString().toUpperCase()
  this.push(str)
  callback()
}

// a simple transform stream
var tx = new ToUpper;

// a simple source stream
var Readable = require('stream').Readable;
var rs = new Readable;
rs.push('the quick brown fox ');
rs.push('jumps over the lazy dog.\n');
rs.push(null);

rs.pipe(tx).pipe(process.stdout);

// THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.
```
## Writable

A `Sink` stream that logs each chunk it collects, in CoffeeScript (two variants):

```coffee
{Readable, Writable} = require('stream')

class Sink extends Writable

  _write: (data, enc, next) ->
    console.log(data.toString())
    next()


# a simple source stream
source = new Readable
source.push 'the quick brown fox '
source.push 'jumps over the lazy dog.\n'
source.push null

sink = new Sink
source.pipe(sink)

# the quick brown fox
# jumps over the lazy dog.


###

Same example as above except that the source stream passes strings (instead of buffers) and the sink stream doesn't decode the strings to buffers before writing.

###
class Sink extends Writable

  constructor: ->
    super(decodeStrings: false)           # don't decode strings

  _write: (data, enc, next) ->
    console.log(data)
    next()


# a simple source stream
source = new Readable(encoding: 'utf8')   # buffers will be decoded to strings
source.push 'the quick brown fox '
source.push 'jumps over the lazy dog.\n'
source.push null

sink = new Sink
source.pipe(sink)

# the quick brown fox
# jumps over the lazy dog.
```

And the same `Sink` in JavaScript:

```js
var Writable = require("stream").Writable
var inherits = require("util").inherits

function Sink(options) {
  Writable.call(this, options)
}

inherits(Sink, Writable)

Sink.prototype._write = function (chunk, encoding, callback) {
  console.log(chunk.toString())
  callback()
}


// a simple source stream
var Readable = require('stream').Readable;
var source = new Readable;
source.push('the quick brown fox ');
source.push('jumps over the lazy dog.\n');
source.push(null);

var sink = new Sink;
source.pipe(sink);
```
