是否可以在nodejs Async(瀑布,系列等...)中构建动态任务列表

Posted

技术标签:

【中文标题】是否可以在nodejs Async(瀑布,系列等...)中构建动态任务列表【英文标题】:Is it possible to build a dynamic task list in nodejs Async (waterfall, series, etc...) 【发布时间】:2014-03-01 03:07:15 【问题描述】:

我正在从 mongo 中包含节点和边缘数据的一些集合中提取信息。首先我必须得到节点,这样我才能抓住它的边缘。一旦我有一个边缘列表,我就会返回并获取更多节点(等等。基于深度值)。以下代码是我尝试使用 async.waterfall 和任务列表的一个松散示例。

最初我只有一个任务,但是一旦我进行第一个查询,我就会添加到任务数组中。不幸的是,这似乎没有向 async 注册,并且它不会继续处理我正在添加的任务。

有没有更好的方法来做到这一点?

var async = require('async')
var mongoose = require('mongoose')
var _ = requrie('underscore')

var Client = this.Mongo.connect(/*path to mongo*/)
var Node = mongoose.Schema(
    id : String,
    graph_id : String
)
var Edge = mongoose.Schema(
    id : String,
    source_id : String,
    destination_id : String
)
var Nodes = Client.model('myNode', Node)
var Edges = Client.model('myEdge', Edge)
var funcs = []
var round = 1
var depth = 2

var query = 
    node : 
        id : '12345'
    ,
    edge : 
        id : '12345'   
    


var addTask = function(Nodes, Edges, query, round, depth) 
    return function(callback) 
    queryData(Nodes, Edges, query, function(err, node_list) 
        if(depth > round) 
             round++
             function_array.push(addTask(Nodes, Edges, query, round, depth))
        
    )



var queryData = function(Nodes, Edges, query, cb) 
async.waterfall([
    function(callback) 
        Nodes.find(query.node, function(err, nodes) 
            var node_keys = _.map(nodes, function(node) 
                 return node.id  
            )
            callback(null, nodes, node_keys)  
        )
    ,
    function(nodes, node_keys, callback) 
        query.edge.$or = [ 'source_id' : $in:node_keys, 'destination_id' : $in:node_keys ]
        Edges.find(query.edge, function(err, edges) 
            var edge_keys = _.map(edges, function(edge) 
                if(edge['_doc']['source_id'] != query.node.id) 
                     return edge['_doc']['source_id']
                 else 
                     return edge['_doc']['destination_id']
                
                callback(null, nodes, edges, node_keys, edge_keys)
            )
        )
    
], function(err, nodes, edges, node_keys, edge_keys) 
    // update the results object then...
    cb(null, _.uniq(edge_keys)
 )


var function_array = []
function_array.push(addTask(Nodes, Edges, query, round, depth))

async.waterfall(function_array, function(err) 
    Client.disconnect()
    //this should have run more than just the initial task but does not
)    

---------- 更新 ------------- --

因此,在尝试通过添加尾随函数来尝试让 Async 瀑布或系列做到这一点之后,我决定改用 async.whilst,现在对解决方案感到满意。

function GraphObject() 
  this.function_array = []


GraphObject.prototype.doStuff = function() 
  this.function_array.push(this.buildFunction(100))
  this.runTasks(function(err) 
     console.log('done with all tasks')
  


GraphObject.prototype.buildFunction = function(times) 
  return function(cb) 
    if(times != 0) 
      this.function_array.push(this.buildFunction(times - 1))
    
    cb(null)
  


GraphObject.prototype.runTasks = function(cb) 
  var tasks_run = 0
  async.whilst(
    function()
      return this.function_array.length > 0
    .bind(this),
    function(callback) 
      var func = this.function_array.shift()
      func.call(this, function(err)  
        tasks_run++
        callback(err) 
      )
    .bind(this),
    function(err) 
      console.log('runTasks ran '+tasks_run+' tasks')
      if(err) 
        cb(500)
      
      cb(null)
    .bind(this)
  )

【问题讨论】:

【参考方案1】:

function_array 中的任务只能向数组中添加新任务,前提是它不是数组中的最后一个任务。

在您的情况下,您的 function_array 仅包含 1 个任务。该任务本身无法添加其他任务,因为它是最后一个任务。

解决方案是在数组中有 2 个任务。一个 startTask 引导进程,一个 finalTask​​ 更像是一个虚拟任务。在这种情况下,

function_array = [startTask, finalTask];

然后startTask会添加taskA,taskB会添加task C,最终

function_array = [startTask, taskA, taskB, taskC, finalTask];

下面的示例代码说明了这些概念。

    var async = require('async');
    var max = 6;

    var nodeTask = function(taskId, value, callback)
        var r = Math.floor(Math.random() * 20) + 1;
        console.log("From Node Task %d: %d", taskId, r);

        // add an edge task
        if (taskId < max) 
            function_array.splice(function_array.length-1, 0, edgeTask);
        

        callback(null, taskId + 1, value + r);
    ;

    var edgeTask = function(taskId, value, callback)
        var r = Math.floor(Math.random() * 20) + 1;
        console.log("From Edge Task %d: %d", taskId, r);

        // add a node task
        if (taskId < max) 
            function_array.splice(function_array.length-1, 0, nodeTask);
        

        callback(null, taskId + 1, value + r);
    ;

    var startTask = function(callback) 
        function_array.splice(function_array.length-1, 0, nodeTask);
        callback(null, 1, 0);
    ;

    var finalTask = function(taskId, value, callback) 
        callback(null, value);
    ;

    var function_array = [startTask, finalTask];

    async.waterfall(function_array, function (err, result) 
        console.log("Sum is ", result);
    );

【讨论】:

这正是开尔文的解决方案,谢谢! Async API 中是否有一个部分描述了任务列表中“最终”任务与其他任务的区别?如果有,将其链接到解决方案中可能会有所帮助。 不在 API 文档中,而是在 [来源] (github.com/caolan/async/blob/master/lib/async.js#L490) 中。我实际上是凭直觉工作的,写了一些示例代码来检查,然后去检查源代码。感兴趣的源代码行介于 490 到 499 之间。 在实施“最终功能”修复后遇到奇怪的行为。现在 async.waterfall 结果回调被触发多次。要去查源头,但没有意义。 您的任务是否调用了正确的回调?使用嵌套,很容易调用错误的回调。另外,使用 async#waterfall 可能不是最佳选择。得到一个节点并抓取它的边缘后,可以使用 async#each 来处理边缘任务吗? 是的,那是我的问题。我有错误的回调开始。

以上是关于是否可以在nodejs Async(瀑布,系列等...)中构建动态任务列表的主要内容,如果未能解决你的问题,请参考以下文章

调用猫鼬保存方法时async.js挂起的瀑布方法

Asyncjs:绕过瀑布链中的函数

将异步函数添加到异步瀑布

从async / await nodejs返回多个值

嵌套在 async.js 瀑布中的异步函数

嵌套在 async.js 瀑布中的异步函数