如何在 Node.js 流回调中聚合从异步函数生成的 Promise?

Posted

技术标签:

【中文标题】如何在 Node.js 流回调中聚合从异步函数生成的 Promise?【英文标题】:How do I aggregate promises generated from async functions within a Node.js stream callback? 【发布时间】:2016-06-01 10:48:57 【问题描述】:

我有一个 Node.js Typescript 程序,我在其中尝试逐行解析大型 CSV 文件并异步处理这些行。更具体地说,我需要一个函数:

    打开一个 CSV 文件。 将下一行解析为对象。 (理想情况下)收集一定数量的对象以进行批处理。 将对象传递给异步函数进行处理(返回承诺)。 从处理函数中收集 Promise。

一些要求和注意事项:

我需要对这些承诺中的任何一个进行投票以了解进展情况。 假设这些 CSV 文件很大;逐行流式传输是必要的。 我不应该在这些处理操作正在运行时阻止应用程序。 返回一组 promise 可能不是正确的方法,尤其是当我尝试读取包含一百万行的文件时。 我需要各种钩子来取消或重试失败的操作。

这是我已经开始工作的一些测试代码。 ObjectStream 是一个自定义 Node.js 转换,可将 CSV 行转换为对象。

function parseFileAsync(filePath: string): Promise<any> 
    var doParseFileAsync = (filePath: string) => 
        var streamDeferred = q.defer<Promise<any>[]>();
        var promises: Promise<any>[] = [];
        var propertyNames: string[] = [];

        var stream = fs.createReadStream(filePath,  encoding: "utf8" )
            .pipe(new LineStream( objectMode: true ))
            .pipe(new ObjectStream( objectMode: true ));

        stream.on("readable", () => 
            var obj: Object;
            while ((obj = stream.read()) !== null) 
                console.log(`\nRead an object...`);

                var operationDeferred = q.defer<any>();
                operationDeferred.resolve(doSomethingAsync(obj));
                promises.push(operationDeferred.promise);
            
        );
        stream.on("end", () => 
            streamDeferred.resolve(promises);
        );

        return streamDeferred.promise;
    

    return doParseFileAsync(filePath)
        .then((result: Promise<any>[]) => 
            return q.all(result);
        );

parseFileAsync(filePath)
    .done((result: any[]) => 
        console.log(`\nFinished reading and processing the file:\n\t$result.toString()`);
    );

最后的done 调用甚至在流开始运行之前执行,因为parseFileAsync 立即用一个空数组完成;流还没有机会推送任何承诺。

经过几天的搜索,我仍然不确定执行此操作的正确方法是什么。 Node/javascript 专家:帮助?

更新

代码已更新,我的承诺现在运行良好。但是,我需要一种方法来连接流并在需要时取消该过程。我还需要一种方法来重试任何失败的操作。

【问题讨论】:

我最初的想法(没有过多阅读)是您应该选择一种或另一种范式。这似乎最好由流范式中的map stream 之类的东西处理。如果你必须使用promise(也许消费者期望得到一个promise),只要你的流全部退出就返回一个promise并解决它。 创建一个新的 Promise P,它在流完成时完成,将它的 Promise 添加到列表中。返回 P 并让它解析 Promise 列表。等待 P,然后等待所有 P 的结果。 @nick 坚持使用流是不可能的。我正在开发的应用程序使用了 Promise。我现在才介绍一个需要读取文件的功能。 明白了。我认为您仍然可以在我的评论后半部分使用该方法。例如。只需将流式行为包装在一个 Promise 中即可。 感谢您的建议;他们工作得很好!我对我原来的问题做了一些更新;我需要一个钩子来取消和重试。 【参考方案1】:

我在程序架构中遇到了一些限制,这些限制不允许我随意传递 Promise。因此,与其启动一堆承诺,我决定等到上一批完成后再开始新的承诺。这是我采取的方法:

    将流的东西分离到它自己的接受延续标记的函数中。如果有更多数据要读取,返回值将包含读取的数据以及延续令牌:

    function readFile(filepath: string, lines: number, start: any): Promise<any> 
        ...
    
    

    定义一个将运行可重试操作的函数。在此函数的主体中,从文件中检索和处理一大块数据。如果结果有延续记号,“递归”再次调用操作函数:

    function processFile(filepath: string, next: any): Promise<any> 
        var chunkSize = 1;
        return readLines(filepath, chunkSize, next)
            .then((result) => 
                // Do something with `result.lines`
                ...
                if (result.next) 
                    return parseFile(filepath, result.next);
                
            );
    
    

瞧!一个长时间运行的操作,对块进行操作并且很容易报告进度。

【讨论】:

以上是关于如何在 Node.js 流回调中聚合从异步函数生成的 Promise?的主要内容,如果未能解决你的问题,请参考以下文章

node.js 在执行'n'异步回调后执行函数

Node.js - 如何使用回调调用异步函数?

Node.js - 为啥我的一些回调没有异步执行?

如何在Node.js的回调中正确测试代码?

node js - 当函数依赖于内部的异步函数时,如何从函数返回值

5Node.js 回调函数