在 Node 中正确批处理嵌套的 Promise

Posted

技术标签:

【中文标题】在 Node 中正确批处理嵌套的 Promise【英文标题】:Properly batch nested promises in Node 【发布时间】:2020-05-15 11:56:43 【问题描述】:

我在 Node 中运行 knex seed,由于服务器的限制,我需要对我的数据库进行批量查询。我开始掌握 Promise 和 async/await 的窍门,但是我很难让它在几个层次上工作(在这一点上让我特别失望的是,它似乎干扰了批处理我无法理解的方式)。我的seed 文件如下所示:

exports.seed = async function(knex) 
  const fs = require('fs');
  const _ = require('lodash');

  function get_event_id(location) 
    return knex('events')
      .where(location: location)
      .first()
      .then(result =>  return result['id']; )
      .finally(() =>  knex.destroy() )
  

  function createImage(row, event_id) 
    return 
      name: row[4],
      event_id: event_id
    
  ;

  async function run_query(line) 
      let row = line.split(',');
      let event_id = await get_event_id(row[0]);
      return createImage(row, event_id);
  ;

  async function run_batch(batch) 

      return Promise.all(batch.map(run_query));
   

  const file = fs.readFileSync('./data.csv');
  const lines = file.toString().replace(/[\r]/g, '').split('\n').slice(1,60); // skip csv header, then run first 59 lines

  const batches = _.chunk(lines, 30); // set batch size

  let images = await Promise.all(batches.map(run_batch));

  console.log(_.flatten(images).length);

;

我的数据库一次可以处理 30 个查询。如果我在定义lines 的行上使用.slice(1,30) 运行单个批处理,一切都会正确解决。但是如上所述以 60 运行给了我ER_TOO_MANY_USER_CONNECTIONS: User already has more than 'max_user_connections' active connections

如果我将run_batch 的内容更改为return batch.map(run_query),脚本就会完成,并且它会返回正确的条目数(因此它似乎正在正确地进行批处理)。但是,Promises 仍然悬而未决。我错过了什么,有没有更优雅的方法来做到这一点?

【问题讨论】:

【参考方案1】:

在这一行:

let images = await Promise.all(batches.map(run_batch));

您正在尝试并行运行所有批次,这完全破坏了您的分块。

您可以使用带有await 的常规for 循环而不是.map(),这样您就可以运行批处理,等待它完成,然后运行下一个批处理。

let allResults = [];
for (let batch of batches) 
     let images = await run_batch(batch);
     allResults.push(...images);

console.log(allResults);

仅供参考,您可能会受益于人们编写的任意数量的函数,这些函数用于处理同时运行的请求不超过 N 个的大型数组。这些不需要您手动将数据分成批次。相反,它们会同时监控有多少请求在进行中,并启动您所需数量的请求,当一个请求完成时,它们会启动另一个请求,为您收集结果。

runN(fn, limit, cnt, options):Loop through an API on multiple requests

pMap(array, fn, limit):Make several requests to an api that can only handle 20 at a time

rateLimitMap(array, requestsPerSec, maxInFlight, fn):Proper async method for max requests per second

mapConcurrent(array, maxConcurrent, fn):Promise.all() consumes all my ram

Bluebird promise library 和 Async-promises library 还内置了执行此操作的功能。

【讨论】:

谢谢,这很有帮助(我现在只是想了解基本模式,所以我想尽可能避免使用额外的库)。您的回答是有道理的,并且似乎取得了一些进展,但是现在当我运行多个批次时,我得到了Knex: Timeout acquiring a connection. The pool is probably full. Are you missing a .transacting(trx) call?。我原以为knex.destroy() 会解决这个问题...有没有办法可以在每批之后重置池? @user6647072 - 我对 knex 不是很熟悉,但我读过的一点点听起来像 .destroy() 正在扼杀你的连接池,这可能不是你想要的。如果你不能轻易解决这个问题,那么也许可以开始一个新的问题,重点是问题的 knex 部分。 @user6647072 - 见Where to destroy Knex connection。看起来你只是想删除它。它正在破坏连接池,您必须在调用.destroy() 后创建一个新池。我认为您可以安全地从这部分代码中删除它。查询完成后,连接会自动返回连接池。您可以控制初始化 knex 实例的连接池大小。

以上是关于在 Node 中正确批处理嵌套的 Promise的主要内容,如果未能解决你的问题,请参考以下文章

node传统读取文件和promise,async await,

javascript中的嵌套MongoDB查询

在 Node.js + Express 中使用 Promise 处理错误

node.js的Promise库-bluebird示例

在node.js中使用promise处理MySQL返回值

node.js的Promise库-bluebird示例