Node.js 流如何工作?

Posted

技术标签:

【中文标题】Node.js 流如何工作?【英文标题】:How do Node.js Streams work? 【发布时间】:2015-04-05 23:08:31 【问题描述】:

我有一个关于 Node.js 流的问题 - 特别是它们在概念上是如何工作的。

不乏有关如何使用流的文档。但是我很难找到流在数据级别的工作方式。

我对 Web 通信 HTTP 的有限理解是完整的数据“包”来回发送。与个人订购公司目录类似,客户端向服务器发送 GET(目录)请求,服务器以目录响应。浏览器接收的不是目录的一页,而是整本书。

节点流可能是多部分消息吗?

我喜欢 REST 模型——尤其是它是无状态的。浏览器和服务器之间的每一次交互都是完全独立且足够的。因此节点流不是 RESTful 的吗?一位开发人员提到了与保持连接打开的套接字管道的相似之处。回到我的目录订购示例,这会不会像一个带有“但是等等!还有更多!”的电视广告。而不是完整的目录?

流的很大一部分是接收者“下游”向上游发送“暂停”和“继续”等消息的能力。这些消息由什么组成?他们是 POST 吗?

最后,我对 Node 工作原理的有限视觉理解包括这个事件循环。函数可以放置在与线程池不同的线程上,并且事件循环继续进行。但是不应该发送数据流保持事件循环被占用(即停止)直到流完成?它如何同时监视来自下游的“暂停”请求?n 事件循环是否将流放在池中的另一个线程上,当它遇到“暂停”请求时,检索相关线程并暂停它?

我已经阅读了 node.js 文档,完成了 nodeschool 教程,构建了一个 heroku 应用程序,购买了两本书(真实的、独立的、书籍,有点像之前所说的目录,可能不像 node 流),问了几个代码训练营的“节点”讲师 - 都在谈论如何使用流,但没有人谈论下面实际发生的事情。

也许您发现了一个很好的资源来解释这些是如何工作的?对于非 CS 思维来说,这或许是一个很好的拟人化类比?

【问题讨论】:

顺便说一句,这是一个非常好的问题。众所周知,这是理解node 最难的部分。我现在做节点有一段时间了,今年似乎才明白。这也使得维护非常困难,因为只有少数人了解它的核心部分。我认为他们主要代表流工作组。拥抱界面确实改变了您的编程方式,您将利用大部分节点功能。我贴了一张图,下面真的很难用谷歌搜索。 【参考方案1】:

首先要注意的是:node.js 流不限于 HTTP 请求。 HTTP 请求/网络资源只是 node.js 中流的一个示例。

流对于可以小块处理的所有内容都很有用。它们使您可以更轻松地以较小的块处理潜在的巨大资源,从而更轻松地放入 RAM。

假设您有一个文件(数 GB 大小)并且想要将所有小写字母转换为大写字符并将结果写入另一个文件。天真的方法会使用fs.readFile 读取整个文件(为简洁起见,省略了错误处理):

fs.readFile('my_huge_file', function (err, data) 
    var convertedData = data.toString().toUpperCase();

    fs.writeFile('my_converted_file', convertedData);
);

不幸的是,这种方法很容易使您的 RAM 不堪重负,因为在处理之前必须存储整个文件。您还会浪费宝贵的时间等待文件被读取。以较小的块处理文件是否有意义?在等待硬盘提供剩余数据时,您可以在获得第一个字节后立即开始处理:

var readStream = fs.createReadStream('my_huge_file');
var writeStream = fs.createWriteStream('my_converted_file');
readStream.on('data', function (chunk) 
    var convertedChunk = chunk.toString().toUpperCase();
    writeStream.write(convertedChunk);
);
readStream.on('end', function () 
    writeStream.end();
);

这种方法要好得多:

    您将只处理能够轻松放入 RAM 的小部分数据。 您在第一个字节到达后开始处理,不要浪费时间什么都不做,而是等待。

一旦你打开流 node.js 将打开文件并开始读取它。一旦操作系统将一些字节传递给正在读取文件的线程,它将被传递给您的应用程序。


回到 HTTP 流:

    第一个问题在这里也有效。攻击者可能会向您发送大量数据以压倒您的 RAM 并关闭 (DoS) 您的服务。 但是,在这种情况下,第二个问题更为重要: 网络可能非常慢(想想智能手机),并且客户端发送所有内容可能需要很长时间。通过使用流,您可以开始处理请求并缩短响应时间。

关于暂停 HTTP 流:这不是在 HTTP 级别完成的,而是在更低级别完成。如果你暂停流 node.js 将简单地停止从底层 TCP 套接字读取。 然后发生的事情取决于内核。它可能仍会缓冲传入的数据,因此一旦您完成当前工作,它就可以为您准备好。 It may also inform the sender at the TCP level that it should pause sending data。应用程序不需要处理这个问题。那不关他们的事。事实上,发送者应用程序可能甚至没有意识到您不再主动阅读!

因此,这基本上是在数据可用时立即提供数据,但不会占用您的资源。底层的工作要么由操作系统完成(例如netfshttp),要么由您正在使用的流的作者完成(例如zlib,它是一个Transform 流并且通常是螺栓连接的转到fsnet)。

【讨论】:

> 我认为这也意味着节点不违反 RESTful 模型,该模型是对 HTTP 级别而不是底层 TCP 进行建模,对吗? - 是的。 HTTP 消息由几个较小的 TCP “块”(数据包)组成。 HTTP 流正在处理这些 TCP 数据包。 > 从这里开始,我必须转向学习套接字以更好地理解节点流,对吗? - 不必要。但是,基本了解文件读取或网络资源被消耗时会发生什么将是有帮助的。您可能想查找在 C 中读取文件/网络资源的示例,以更好地了解正在发生的事情。 @ZAR > 但是在我们通过 HTTP 管道传输这个数 GB 的大文件的情况下。虽然 HTTP 响应很快,但 tcp 套接字必须保持打开相当长的一段时间。这会暂停节点事件循环还是节点继续? – 事件循环始终在运行并等待数据处理。只有一个 single 事件循环来处理您的所有事件(流所基于的)。一旦操作系统将一些数据传递给 node.js,读取器线程基本上会在读取流上调用 readable 事件。 @Zar readable 事件的调用可能会发生数次(可能是数千次)。取决于您使用的流和其他因素。当您已经消费了您已经收到通知的所有内容时,每次数据可用时都会调用它。 @TimWolla 感谢您的回答,但我认为它只回答了部分问题。假设我正在使用 express.js 响应流来传输一个大文件。它在 HTTP 级别上看起来如何?文件是被分割成块并作为多部分发送,还是要传输编码:分块或其他【参考方案2】:

下面的图表似乎是节点流类的非常准确的 10.000 英尺概述/图表。

它代表streams3,由Chris Dickinson提供。

【讨论】:

找到here,这也是他们过去工作方式的一个很好的介绍。 谢谢@elje.......我还发现了这个资源,它可以让你修改缓冲区,看看当你达到高水位线时会发生什么,等等。thlorenz.com/stream-viz/?nums=100&powers=100&tarpit=100 它提供了一个很好的使用您提供的模型的方式。谢谢! @ZAR 确实不错。谢谢。 非常漂亮的图表。这仍然是当前的事情,还是他们同时改变了什么?【参考方案3】:

首先,什么是流? 好吧,通过流,我们可以处理意义读取和写入数据片段,而无需完成整个读取或写入操作。因此,我们不必将所有数据都保存在内存中来执行这些操作。

例如,当我们使用流读取文件时,我们会读取部分数据,对其进行处理,然后释放内存,并重复此操作,直到处理完整个文件。或者想想 YouTube 或 Netflix,它们都称为流媒体公司,因为它们使用相同的原理流式传输视频。

因此,无需等到整个视频文件加载完毕,而是逐个或分块完成处理,这样您甚至可以在整个文件下载之前就开始观看。所以这里的原则不仅仅是关于 Node.JS。但普遍适用于计算机科学。

如您所见,这使得流成为处理大量数据(例如,视频或我们从外部源逐段接收的数据)的理想选择。此外,流式处理使数据处理在内存方面更加高效,因为不需要将所有数据保存在内存中,而且在时间方面,因为我们可以在数据到达时开始处理,而不是等到一切都来了。

它们是如何在 Node.JS 中实现的:

所以在 Node 中,有四种基本类型的流: 可读流、可写流、双工流和转换流。但是可读和可写是最重要的,可读流是我们可以读取和消费数据的流。流在 Node 核心模块中无处不在,例如,http 服务器收到请求时传入的数据实际上是可读流。因此,与请求一起发送的所有数据都是一块一块地进来的,而不是一大块。另外,文件系统的另一个例子是,我们可以使用 FS 模块的读取屏幕逐段读取文件,这实际上对于大型文本文件非常有用。

嗯,另一个需要注意的重要事情是流实际上是 EventEmitter 类的实例。这意味着所有流都可以发出和侦听命名事件。在可读流的情况下,它们可以发出,我们可以监听许多不同的事件。但最重要的两个是数据结束事件当有新的数据要消费时触发 data 事件当没有更多数据要消费时触发 end 事件。当然,我们可以相应地对这些事件做出反应。

最后,除了事件之外,我们还有可以在流上使用的重要函数。在可读流的情况下,最重要的是 piperead 函数。超级重要的管道功能,它基本上允许我们将流连接在一起,将数据从一个流传递到另一个流,而完全不必担心事件。

接下来,可写流是我们可以写入数据的流。所以基本上,与可读流相反。一个很好的例子是我们可以发送回客户端的 http 响应,它实际上是一个可写流。所以一个我们可以写入数据的流。所以当我们想发送数据时,我们必须把它写在某个地方,对吧?那个地方是一个可写的流,这很有意义,对吧?

例如,如果我们想向客户发送一个大视频文件,我们会像 Netflix 或 YouTube 那样做。现在关于事件,最重要的是排水和完成事件。其中最重要的函数是 write 和 end 函数。

关于双工流。它们只是同时可读和可写的流。这些不太常见。但无论如何,一个很好的例子是来自 net 模块的网络套接字。 Web 套接字基本上只是客户端和服务器之间的通信通道,它可以双向工作,并且在建立连接后保持打开状态。

最后,转换流是双工流,因此流既可读又可写,同时可以在读取或写入数据时修改或转换数据。一个很好的例子是 zlib 核心模块来压缩实际使用转换流的数据。

*** Node 将这些 http 请求和响应实现为流,然后我们可以使用它们,我们可以使用每种类型的流可用的事件和函数来使用它们。我们当然也可以实现自己的流,然后使用这些相同的事件和函数来使用它们。

现在让我们尝试一些例子:

const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) =>
    fs.readFile('./txt/long_file.txt', (err, data)=>
        if(err) console.log(err);
        res.end(data);
    );
);
server.listen('8000','127.0.01', ()=>
    console.log(this);
);

假设 long_file.txt 文件包含 1000000K 行,每行包含超过 100 个单词,所以这是一个包含大量数据的拥抱文件,现在在上面的示例中,问题是使用 readFile() 函数节点会将整个文件加载到内存中,因为只有将整个文件加载到内存节点后,才能将数据作为响应对象传输。

当文件很大时,并且当有大量请求访问您的服务器时,通过时间节点进程将很快耗尽资源并且您的应用程序将停止工作,一切都会崩溃。

让我们尝试使用流来寻找解决方案:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) =>
    const readable = fs.createReadStream('./txt/long_file.txt');
    readable.on('data', chunk=>
        res.write(chunk);
    );
    readable.on('end',()=>
        res.end();
    )
    readable.on('error', err=>
        console.log('err');
        res.statusCode=500;
        res.end('File not found');
    );
);

server.listen('8000','127.0.01', ()=>
    console.log(this);
);

在上面的流示例中,我们有效地流式传输文件,我们正在读取文件的一部分,一旦可用,我们就使用响应的 write 方法将其直接发送到客户端溪流。然后当下一个图片可用时,该片段将被发送,一直到整个文件被读取并流式传输到客户端。

所以流基本上完成了从文件中读取数据,结束事件将发出信号,表示不再有数据写入该可写流。

通过上面的练习,我们解决了之前的问题,但是,上面的例子还有一个很大的问题,叫做背压。

问题在于,我们用于从磁盘读取文件的可读流比通过网络通过响应可写流实际发送结果要快得多。这将使响应流不堪重负,响应流无法如此快速地处理所有这些传入数据,这个问题称为背压。

解决方案是使用管道算子,它会处理数据进来的速度和数据出去的速度。

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) =>
    const readable = fs.createReadStream('./txt/long_file.txt');
    readable.pipe(res);

);

server.listen('8000','127.0.01', ()=>
    console.log(this);
);

【讨论】:

感谢您抽出宝贵的时间来写一个深思熟虑的答案。【参考方案4】:

我认为你想多了这一切是如何运作的,我喜欢它。

流有什么好处

流有两个好处:

当操作很慢并且它可以在获得结果时为您提供部分结果。例如读取一个文件,它很慢,因为硬盘驱动器很慢,它可以在读取文件时为您提供部分文件。通过流,您可以使用文件的这些部分并立即开始处理它们。

它们也很适合将程序连接在一起(读取函数)。就像在命令行中一样,您可以将不同的程序连接在一起以产生所需的输出。示例:cat file | grep word

它们是如何在后台工作的......

这些操作中的大多数需要时间来处理并且可以在获得时给你部分结果,它们不是由 Node.js 完成的,它们是由 V8 JS 引擎完成的,它只会将这些结果交给 JS 供你使用他们。

要了解您的 http 示例,您需要了解 http 的工作原理

网页可以使用不同的编码发送。一开始只有一种方法。请求时发送整个页面的位置。现在它有更有效的编码来做到这一点。其中一个是分块的,其中部分网页被发送,直到整个页面被发送。这很好,因为可以在收到网页时对其进行处理。想象一个网络浏览器。它可以在下载完成之前开始渲染网站。

您的 .pause 和 .continue 问题

首先,Node.js 流只能在同一个 Node.js 程序中工作。 Node.js 流无法与其他服务器甚至程序中的流交互。

这意味着在下面的示例中,Node.js 无法与网络服务器通信。它不能告诉它暂停或恢复。

Node.js <-> Network <-> Webserver

真正发生的是 Node.js 请求一个网页并开始下载它并且无法停止该下载。只是丢掉了套接字。

那么,当您在 Node.js 中创建 .pause 或 .continue 时,究竟会发生什么?

它开始缓冲请求,直到您准备好再次开始使用它。但下载从未停止。

事件循环

我已经准备好完整的答案来解释事件循环的工作原理,但我认为watch this talk 对你来说更好。

【讨论】:

这太棒了!关于暂停和继续,您会说“真正发生的是 Node.js 请求一个网页并开始下载它,并且无法停止下载。只是丢弃套接字。”那么当前未处理(但已下载)的数据会发生什么情况?它是否存储在本地某个地方?这不会从节省 RAM 的角度出发吗? 是的。它发生了。在编写 http 规范时,它没有声明停止请求的方法,因此如果您开始下载网页,则无法暂停它。如果http有办法告诉他停止发送网页,那是可能的,但事实并非如此。在其他情况下,就像文件系统一样,它有办法告诉他停止阅读,直到你准备好继续阅读文件。 So what's happening to the data that isn't being currently processed (but downloaded)? Is it stored somewhere locally? 是的,在缓冲区中,该缓冲区可能会溢出。 没错,发送应用程序处理该请求的速度更快,因为它可以在从数据库中获取数据后立即发送数据。此外,不要将多部分消息与分块请求混淆。多部分消息意味着它由不同的部分组成,例如,一个图像和两个表单字段。就像您发送表格一样。而一个分块编码的消息意味着http body会分部分发送。 啊,这是一个重要的区别。好的,我要回到我的 http 基础知识了!加斯顿,感谢您的时间和回答!

以上是关于Node.js 流如何工作?的主要内容,如果未能解决你的问题,请参考以下文章

如何从 Node.js 中的 API 请求创建响应流?

如何在 node.js 中读取整个文本流?

Node.js - 如何将流转换为字符串

如何使用 node.js 顺序读取 csv 文件(使用流 API)

如何使用 Node.js 创建带有缓冲区的读取流

如何在 Node.js 流回调中聚合从异步函数生成的 Promise?