具有大量查询的 node-postgres

Posted

技术标签:

【中文标题】具有大量查询的 node-postgres【英文标题】:node-postgres with massive amount of queries 【发布时间】:2015-05-19 23:35:24 【问题描述】:

我刚开始使用 node-postgres 使用 node.js 和 postgres。我尝试做的一件事是编写一个简短的 js 来填充我的数据库,使用一个包含大约 200,000 个条目的文件。

我注意到一段时间后(不到 10 秒),我开始收到“错误:连接终止”。我不确定这是否与我使用 node-postgres 的方式有关,或者是因为我向 postgres 发送垃圾邮件。

无论如何,这里有一个显示这种行为的简单代码:

var pg = require('pg');
var connectionString = "postgres://xxxx:xxxx@localhost/xxxx";

pg.connect(connectionString, function(err,client,done)
  if(err) 
    return console.error('could not connect to postgres', err);
  

  client.query("DROP TABLE IF EXISTS testDB");
  client.query("CREATE TABLE IF NOT EXISTS testDB (id int, first int, second int)");
  done();

  for (i = 0; i < 1000000; i++)
    client.query("INSERT INTO testDB VALUES (" + i.toString() + "," + (1000000-i).toString() + "," + (-i).toString() + ")",   function(err,result)
      if (err) 
         return console.error('Error inserting query', err);
      
      done();
    );
  
);

在大约 18,000-20,000 次查询后失败。这是使用client.query的错误方式吗?我尝试更改默认客户端编号,但似乎没有帮助。

client.connect() 似乎也没有帮助,但那是因为我有太多的客户端,所以我绝对认为客户端池是要走的路。

感谢您的帮助!

【问题讨论】:

【参考方案1】:

更新

此答案已被本文取代:Data Imports,它代表了最新的方法。


为了复制您的场景,我使用了 pg-promise 库,并且我可以确认,无论您使用哪个库,直接尝试都不会奏效,重要的是方法。

下面是一种改进的方法,我们将插入分割成块,然后在事务中执行每个块,这就是负载平衡(又名限制):

function insertRecords(N) 
    return db.tx(function (ctx) 
        var queries = [];
        for (var i = 1; i <= N; i++) 
            queries.push(ctx.none('insert into test(name) values($1)', 'name-' + i));
        
        return promise.all(queries);
    );

function insertAll(idx) 
    if (!idx) 
        idx = 0;
    
    return insertRecords(100000)
        .then(function () 
            if (idx >= 9) 
                return promise.resolve('SUCCESS');
             else 
                return insertAll(++idx);
            
        , function (reason) 
            return promise.reject(reason);
        );

insertAll()
    .then(function (data) 
        console.log(data);
    , function (reason) 
        console.log(reason);
    )
    .done(function () 
        pgp.end();
    );

这在大约 4 分钟内产生了 1000,000 条记录,在前 3 笔交易之后显着放缓。我使用的是 Node JS 0.10.38(64 位),它消耗了大约 340MB 的内存。这样我们插入了 100,000 条记录,连续 10 次。

如果我们这样做,只是这次在 100 个事务中插入 10,000 条记录,同样的 1,000,000 条记录在 1 分 25 秒内添加,没有减速,Node JS 消耗大约 100MB 内存,这告诉我们像这样划分数据是个好主意。

不管你用哪个库,方法都应该是一样的:

    将您的插入分区/限制到多个事务中; 将单个事务中的插入列表保持在大约 10,000 条记录; 在同步链中执行所有交易。 在每个事务的 COMMIT 之后将连接释放回池。

如果您违反任何这些规则,您肯定会遇到麻烦。例如,如果您违反规则 3,您的 Node JS 进程可能会很快耗尽内存并引发错误。我的示例中的规则 4 由库提供。

如果你遵循这个模式,你就不需要为连接池设置而烦恼了。

更新 1

pg-promise的后期版本完美支持此类场景,如下图:

function factory(index) 
    if (index < 1000000) 
        return this.query('insert into test(name) values($1)', 'name-' + index);
    


db.tx(function () 
    return this.batch([
        this.none('drop table if exists test'),
        this.none('create table test(id serial, name text)'),
        this.sequence(factory), // key method
        this.one('select count(*) from test')
    ]);
)
    .then(function (data) 
        console.log("COUNT:", data[3].count);
    )
    .catch(function (error) 
        console.log("ERROR:", error);
    );

如果你不想包含任何额外的东西,比如创建表,那么它看起来更简单:

function factory(index) 
    if (index < 1000000) 
        return this.query('insert into test(name) values($1)', 'name-' + index);
    


db.tx(function () 
    return this.sequence(factory);
)
    .then(function (data) 
        // success;
    )
    .catch(function (error) 
        // error;
    );

详情请见Synchronous Transactions。

使用Bluebird 作为 promise 库,例如,在我的生产机器上插入 1,000,000 条记录需要 1 分 43 秒(未启用长堆栈跟踪)。

您只需根据index 让您的factory 方法返回请求,直到您没有剩余,就这么简单。

最好的部分是,这不仅速度快,而且对您的 NodeJS 进程产生的负载也很小。在整个测试过程中,内存测试过程保持在 60MB 以下,仅消耗 CPU 时间的 7-8%。

更新 2

从 1.7.2 版本开始,pg-promise 轻松支持超大规模事务。见章节Synchronous Transactions。

例如,我可以在家里的 PC 上用 64 位 Windows 8.1 在 15 分钟内插入 10,000,000 条记录。

为了测试,我将我的电脑设置为生产模式,并使用Bluebird 作为承诺库。在测试期间,整个 NodeJS 0.12.5 进程(64 位)的内存消耗没有超过 75MB,而我的 i7-4770 CPU 显示出一致的 15% 负载。

以同样的方式插入 100m 条记录只需要更多的耐心,而不是更多的计算机资源。

与此同时,之前对 1m 刀片的测试从 1m43s 下降到 1m31s。

更新 3

以下注意事项可能会产生巨大的影响:Performance Boost。

更新 4

相关问题,有更好的实现示例: Massive inserts with pg-promise.

更新 5

可以在此处找到更好和更新的示例:nodeJS inserting Data into PostgreSQL error

【讨论】:

谢谢。我同意这个故事的寓意是“不要那样做”。 FWIW 我最终使用了 postgres COPY FROM。 我是否正确假设 this.sequence(factory), this.one('select count(*) from test') 不会给我正确的计数,因为我不能确定在开始计数查询时序列是否完全写入? @stephanlindauer 当序列解析时,它使用对象 total, duration 解析,所以总数是序列处理的元素数。 当然。但是select count(*) from test的结果在那种情况下是不可信的,对吧? @stephanlindauer 在这里查看我的回复:github.com/cstephen/aaep-scripts/issues/1【参考方案2】:

我猜您已达到最大池大小。由于client.query 是异步的,所以所有可用的连接在返回之前都会被使用。

默认池大小为 10。请在此处查看:https://github.com/brianc/node-postgres/blob/master/lib/defaults.js#L27

你可以通过设置pg.defaults.poolSize来增加默认池大小:

pg.defaults.poolSize = 20;

更新:释放连接后执行另一个查询。

var pg = require('pg');
var connectionString = "postgres://xxxx:xxxx@localhost/xxxx";
var MAX_POOL_SIZE = 25;

pg.defaults.poolSize = MAX_POOL_SIZE;
pg.connect(connectionString, function(err,client,done)
  if(err) 
    return console.error('could not connect to postgres', err);
  

  var release = function() 
    done();
    i++;
    if(i < 1000000)
      insertQ();
  ;

  var insertQ = function() 
    client.query("INSERT INTO testDB VALUES (" + i.toString() + "," + (1000000-i).toString() + "," + (-i).toString() + ")",        function(err,result)
      if (err) 
         return console.error('Error inserting query', err);
      
      release();
    );
  ;

  client.query("DROP TABLE IF EXISTS testDB");
  client.query("CREATE TABLE IF NOT EXISTS testDB (id int, first int,    second int)");
  done();

  for (i = 0; i < MAX_POOL_SIZE; i++)
    insertQ();
  
);

基本思想是,由于您将大量连接池大小相对较小的查询排入队列,因此您将达到最大池大小。这里我们只有在现有连接被释放后才进行新的查询。

【讨论】:

谢谢。我确实将其更改为 25,但它对我的原始代码没有帮助。不过这个还没试过。我将其称为“默认客户编号”……哎呀……或者是别的什么? @DanielSutantyo:如何更改“默认客户编号”? 所以,我尝试了 pg.defaults.poolsize = 25; (可能更多)和 pg.defaults.poolIdleTimeout =60000;第一个肯定会增加我可以输入的条目数量,但不会增加太多。 @DanielSutantyo:你能试试我在更新的答案中提到的东西吗?基本思想是仅在释放已使用的连接后进行另一个查询。 谢谢。在下周开始之前,我有点忙不过来,但我会尽快尝试一下。无论如何,很高兴了解队列和释放。我也尝试使用 npm-async 库实现,但遇到了同样的问题。

以上是关于具有大量查询的 node-postgres的主要内容,如果未能解决你的问题,请参考以下文章

查询具有大量列的 Hive 表时,是不是可以减少 MetaStore 检查的数量?

具有大量数据的多个连接的 MySql 查询

具有大量查询的 node-postgres

运行具有大量过滤的 Solr Query 时出现 SocketException

试图找到将 SQL Query 转换为具有大量记录的 Pandas DataFrame 的最有效方法

获取具有大量参数的 RESTful 列表