Node.js 中的面向行的流

Posted

技术标签:

【中文标题】Node.js 中的面向行的流【英文标题】:Line-oriented streams in Node.js 【发布时间】:2011-09-28 06:17:35 【问题描述】:

我正在使用 Node.js 开发一个多进程应用程序。在此应用程序中,父进程将生成一个子进程并使用基于 JSON 的消息传递协议通过管道与其通信。我发现大型 JSON 消息可能会被“切断”,因此向管道上的数据侦听器发出的单个“块”不包含完整的 JSON 消息。此外,小的 JSON 消息可以分组在同一个块中。每条 JSON 消息都将由换行符分隔,所以我想知道是否已经有一个实用程序可以缓冲管道读取流,以便一次发出一行(因此,对于我的应用程序,一个 JSON 文档一次)。这似乎是一个非常常见的用例,所以我想知道它是否已经完成了。

如果有人能提供任何指导,我将不胜感激。谢谢。

【问题讨论】:

您是否考虑过只使用良好的旧 HTTP?为什么要发明新的 IPC 协议?通过 HTTP 发送 JSON 消息是一个已解决的问题,node 非常擅长 HTTP。 您能描述一下您将如何使用 HTTP 来实现吗?现在,我不知道这会如何改变问题的性质,因为我相信您仍然会从流中读取块。 您将子进程编码为常规 node.js HTTP 服务器。接受和发送 JSON 消息的能力由 express.js/connect.js bodyParser 中间件提供。 github.com/senchalabs/connect/blob/master/lib/middleware/…。这以标准的 node.js 事件驱动方式处理块。无需重新发明这种机制。 还可以查看 axon github.com/visionmedia/axon。将每个socket的编码设置为JSON,就可以轻松收发JSON对象了 【参考方案1】:

也许佩德罗的carrier 可以帮助你?

Carrier 帮您实现换行 通过 node.js 终止协议。

客户可以向您发送大块的 线路和承运人只会通知您 在每个完成的行上。

【讨论】:

【参考方案2】:

我对这个问题的解决方案是发送 JSON 消息,每个消息都以一些特殊的 unicode 字符结尾。 JSON 字符串中通常不会出现的字符。称之为 TERM。

所以发件人只需执行“JSON.stringify(message) + TERM;”并写下来。 然后接收者在 TERM 上拆分传入数据并使用 JSON.parse() 解析这些部分,这非常快。 诀窍是最后一条消息可能无法解析,因此我们只需保存该片段并在下一条消息到来时将其添加到下一条消息的开头。接收代码是这样的:

        s.on("data", function (data) 
        var info = data.toString().split(TERM);
        info[0] = fragment + info[0];
        fragment = '';

        for ( var index = 0; index < info.length; index++) 
            if (info[index]) 
                try 
                    var message = JSON.parse(info[index]);
                    self.emit('message', message);
                 catch (error) 
                    fragment = info[index];
                    continue;
                
            
        
    );

“片段”被定义在它将在数据块之间持续存在的地方。

但什么是 TERM?我使用了 unicode 替换字符 '\uFFFD'。也可以使用 twitter 使用的技术,其中消息由 '\r\n' 分隔,并且推文使用 '\n' 换行并且从不包含 '\r\n'

我发现这比包含长度之类的东西要简单得多。

【讨论】:

【参考方案3】:

最简单的解决方案是在每条消息之前以固定长度前缀(4 个字节?)发送 json 数据的长度,并使用一个简单的非框架解析器来缓冲小块或拆分大块。

您可以尝试node-binary 来避免手动编写解析器。查看 scan(key, buffer) 文档示例 - 它完全可以逐行读取。

【讨论】:

【参考方案4】:

只要换行符(或您使用的任何分隔符)仅分隔 JSON 消息而不嵌入其中,您就可以使用以下模式:

let buf = ''
s.on('data', data => 
  buf += data.toString()
  const idx = buf.indexOf('\n')
  if (idx < 0)  return  // No '\n', no full message
  let lines = buf.split('\n')
  buf = lines.pop() // if ends in '\n' then buf will be empty
  for (let line of lines) 
    // Handle the line
  
)

【讨论】:

您需要将 const buf = '' 更改为 let buf = '' 才能使其正常工作;不能分配给常量变量。 @user1974458 是的。固定。

以上是关于Node.js 中的面向行的流的主要内容,如果未能解决你的问题,请参考以下文章

不写入创建的流可写 node.js

如何在 node.js 中搜索字符串的流?

Node.js 中的管道/流式处理 JavaScript 对象

Node篇

node.js中的面向对象

理解 Node.js 中 Stream(流)