如何将 JSON 数组从 NodeJS 流式传输到 postgres

Posted

技术标签:

【中文标题】如何将 JSON 数组从 NodeJS 流式传输到 postgres【英文标题】:How can I stream a JSON Array from NodeJS to postgres 【发布时间】:2016-04-13 17:54:42 【问题描述】:

我正在尝试插入几百万条记录(大约有 6 个字段/列),每次批量插入尝试(使用 sequelize.js 和 bulkCreate())接收来自客户端的请求 10,000 条记录

这显然是个坏主意,所以我试着调查node-pg-copy-streams

但是,我不想在客户端发起更改,在客户端发送一个 json 数组

# python
data = [
    
     "column a":"a values",
     "column b":"b values",
    ,
    ...
    # 10,000 items
    ...
]
request.post(data=json.dumps(data), url=url)

在 nodejs 的服务器端,我将如何在以下骨架中流式传输接收到的request.body

.post(function(req, res)

    // old sequelize code
    /* table5.bulkCreate(
        req.body, raw:true
    ).then(function()
        return table5.findAll();
    ).then(function(result)
        res.json(result.count);
    );*/

    // new pg-copy-streams code
    pg.connect(function(err, client, done) 
    var stream = client.query(copyFrom('COPY my_table FROM STDIN'));
    // My question is here, how would I stream or pipe the request body ?
    // ?.on('error', done);
    // ?.pipe(stream).on('finish', done).on('error', done);
    );
);

【问题讨论】:

请告诉我如何改进这个问题,这可能是我的第一个问题或永远的第一个问题 我目前正在研究如何流式传输字符串或数组。 postgres 复制定义了三种允许输入的格式:文本、csv 和二进制。我不是专家,但格式描述如下:postgresql.org/docs/9.3/static/sql-copy.html(搜索“文件格式”标题) @thst 谢谢,这很有帮助。我可能理解错了,但是无论是文本、csv 还是二进制文件,都不需要流式传输吗?我实际上会查看文本部分,但这仍然需要流式传输? 【参考方案1】:

从技术上讲,这里没有流式传输,就 NodeJS 流式传输的工作方式而言。

您每次发送一个包含 10,000 条记录的块,并希望您的服务器端插入这些记录并向客户端返回 OK 以发送另外 10,000 条记录。那是节流/分页数据,而不是流式传输。

一旦您的服务器收到接下来的 10,000 条记录,将它们插入(通常作为事务),然后向客户端返回 OK 响应,以便它可以发送接下来的 10,000 条记录。

使用 node-postgres 编写事务并不是一件容易的事,因为它的级别太低了。

下面是如何在pg-promise 的帮助下做到这一点的示例:

function insertRecords(records) 
    return db.tx(t=> 
        var inserts = [];
        records.forEach(r=> 
            var query = t.none("INSERT INTO table(fieldA, ...) VALUES($propA, ...)", r);
            inserts.push(query);
        );
        return t.batch(inserts);
    );

然后在您的 HTTP 处理程序中,您将编写:

function myPostHandler(req, res)         
    // var records = get records from the request;    
    insertRecords(records)
        .then(data=> 
            // set response as success;
        )
        .catch(error=> 
            // set response as error;
        );    

【讨论】:

谢谢,这有帮助,但从整体上看仍然有点低效。我将此与 COPY FROM 的速度进行了比较,它的速度较慢,所以我将使用它而不是 INSERT 如果你想有效地做到这一点,你应该通过 TCP-IP 使用 web IO 将数据直接从客户端流式传输到数据库。 是的,那肯定更有效率!但是,更改客户端的代码并不可行,并且在我们当前的设置中(客户端和数据库位于不同的网络上),引入对数据库的访问也是不切实际的。更改服务器代码是我们抵抗力最小的途径。 我不知道你做了什么样的速度测试,但我建议一次从客户端输入 1000 条记录,然后看看它与流式传输相比的表现如何。我的猜测 - 会差不多。 10,000 太多了,您很可能会超载服务器端。 我们基本上是在客户端运行'time python ...'来比较结果,实际上我们每次上传也运行了30,000-500,000条记录/行。我们每个数据集的总大小约为一百万行,我们有 40-50 个数据集【参考方案2】:

这是我解决问题的方法,

首先将我的 req.body dict 转换为 TSV 的函数(不是初始问题的一部分)

/**
 * Converts a dictionary and set of keys to a Tab Separated Value blob of text
 * @param Dictionary object dict
 * @param Array of Keys keys
 * @return Concatenated Tab Separated Values String
 */
function convertDictsToTSV(dicts, keys)
    // ...

其次是我原来的 .post 函数的其余部分

.post(function(req, res)
    // ...
    /* requires 'stream' as 
     * var stream = require('stream');
     * var copyFrom = require('pg-copy-streams').from;
     */
    var read_stream_string = new stream.Readable();
    read_stream_string.read = function noop() ;
    var keys = [...]; // set of dictionary keys to extract from req.body 
    read_stream_string.push(convertDictsToTSV(req.body, keys));
    read_stream_string.push(null);
    pg.connect(connectionString, function(err, client, done) 
        // ...
        // error handling
        // ...
        var copy_string = 'Copy tablename (' + keys.join(',') + ') FROM STDIN'
        var pg_copy_stream = client.query( copyFrom( copy_string ) );
        read_stream_string.pipe(pg_copy_stream).on('finish', function(finished)
            // handle finished and done appropriately
        ).on('error', function(errored)
            // handle errored and done appropriately
        );
    );
    pg.end();
);

【讨论】:

以上是关于如何将 JSON 数组从 NodeJS 流式传输到 postgres的主要内容,如果未能解决你的问题,请参考以下文章

如何从文件中流式传输 JSON?

我可以使用 nodejs 将麦克风音频从客户端流式传输到客户端吗?

NodeJS如何以最小的毫秒延迟流式传输屏幕截图?

如何将音频数据从 Android 流式传输到 WebSocket 服务器?

将 JSON 流式传输到 Bigquery

使用 nodejs 将视频流式传输到浏览器