是否可以在nodejs Async(瀑布,系列等...)中构建动态任务列表
Posted
技术标签:
【中文标题】是否可以在nodejs Async(瀑布,系列等...)中构建动态任务列表【英文标题】:Is it possible to build a dynamic task list in nodejs Async (waterfall, series, etc...) 【发布时间】:2014-03-01 03:07:15 【问题描述】:我正在从 mongo 中包含节点和边缘数据的一些集合中提取信息。首先我必须得到节点,这样我才能抓住它的边缘。一旦我有一个边缘列表,我就会返回并获取更多节点(等等。基于深度值)。以下代码是我尝试使用 async.waterfall 和任务列表的一个松散示例。
最初我只有一个任务,但是一旦我进行第一个查询,我就会添加到任务数组中。不幸的是,这似乎没有向 async 注册,并且它不会继续处理我正在添加的任务。
有没有更好的方法来做到这一点?
var async = require('async')
var mongoose = require('mongoose')
var _ = requrie('underscore')
var Client = this.Mongo.connect(/*path to mongo*/)
var Node = mongoose.Schema(
id : String,
graph_id : String
)
var Edge = mongoose.Schema(
id : String,
source_id : String,
destination_id : String
)
var Nodes = Client.model('myNode', Node)
var Edges = Client.model('myEdge', Edge)
var funcs = []
var round = 1
var depth = 2
var query =
node :
id : '12345'
,
edge :
id : '12345'
var addTask = function(Nodes, Edges, query, round, depth)
return function(callback)
queryData(Nodes, Edges, query, function(err, node_list)
if(depth > round)
round++
function_array.push(addTask(Nodes, Edges, query, round, depth))
)
var queryData = function(Nodes, Edges, query, cb)
async.waterfall([
function(callback)
Nodes.find(query.node, function(err, nodes)
var node_keys = _.map(nodes, function(node)
return node.id
)
callback(null, nodes, node_keys)
)
,
function(nodes, node_keys, callback)
query.edge.$or = [ 'source_id' : $in:node_keys, 'destination_id' : $in:node_keys ]
Edges.find(query.edge, function(err, edges)
var edge_keys = _.map(edges, function(edge)
if(edge['_doc']['source_id'] != query.node.id)
return edge['_doc']['source_id']
else
return edge['_doc']['destination_id']
callback(null, nodes, edges, node_keys, edge_keys)
)
)
], function(err, nodes, edges, node_keys, edge_keys)
// update the results object then...
cb(null, _.uniq(edge_keys)
)
var function_array = []
function_array.push(addTask(Nodes, Edges, query, round, depth))
async.waterfall(function_array, function(err)
Client.disconnect()
//this should have run more than just the initial task but does not
)
---------- 更新 ------------- --
因此,在尝试通过添加尾随函数来尝试让 Async 瀑布或系列做到这一点之后,我决定改用 async.whilst,现在对解决方案感到满意。
function GraphObject()
this.function_array = []
GraphObject.prototype.doStuff = function()
this.function_array.push(this.buildFunction(100))
this.runTasks(function(err)
console.log('done with all tasks')
GraphObject.prototype.buildFunction = function(times)
return function(cb)
if(times != 0)
this.function_array.push(this.buildFunction(times - 1))
cb(null)
GraphObject.prototype.runTasks = function(cb)
var tasks_run = 0
async.whilst(
function()
return this.function_array.length > 0
.bind(this),
function(callback)
var func = this.function_array.shift()
func.call(this, function(err)
tasks_run++
callback(err)
)
.bind(this),
function(err)
console.log('runTasks ran '+tasks_run+' tasks')
if(err)
cb(500)
cb(null)
.bind(this)
)
【问题讨论】:
【参考方案1】:function_array 中的任务只能向数组中添加新任务,前提是它不是数组中的最后一个任务。
在您的情况下,您的 function_array 仅包含 1 个任务。该任务本身无法添加其他任务,因为它是最后一个任务。
解决方案是在数组中有 2 个任务。一个 startTask 引导进程,一个 finalTask 更像是一个虚拟任务。在这种情况下,
function_array = [startTask, finalTask];
然后startTask会添加taskA,taskB会添加task C,最终
function_array = [startTask, taskA, taskB, taskC, finalTask];
下面的示例代码说明了这些概念。
var async = require('async');
var max = 6;
var nodeTask = function(taskId, value, callback)
var r = Math.floor(Math.random() * 20) + 1;
console.log("From Node Task %d: %d", taskId, r);
// add an edge task
if (taskId < max)
function_array.splice(function_array.length-1, 0, edgeTask);
callback(null, taskId + 1, value + r);
;
var edgeTask = function(taskId, value, callback)
var r = Math.floor(Math.random() * 20) + 1;
console.log("From Edge Task %d: %d", taskId, r);
// add a node task
if (taskId < max)
function_array.splice(function_array.length-1, 0, nodeTask);
callback(null, taskId + 1, value + r);
;
var startTask = function(callback)
function_array.splice(function_array.length-1, 0, nodeTask);
callback(null, 1, 0);
;
var finalTask = function(taskId, value, callback)
callback(null, value);
;
var function_array = [startTask, finalTask];
async.waterfall(function_array, function (err, result)
console.log("Sum is ", result);
);
【讨论】:
这正是开尔文的解决方案,谢谢! Async API 中是否有一个部分描述了任务列表中“最终”任务与其他任务的区别?如果有,将其链接到解决方案中可能会有所帮助。 不在 API 文档中,而是在 [来源] (github.com/caolan/async/blob/master/lib/async.js#L490) 中。我实际上是凭直觉工作的,写了一些示例代码来检查,然后去检查源代码。感兴趣的源代码行介于 490 到 499 之间。 在实施“最终功能”修复后遇到奇怪的行为。现在 async.waterfall 结果回调被触发多次。要去查源头,但没有意义。 您的任务是否调用了正确的回调?使用嵌套,很容易调用错误的回调。另外,使用 async#waterfall 可能不是最佳选择。得到一个节点并抓取它的边缘后,可以使用 async#each 来处理边缘任务吗? 是的,那是我的问题。我有错误的回调开始。以上是关于是否可以在nodejs Async(瀑布,系列等...)中构建动态任务列表的主要内容,如果未能解决你的问题,请参考以下文章