如何在 Node.js 流回调中聚合从异步函数生成的 Promise?
Posted
技术标签:
【中文标题】如何在 Node.js 流回调中聚合从异步函数生成的 Promise?【英文标题】:How do I aggregate promises generated from async functions within a Node.js stream callback? 【发布时间】:2016-06-01 10:48:57 【问题描述】:我有一个 Node.js Typescript 程序,我在其中尝试逐行解析大型 CSV 文件并异步处理这些行。更具体地说,我需要一个函数:
-
打开一个 CSV 文件。
将下一行解析为对象。
(理想情况下)收集一定数量的对象以进行批处理。
将对象传递给异步函数进行处理(返回承诺)。
从处理函数中收集 Promise。
一些要求和注意事项:
我需要对这些承诺中的任何一个进行投票以了解进展情况。 假设这些 CSV 文件很大;逐行流式传输是必要的。 我不应该在这些处理操作正在运行时阻止应用程序。 返回一组 promise 可能不是正确的方法,尤其是当我尝试读取包含一百万行的文件时。 我需要各种钩子来取消或重试失败的操作。这是我已经开始工作的一些测试代码。 ObjectStream
是一个自定义 Node.js 转换,可将 CSV 行转换为对象。
function parseFileAsync(filePath: string): Promise<any>
var doParseFileAsync = (filePath: string) =>
var streamDeferred = q.defer<Promise<any>[]>();
var promises: Promise<any>[] = [];
var propertyNames: string[] = [];
var stream = fs.createReadStream(filePath, encoding: "utf8" )
.pipe(new LineStream( objectMode: true ))
.pipe(new ObjectStream( objectMode: true ));
stream.on("readable", () =>
var obj: Object;
while ((obj = stream.read()) !== null)
console.log(`\nRead an object...`);
var operationDeferred = q.defer<any>();
operationDeferred.resolve(doSomethingAsync(obj));
promises.push(operationDeferred.promise);
);
stream.on("end", () =>
streamDeferred.resolve(promises);
);
return streamDeferred.promise;
return doParseFileAsync(filePath)
.then((result: Promise<any>[]) =>
return q.all(result);
);
parseFileAsync(filePath)
.done((result: any[]) =>
console.log(`\nFinished reading and processing the file:\n\t$result.toString()`);
);
最后的done
调用甚至在流开始运行之前执行,因为parseFileAsync
立即用一个空数组完成;流还没有机会推送任何承诺。
经过几天的搜索,我仍然不确定执行此操作的正确方法是什么。 Node/javascript 专家:帮助?
更新
代码已更新,我的承诺现在运行良好。但是,我需要一种方法来连接流并在需要时取消该过程。我还需要一种方法来重试任何失败的操作。
【问题讨论】:
我最初的想法(没有过多阅读)是您应该选择一种或另一种范式。这似乎最好由流范式中的map stream
之类的东西处理。如果你必须使用promise(也许消费者期望得到一个promise),只要你的流全部退出就返回一个promise并解决它。
创建一个新的 Promise P,它在流完成时完成,将它的 Promise 添加到列表中。返回 P 并让它解析 Promise 列表。等待 P,然后等待所有 P 的结果。
@nick 坚持使用流是不可能的。我正在开发的应用程序使用了 Promise。我现在才介绍一个需要读取文件的功能。
明白了。我认为您仍然可以在我的评论后半部分使用该方法。例如。只需将流式行为包装在一个 Promise 中即可。
感谢您的建议;他们工作得很好!我对我原来的问题做了一些更新;我需要一个钩子来取消和重试。
【参考方案1】:
我在程序架构中遇到了一些限制,这些限制不允许我随意传递 Promise。因此,与其启动一堆承诺,我决定等到上一批完成后再开始新的承诺。这是我采取的方法:
将流的东西分离到它自己的接受延续标记的函数中。如果有更多数据要读取,返回值将包含读取的数据以及延续令牌:
function readFile(filepath: string, lines: number, start: any): Promise<any>
...
定义一个将运行可重试操作的函数。在此函数的主体中,从文件中检索和处理一大块数据。如果结果有延续记号,“递归”再次调用操作函数:
function processFile(filepath: string, next: any): Promise<any>
var chunkSize = 1;
return readLines(filepath, chunkSize, next)
.then((result) =>
// Do something with `result.lines`
...
if (result.next)
return parseFile(filepath, result.next);
);
瞧!一个长时间运行的操作,对块进行操作并且很容易报告进度。
【讨论】:
以上是关于如何在 Node.js 流回调中聚合从异步函数生成的 Promise?的主要内容,如果未能解决你的问题,请参考以下文章