ForEach 循环中的异步/等待节点 Postgres 查询

Posted

技术标签:

【中文标题】ForEach 循环中的异步/等待节点 Postgres 查询【英文标题】:Async/Await Node-Postgres Queries Within ForEach Loops 【发布时间】:2017-12-24 23:28:53 【问题描述】:

编辑:我正在使用节点 v8.0.0

我刚开始学习如何使用 node-postgres 访问 SQL 数据库,但在访问多个数据库以以可行的格式收集数据时遇到了一些麻烦,尤其是在 forEach 循环中执行多个查询时。经过几次尝试,我正在尝试 async/await,但出现以下错误:

await client.connect()
  ^^^^^^
SyntaxError: Unexpected identifier

当我尝试使用池或按顺序调用 .query 时,我会得到类似于

的内容
1
[]
could not connect to postgres Error: Connection terminated

这是我的代码的缩写版本:

const  Client  = require('pg');
const moment = require('moment');
const _ = require('lodash');
const turf = require('@turf/turf');

const connString = // connection string
var collected = []
const CID = 300
const snaptimes = // array of times
var counter=0;
const client = new Client(connString);

function createArray(i,j) 
     // return array of i arrays of length j


await client.connect()

snaptimes.forEach(function(snaptime)
  var info = ; // an object of objects
  // get information at given snaptime from database 1
  const query1 = // parametrized query selecting two columns from database 1
  const result1 = await client.query(query1, [CID,snaptime]);
  var x = result1.rows;
  for (var i = 0; i < x.length; i++) 
     // store data from database 1 into info
     // each row is an object with two fields
  

  // line up subjects on the hole
  const query2 = // parametrized query grabbing JSON string from database 2
  const result2 = await client.query(query2, [CID,snaptime]);
  const raw = result2.rows[0].JSON_col;
  const line = createArray(19,0); // an array of 19 empty arrays
  for (var i = 0; i < raw.length; i++) 
     // parse JSON object and record data into line 
  

  // begin to collect data
  var n = 0;
  var g = 0;
  // walk down the line
  for (var i = 18; i > 0; i--) 
    // if no subjects are found at spot i, do nothing, except maybe update g
    if ((line[i] === undefined || line[i].length == 0) && g == 0)
      g = i;
     else if (line[i] !== undefined && line[i].length != 0) 
      // collect data for each subject if subjects are found
      line[i].forEach(function(subject)
        const query 3 = // parametrized query grabbing data for each subject 
        const result3 = await client.query(query3,[CID,subject,snaptime]);
        x = result3.rows;
        const y = moment(x[0].end_time).diff(moment(snaptime),'minutes');
        var yhat = 0;
        // the summation over info depends on g
        if (g===0)
          for (var j = i; j <= 18; j++)
            yhat = moment.duration(info[j].field1).add(yhat,'m').asMinutes();
          
         else 
          for (var j = i; j <= 18; j++)
            if (i<j && j<g+1) 
              yhat = moment.duration(info[j].field2).add(yhat,'m').asMinutes();
             else 
              yhat = moment.duration(info[j].field1).add(yhat,'m').asMinutes();
            
          
        
        collected.push([y,yhat,n,i]);
      );
    
    n+=line[i].length;
    g=0;
  
  // really rough work-around I once used for printing results after a forEach of queries
  counter++;
  if (counter===snaptimes.length)
    console.log(counter);
    console.log(collected);
    client.end(); 
  
);

【问题讨论】:

【参考方案1】:

问题是因为你的 forEach 回调不是async:

snaptimes.forEach(function(snaptime)

应该是:

snaptimes.forEach(async function (snaptime) 

await 完全可识别。

请记住,async 函数会立即返回并返回一个承诺,该承诺最终会被async 函数的return 语句解决(或因async 函数内引发的未捕获异常而被拒绝)。

还要确保你的 Node 版本支持async/await:

从 Node 7.6 开始,它可以在没有 --harmony 标志的情况下使用。 在 7.6 之前的 Node 7.x 中,您必须使用 --harmony 标志。 在 Node 7.0 之前的版本中不可用。

见:http://node.green/#ES2017-features-async-functions

另请注意,您可以在使用async 关键字声明的函数内使用await。如果你想在脚本或模块的顶层使用它,那么你需要将它包装在一个立即调用的函数表达式中:

// cannot use await here
(async () => 
  // can use await here
)();
// cannot use await here

例子:

const f = () => new Promise(r => setTimeout(() => r('x'), 500));

let x = await f();
console.log(x);

打印:

$ node t1.js 
/home/rsp/node/test/prom-async/t1.js:3
let x = await f();
              ^
SyntaxError: Unexpected identifier

但是这个:

const f = () => new Promise(r => setTimeout(() => r('x'), 500));

(async () => 
  let x = await f();
  console.log(x);
)();

打印:

$ node t2.js 
x

0.5 秒延迟后,如预期的那样。

在不支持 async/await 的 Node 版本上,将打印第一个(不正确的)示例:

$ ~/opt/node-v6.7.0/bin/node t1.js 
/home/rsp/node/test/prom-async/t1.js:3
let x = await f();
              ^
SyntaxError: Unexpected identifier

第二个(正确的)示例将打印一个不同的错误:

$ ~/opt/node-v6.7.0/bin/node t2.js 
/home/rsp/node/test/prom-async/t2.js:3
(async () => 
       ^
SyntaxError: Unexpected token (

知道这一点很有用,因为不支持 async/await 的 Node 版本不会给你一个有意义的错误,比如“async/await not supported”或类似的错误,很遗憾。

【讨论】:

在任何调用 await 的函数之前放置 async 可以很好地解决问题。但是,当我尝试向调用第一个 forEach 的 snaptime 数组添加更多 snaptime 时,我得到Error: Connection terminated unexpectedlyError: read ECONNRESET。离开打印语句后,这些错误会立即发生在 query1 上。将 client.end() 移到 forEach 循环之后只会导致代码无法运行 @HansyPiou 当您使用 async/await 时,我建议您使用 for (let snaptime of snaptimes) ... 而不是 snaptimes.forEach(function (snaptime) ... );,因为这样您就不会调用其他函数并且更容易同步迭代步骤. 现在一切都很好!使用 for() 循环意味着我必须将它们全部包装到一个更大的(异步)函数中,但这最终解决了 node-postgres 的问题,因为我可以在收集数据之前连接客户端并在完成后关闭它。感谢您的所有帮助!【参考方案2】:

确保你应该在外面使用async 块:

async function() 
  return await Promise.resolve('')

并且在node 7.6.0之后默认支持。在 7.6.0 之前,您应该使用--harmony 选项来为它工作。

node -v先检查你的版本。

【讨论】:

【参考方案3】:

首先,您对 async-await 的了解还不够。别担心,其实很简单;但是您需要阅读文档才能使用这些东西。

更重要的是,您的代码的问题在于您只能在 async 函数中使用 await;你是在 any 函数之外做的。

首先,这是最接近您编写的代码的解决方案:

const  Client  = require('pg');
const moment = require('moment');
const _ = require('lodash');
const turf = require('@turf/turf');

const connString = // connection string
var collected = []
const CID = 300
const snaptimes = // array of times
var counter=0;
const client = new Client(connString);

function createArray(i,j) 
    // return array of i arrays of length j


async function processSnaptime (snaptime) 
    var info = ; // an object of objects
    // get information at given snaptime from database 1
    const query1 = // parametrized query selecting two columns from database 1
    const result1 = await client.query(query1, [CID,snaptime]);
    var x = result1.rows;
    for (var i = 0; i < x.length; i++) 
      // store data from database 1 into info
      // each row is an object with two fields
    

    // line up subjects on the hole
    const query2 = // parametrized query grabbing JSON string from database 2
    const result2 = await client.query(query2, [CID,snaptime]);
    const raw = result2.rows[0].JSON_col;
    const line = createArray(19,0); // an array of 19 empty arrays
    for (var i = 0; i < raw.length; i++) 
      // parse JSON object and record data into line
    

    // begin to collect data
    var n = 0;
    var g = 0;
    // walk down the line
    for (var i = 18; i > 0; i--) 
      // if no subjects are found at spot i, do nothing, except maybe update g
      if ((line[i] === undefined || line[i].length == 0) && g == 0)
        g = i;
       else if (line[i] !== undefined && line[i].length != 0) 
        // collect data for each subject if subjects are found
        line[i].forEach(function(subject)
          const query 3 = // parametrized query grabbing data for each subject
          const result3 = await client.query(query3,[CID,subject,snaptime]);
          x = result3.rows;
          const y = moment(x[0].end_time).diff(moment(snaptime),'minutes');
          var yhat = 0;
          // the summation over info depends on g
          if (g===0)
            for (var j = i; j <= 18; j++)
              yhat = moment.duration(info[j].field1).add(yhat,'m').asMinutes();
            
           else 
            for (var j = i; j <= 18; j++)
              if (i<j && j<g+1) 
                yhat = moment.duration(info[j].field2).add(yhat,'m').asMinutes();
               else 
                yhat = moment.duration(info[j].field1).add(yhat,'m').asMinutes();
              
            
          
          collected.push([y,yhat,n,i]);
        );
      
      n+=line[i].length;
      g=0;
    
    // really rough work-around I once used for printing results after a forEach of queries
    counter++;
    if (counter===snaptimes.length)
      console.log(counter);
      console.log(collected);
    


async function run () 
  for (let snaptime of snaptimes) 
    await processSnaptime(snaptime);
  


/* to run all of them concurrently:
function run () 
  let procs = [];
  for (let snaptime of snaptimes) 
    procs.push(processSnaptime(snaptime));
  
  return Promise.all(procs);

*/

client.connect().then(run).then(() => client.end());

client.connect 返回一个承诺,一旦解决,我使用then 调用run。当 部分结束时,client.end() 可以安全地调用。

run 是一个async 函数,因此它可以使用await 使代码更具可读性。 processSnaptime 也是如此。

当然我不能真正运行你的代码,所以我只能希望我没有犯任何错误。

【讨论】:

以上是关于ForEach 循环中的异步/等待节点 Postgres 查询的主要内容,如果未能解决你的问题,请参考以下文章

如何在节点 js 中正确使用等待/异步与 for 循环

在 forEach 循环中等待 promise

执行异步forEach循环后重新定义全局变量[重复]

如何正确调用 Parallel.ForEach 循环中的调用异步方法[重复]

forEach 循环中的异步 findOne() 操作

@foreach 循环中的多种形式。如何使用 javascript 异步提交一个。 C# 核心剃刀