在 node.js 中使用异步瀑布

Posted

技术标签:

【中文标题】在 node.js 中使用异步瀑布【英文标题】:Using Async waterfall in node.js 【发布时间】:2014-10-31 12:57:30 【问题描述】:

我有 2 个异步运行的函数。我想用瀑布模型来写它们。问题是,我不知道如何..

这是我的代码:

var fs = require('fs');
function updateJson(ticker, value) 
  //var stocksJson = JSON.parse(fs.readFileSync("stocktest.json"));
  fs.readFile('stocktest.json', function(error, file) 
    var stocksJson =  JSON.parse(file);

    if (stocksJson[ticker]!=null) 
      console.log(ticker+" price : " + stocksJson[ticker].price);
      console.log("changing the value...")
      stocksJson[ticker].price =  value;

      console.log("Price after the change has been made -- " + stocksJson[ticker].price);
      console.log("printing the the Json.stringify")
      console.log(JSON.stringify(stocksJson, null, 4));
      fs.writeFile('stocktest.json', JSON.stringify(stocksJson, null, 4), function(err)   
        if(!err) 
          console.log("File successfully written");
        
        if (err) 
          console.error(err);
        
      ); //end of writeFile
     else 
      console.log(ticker + " doesn't exist on the json");
    
  );
 // end of updateJson 

知道如何使用瀑布来编写它,这样我就可以控制它了吗?请给我写一些例子,因为我是 node.js 的新手

【问题讨论】:

基本上你想知道在你的代码中你必须调用你的第二个函数以使其仅在第一个函数结束写入文件的作业时执行,并且可能在另一个异步上重复此过程功能,对吧? 【参考方案1】:

首先确定步骤并将它们编写为异步函数(带回调参数)

读取文件

function readFile(readFileCallback) 
    fs.readFile('stocktest.json', function (error, file) 
        if (error) 
            readFileCallback(error);
         else 
            readFileCallback(null, file);
        
    );

处理文件(我在示例中删除了大部分 console.log)

function processFile(file, processFileCallback) 
    var stocksJson = JSON.parse(file);
    if (stocksJson[ticker] != null) 
        stocksJson[ticker].price = value;
        fs.writeFile('stocktest.json', JSON.stringify(stocksJson, null, 4), function (error) 
            if (err) 
                processFileCallback(error);
             else 
                console.log("File successfully written");
                processFileCallback(null);
            
        );
    
    else 
        console.log(ticker + " doesn't exist on the json");
        processFileCallback(null); //callback should always be called once (and only one time)
    

请注意,我在这里没有进行具体的错误处理,我将利用 async.waterfall 将错误处理集中在同一个地方。

还要注意,如果您在异步函数中有 (if/else/switch/...) 分支,它总是会调用回调一次(且仅一次)。

使用 async.waterfall 插入所有内容

async.waterfall([
    readFile,
    processFile
], function (error) 
    if (error) 
        //handle readFile error or processFile error here
    
);

干净的例子

前面的代码过于冗长,以使解释更清楚。这是一个完整的清理示例:

async.waterfall([
    function readFile(readFileCallback) 
        fs.readFile('stocktest.json', readFileCallback);
    ,
    function processFile(file, processFileCallback) 
        var stocksJson = JSON.parse(file);
        if (stocksJson[ticker] != null) 
            stocksJson[ticker].price = value;
            fs.writeFile('stocktest.json', JSON.stringify(stocksJson, null, 4), function (error) 
                if (!err) 
                    console.log("File successfully written");
                
                processFileCallback(err);
            );
        
        else 
            console.log(ticker + " doesn't exist on the json");
            processFileCallback(null);
        
    
], function (error) 
    if (error) 
        //handle readFile error or processFile error here
    
);

我留下了函数名称,因为它有助于提高可读性并有助于使用 chrome 调试器等工具进行调试。

如果使用underscore(on npm),也可以将第一个函数替换为_.partial(fs.readFile, 'stocktest.json')

【讨论】:

所以基本上processFile 是从fs.readFile 的回调中获取file 对吧?我应该首先声明所有功能然后在瀑布中使用它们吗?您提供了 2 个使用瀑布的示例.. 对。 (如果 readFile 中有错误,waterfall 将自动跳过 processFile。)然后选择编写您最熟悉的代码的方式,第一或第二。 我可以将整个waterfall function 包装成for loop 吗?是不是要在下一次循环迭代之前完成里面的所有操作? 不,你应该使用类似async.eachSeries的东西。把你对瀑布的使用放在一个异步函数中(比如updateJsonAsync(options,callback)),然后用async.eachSeries( [ticker:...,value:...,...], updateJsonAsync, function(err)/*handle error*/ )这样的东西来调用它 所以我可以通过这种方式将瀑布函数放入包装函数中:updateJsonAsync(ticker, value, callback) 对吗?瀑布得到的第二个参数是我的包装函数的回调,对吗?【参考方案2】:

首先,确保您read the documentation regarding async.waterfall

现在,关于瀑布控制流有几个关键部分:

    控制流由一组函数指定为第一个参数调用,当流完成时的“完成”回调作为第二个参数。 函数数组在系列中调用(相对于并行)。 如果流数组中的任何操作遇到错误(通常命名为err),它将短路并立即调用“完成”/“完成”/“完成”callback。 先前执行的函数的参数是applied 到控制流中的下一个函数,按顺序,“中间”回调作为最后一个参数提供。注意:第一个函数只有这个“中间”回调,而“完成”回调将有控制流中最后一个调用函数的参数(考虑到任何错误),但前面有一个 err 参数而不是一个附加的“中间”回调。 每个单独操作的回调(在我的示例中我称之为cbAsync)应该在您准备好继续前进时调用:第一个参数将是一个错误,如果有的话,第二个(第三个,第四个...等)参数将是您要传递给后续操作的任何数据。

第一个目标是在引入async.waterfall 的同时让您的代码几乎逐字运行。我决定删除您所有的 console.log 语句并简化您的错误处理。这是第一次迭代(未经测试的代码):

var fs = require('fs'),
    async = require('async');

function updateJson(ticker,value) 
    async.waterfall([ // the series operation list of `async.waterfall`
        // waterfall operation 1, invoke cbAsync when done
        function getTicker(cbAsync) 
            fs.readFile('stocktest.json',function(err,file) 
                if ( err ) 
                    // if there was an error, let async know and bail
                    cbAsync(err);
                    return; // bail
                
                var stocksJson = JSON.parse(file);
                if ( stocksJson[ticker] === null ) 
                    // if we don't have the ticker, let "complete" know and bail
                    cbAsync(new Error('Missing ticker property in JSON.'));
                    return; // bail
                
                stocksJson[ticker] = value;
                // err = null (no error), jsonString = JSON.stringify(...)
                cbAsync(null,JSON.stringify(stocksJson,null,4));    
            );
        ,
        function writeTicker(jsonString,cbAsync) 
            fs.writeFile('stocktest.json',jsonString,function(err) 
                cbAsync(err); // err will be null if the operation was successful
            );
        
    ],function asyncComplete(err)  // the "complete" callback of `async.waterfall`
        if ( err )  // there was an error with either `getTicker` or `writeTicker`
            console.warn('Error updating stock ticker JSON.',err);
         else 
            console.info('Successfully completed operation.');
        
    );

第二次迭代进一步划分了操作流程。它将其放入较小的面向单一操作的代码块中。我不打算评论它,它不言自明(再次,未经测试):

var fs = require('fs'),
    async = require('async');

function updateJson(ticker,value,callback)  // introduced a main callback
    var stockTestFile = 'stocktest.json';
    async.waterfall([
        function getTicker(cbAsync) 
            fs.readFile(stockTestFile,function(err,file) 
                cbAsync(err,file);
            );
        ,
        function parseAndPrepareStockTicker(file,cbAsync) 
            var stocksJson = JSON.parse(file);
            if ( stocksJson[ticker] === null ) 
                cbAsync(new Error('Missing ticker property in JSON.'));
                return;
            
            stocksJson[ticker] = value;
            cbAsync(null,JSON.stringify(stocksJson,null,4));
        ,
        function writeTicker(jsonString,cbAsync) 
            fs.writeFile('stocktest.json',jsonString,,function(err) 
                cbAsync(err);
            );
        
    ],function asyncComplete(err) 
        if ( err ) 
            console.warn('Error updating stock ticker JSON.',err);
        
        callback(err);
    );

最后一次迭代通过使用一些 bind 技巧来减少调用堆栈并提高可读性 (IMO),从而缩短了很多时间,同样未经测试:

var fs = require('fs'),
    async = require('async');

function updateJson(ticker,value,callback) 
    var stockTestFile = 'stocktest.json';
    async.waterfall([
        fs.readFile.bind(fs,stockTestFile),
        function parseStockTicker(file,cbAsync) 
            var stocksJson = JSON.parse(file);
            if ( stocksJson[ticker] === null ) 
                cbAsync(new Error('Missing ticker property in JSON.'));
                return;
            
            cbAsync(null,stocksJson);
        ,
        function prepareStockTicker(stocksJson,cbAsync) 
            stocksJson[ticker] = value;
            cbAsync(null,JSON.stringify(stocksJson,null,4));
        ,
        fs.writeFile.bind(fs,stockTestFile)
    ],function asyncComplete(err) 
        if ( err ) 
            console.warn('Error updating stock ticker JSON.',err);
        
        callback(err);
    );

【讨论】:

非常感谢,现在我更明白了!我还有一个问题:如果我需要多次执行整个操作,比如说 5 次,我可以将它全部包装到 for loop 整个 function 中吗?它会工作吗,我的意思是它会完成内部的所有操作,直到循环的第二次迭代? @user3502786 你不能使用for 循环,如果你这样做了,那么多个async.waterfall 操作将并行运行。请改用async.eachSeriesasync.whilstasync.until。这些等价于for 循环,但会等到异步的callback 被调用,然后再进行下一次迭代(换句话说,for 循环将产生)。 @user3502786 示例:var tickers = [ticker:'GOOG',value:1,ticker:'YHOO',value:2]; async.eachSeries(tickers,updateJson,function(err)/*done*/); 但您必须将您的 updateJson 更改为 function updateJson(obj,callback)var ticker = obj.ticker,value = obj.value;async.waterfall([/* flow operations go here */],callback);【参考方案3】:

基本上,需要一些时间来执行(无论是用于 I/O 还是 CPU 处理)的 nodejs(以及更普遍的 javascript)函数通常是异步的,因此事件循环(为了简单起见,是一个不断检查任务的循环要执行)可以调用第一个函数正下方的函数,而不会因响应而被阻塞。如果您熟悉 C 或 Java 等其他语言,您可以将异步函数视为在另一个线程上运行的函数(在 javascript 中不一定如此,但程序员不应该关心它)以及何时执行终止线程通知主线程(事件循环一)工作已完成并有结果。

正如所说,一旦第一个函数结束了它的工作,它必须能够通知它的工作已经完成,并且它会调用你传递给它的回调函数。举个例子:

var callback = function(data,err)

   if(!err)
   
     do something with the received data
   
   else
     something went wrong



asyncFunction1(someparams, callback);

asyncFunction2(someotherparams);

执行流程将调用:asyncFunction1、asyncFunction2 和下面的每个函数,直到 asyncFunction1 结束,然后调用作为最后一个参数传递给 asyncFunction1 的回调函数来处理数据,如果没有发生错误。

因此,要让 2 个或多个异步函数仅在它们结束时一个接一个地执行,您必须在它们的回调函数中调用它们:

function asyncTask1(data, function(result1, err)

   if(!err)
     asyncTask2(data, function(result2, err2)
     
           if(!err2)
        //call maybe a third async function
           else
             console.log(err2);
     );
    else
     console.log(err);
);

result1 是 asyncTask1 的返回值,result2 是 asyncTask2 的返回值。你可以通过这种方式嵌套多少个你想要的异步函数。

如果您希望在 updateJson() 之后调用另一个函数,则必须在此行之后调用它:

console.log("File successfully written");

【讨论】:

很好地解释了异步操作,但是这个问题专门针对caolan's async module。我相信 OP 了解异步代码的工作原理,因为它们已经在嵌套异步函数。

以上是关于在 node.js 中使用异步瀑布的主要内容,如果未能解决你的问题,请参考以下文章

将参数传递给 Node.js 异步瀑布

Node.js之异步编程

在异步 AWS Lambda 函数中使用带有 node-fetch 模块的 node.js 时遇到问题

在 Node.js 的异步函数中填充数组

node.js:如何在 forEach 循环中使用异步调用实现路由方法?

node.js 异步使用 SQL