如何将 JSON 数组从 NodeJS 流式传输到 postgres
Posted
技术标签:
【中文标题】如何将 JSON 数组从 NodeJS 流式传输到 postgres【英文标题】:How can I stream a JSON Array from NodeJS to postgres 【发布时间】:2016-04-13 17:54:42 【问题描述】:我正在尝试插入几百万条记录(大约有 6 个字段/列),每次批量插入尝试(使用 sequelize
.js 和 bulkCreate()
)接收来自客户端的请求 10,000 条记录
这显然是个坏主意,所以我试着调查node-pg-copy-streams
但是,我不想在客户端发起更改,在客户端发送一个 json 数组
# python
data = [
"column a":"a values",
"column b":"b values",
,
...
# 10,000 items
...
]
request.post(data=json.dumps(data), url=url)
在 nodejs 的服务器端,我将如何在以下骨架中流式传输接收到的request.body
?
.post(function(req, res)
// old sequelize code
/* table5.bulkCreate(
req.body, raw:true
).then(function()
return table5.findAll();
).then(function(result)
res.json(result.count);
);*/
// new pg-copy-streams code
pg.connect(function(err, client, done)
var stream = client.query(copyFrom('COPY my_table FROM STDIN'));
// My question is here, how would I stream or pipe the request body ?
// ?.on('error', done);
// ?.pipe(stream).on('finish', done).on('error', done);
);
);
【问题讨论】:
请告诉我如何改进这个问题,这可能是我的第一个问题或永远的第一个问题 我目前正在研究如何流式传输字符串或数组。 postgres 复制定义了三种允许输入的格式:文本、csv 和二进制。我不是专家,但格式描述如下:postgresql.org/docs/9.3/static/sql-copy.html(搜索“文件格式”标题) @thst 谢谢,这很有帮助。我可能理解错了,但是无论是文本、csv 还是二进制文件,都不需要流式传输吗?我实际上会查看文本部分,但这仍然需要流式传输? 【参考方案1】:从技术上讲,这里没有流式传输,就 NodeJS 流式传输的工作方式而言。
您每次发送一个包含 10,000 条记录的块,并希望您的服务器端插入这些记录并向客户端返回 OK 以发送另外 10,000 条记录。那是节流/分页数据,而不是流式传输。
一旦您的服务器收到接下来的 10,000 条记录,将它们插入(通常作为事务),然后向客户端返回 OK 响应,以便它可以发送接下来的 10,000 条记录。
使用 node-postgres 编写事务并不是一件容易的事,因为它的级别太低了。
下面是如何在pg-promise 的帮助下做到这一点的示例:
function insertRecords(records)
return db.tx(t=>
var inserts = [];
records.forEach(r=>
var query = t.none("INSERT INTO table(fieldA, ...) VALUES($propA, ...)", r);
inserts.push(query);
);
return t.batch(inserts);
);
然后在您的 HTTP 处理程序中,您将编写:
function myPostHandler(req, res)
// var records = get records from the request;
insertRecords(records)
.then(data=>
// set response as success;
)
.catch(error=>
// set response as error;
);
【讨论】:
谢谢,这有帮助,但从整体上看仍然有点低效。我将此与 COPY FROM 的速度进行了比较,它的速度较慢,所以我将使用它而不是 INSERT 如果你想有效地做到这一点,你应该通过 TCP-IP 使用 web IO 将数据直接从客户端流式传输到数据库。 是的,那肯定更有效率!但是,更改客户端的代码并不可行,并且在我们当前的设置中(客户端和数据库位于不同的网络上),引入对数据库的访问也是不切实际的。更改服务器代码是我们抵抗力最小的途径。 我不知道你做了什么样的速度测试,但我建议一次从客户端输入 1000 条记录,然后看看它与流式传输相比的表现如何。我的猜测 - 会差不多。 10,000 太多了,您很可能会超载服务器端。 我们基本上是在客户端运行'time python ...'来比较结果,实际上我们每次上传也运行了30,000-500,000条记录/行。我们每个数据集的总大小约为一百万行,我们有 40-50 个数据集【参考方案2】:这是我解决问题的方法,
首先将我的 req.body dict 转换为 TSV 的函数(不是初始问题的一部分)
/**
* Converts a dictionary and set of keys to a Tab Separated Value blob of text
* @param Dictionary object dict
* @param Array of Keys keys
* @return Concatenated Tab Separated Values String
*/
function convertDictsToTSV(dicts, keys)
// ...
其次是我原来的 .post 函数的其余部分
.post(function(req, res)
// ...
/* requires 'stream' as
* var stream = require('stream');
* var copyFrom = require('pg-copy-streams').from;
*/
var read_stream_string = new stream.Readable();
read_stream_string.read = function noop() ;
var keys = [...]; // set of dictionary keys to extract from req.body
read_stream_string.push(convertDictsToTSV(req.body, keys));
read_stream_string.push(null);
pg.connect(connectionString, function(err, client, done)
// ...
// error handling
// ...
var copy_string = 'Copy tablename (' + keys.join(',') + ') FROM STDIN'
var pg_copy_stream = client.query( copyFrom( copy_string ) );
read_stream_string.pipe(pg_copy_stream).on('finish', function(finished)
// handle finished and done appropriately
).on('error', function(errored)
// handle errored and done appropriately
);
);
pg.end();
);
【讨论】:
以上是关于如何将 JSON 数组从 NodeJS 流式传输到 postgres的主要内容,如果未能解决你的问题,请参考以下文章
我可以使用 nodejs 将麦克风音频从客户端流式传输到客户端吗?