通过 es.map() 和 JSONStream.stringify() 将 JSONStream.parsed() 数据传输到文件流时,节点堆耗尽

Posted

技术标签:

【中文标题】通过 es.map() 和 JSONStream.stringify() 将 JSONStream.parsed() 数据传输到文件流时,节点堆耗尽【英文标题】:node heap exhausted when piping JSONStream.parsed() data through es.map() and JSONStream.stringify() to file stream 【发布时间】:2016-12-23 12:40:02 【问题描述】:

我正在尝试通过 JSONStream.parse() 管道输入流(从一个巨大的 GeoJSON 文件创建)以将流分解为对象,然后通过 event-stream.map() 允许我转换对象,然后通过 JSONStream.stringify() 从中创建一个字符串,最后生成一个可写的输出流。随着进程的运行,我可以看到节点的内存占用继续增长,直到最终耗尽堆。这是重现问题的最简单的脚本 (test.js):

const fs = require("fs")
const es = require("event-stream")
const js = require("JSONStream")

out = fs.createWriteStream("/dev/null")
process.stdin
    .pipe(js.parse("features.*"))
    .pipe(es.map( function(data, cb)  
        cb(null, data);
        return;
     ))
    .pipe(js.stringify("\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n"))
    .pipe(out)

一个小 bash 脚本 (barf.sh) 将源源不断的 JSON 流喷入节点的 process.stdin 将导致节点的堆逐渐增长:

#!/bin/bash

echo '"type":"FeatureCollection","features":['
while :
do
    echo '"type":"Feature","properties":"name":"A Street", "geometry":"type":"LineString" ,'
done

这样运行:

barf.sh | node test.js

有几种奇怪的方法可以回避这个问题:

删除 fs.createWriteStream() 并将最后一个管道阶段从“.pipe(out)”更改为“.pipe(process.stdout)”,然后将节点的标准输出管道更改为 /dev/null 将异步的es.map()改为同步的es.mapSync()

前面两个操作中的任何一个都将允许脚本永远运行,节点的内存占用低且不变。我在运行 Ubuntu 16.04 的 8GB RAM 的八核机器上使用节点 v6.3.1、事件流 v3.3.4 和 JSONStream 1.1.4。

我希望有人可以帮助我纠正我确信是我的明显错误。

【问题讨论】:

【参考方案1】:

JSONStream 不是 streams2 流,因此不支持背压控制。 (有一个关于streams2here的简要总结。)

这意味着数据将来自data 事件中的parse 流,并且无论消费流是否已为它们准备好,该流都将继续将它们抽出。如果管道中某处的读取和写入速度之间存在差异,则会出现缓冲 - 这就是您所看到的。

您的barf.sh 线束可以看到通过stdin 注入的功能。相反,如果您正在读取大量文件,则应该能够通过暂停文件的读取流来管理流。所以如果你在你的map回调中插入一些pause/resume逻辑,你应该能够让它处理一个大文件;只是需要更长的时间。我会尝试这样的事情:

let in = fs.createReadStream("/some/massive/file");
let out = fs.createWriteStream("/dev/null");
in
    .pipe(js.parse("features.*"))
    .pipe(es.map(function(data, cb) 
        // This is just an example; a 10-millisecond wait per feature would be very slow.
        if (!in.isPaused()) 
            in.pause();
            global.setTimeout(function ()  in.resume(); , 10);
        
        cb(null, data);
        return;
    ))
    .pipe(js.stringify("\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n"))
    .pipe(out);

顺便说一句,使用mapSync 在我的计算机上几乎没有区别(它又旧又慢)。但是,除非你有一些异步操作要在 map 中执行,否则我会选择 mapSync

【讨论】:

谢谢。坦率地说,我仍然很困惑为什么将目标流更改为 process.stdout 允许代码无限期地运行而没有内存增长。你能重现这种行为吗? 对我来说,当管道传输到stdout 时,它似乎仍然会消耗内存,但速度要低得多。我猜fs 基础设施一定比stdout 慢——即使是/dev/null。这是所有推送流的问题;你可以使用 RxJS 遇到类似的情况。我认为你最好的选择是做一些暂停阅读的实验。也许每 1,000 个左右的功能会有一个小小的停顿?

以上是关于通过 es.map() 和 JSONStream.stringify() 将 JSONStream.parsed() 数据传输到文件流时,节点堆耗尽的主要内容,如果未能解决你的问题,请参考以下文章

使用 JSONPath 和 JSONStream 解析 json 流

使用 JSONStream 读取大型 JSON 文件

node.js - JSONStream 期间的无限循环

如何使用 JSONStream 对大对象进行字符串化

Nodejs:JSONStream解析方法正则表达式

@aws-sdk/lib-storage 使用 JSONStream.stringify() 将 JSON 从 MongoDB 流式传输到 S3