异步队列,文件流结束如何知道两者何时完成

Posted

技术标签:

【中文标题】异步队列,文件流结束如何知道两者何时完成【英文标题】:Async queue, filestream end how to know when both finished 【发布时间】:2017-06-27 23:37:01 【问题描述】:

我在使用带有文件流的 async.queue 时遇到了一个小问题

    我的文件流将完成的场景 我将 fileRead 设置为 true 但是队列将是空的并且已经调用了 drain 这会导致我的“完成”永远不会被调用

在我的文件流“结束”并且队列为空之后,说“结束队列”的正确方式是什么?

var fs = require('fs')
    , util = require('util')
    , stream = require('stream')
    , es = require('event-stream');

var async = require('async');

var fileRead = false;
var lineNr = 0;

var q = async.queue(function(task, callback) 
    task(function(err, lineData)
        responseLines.push(lineData);
        callback();
      );
  , 5);

var q.drain = function() 
  if(fileRead)
    done(null, responseLines);
  


var s = fs.createReadStream('very-large-file.csv')
    .pipe(es.split())
    .pipe(es.mapSync(function(line)
        s.pause();
        q.push(async.apply(insertIntoDb, line))
        s.resume();
    )
    .on('error', function(err)
       done(err);
    )
    .on('end', function()
      fileRead = true;
    )
);

或者是否有更好的异步使用方法可以让我这样做? 逐行异步处理,如果其中一行有错误,则可以提前退出

【问题讨论】:

您可以在将fileRead 设置为true 后立即添加另一个任务。我认为您的问题是您使用每个队列项调用的 task 函数在您的流上调用 end 事件之前被调用并完成。 【参考方案1】:

首先,我不确定您的 sn-p 有多少是伪代码,但 var q.drain = ... 不是有效的 javascript 并且应该出错。它应该只是q.drain =,因为您要在现有对象上定义一个属性,而不是声明一个新变量。如果不是伪代码,这可能就是为什么你的 drain 函数不会触发的原因。

有几种方法可以实现我认为您正在尝试做的事情。一种方法是检查结束处理程序中的队列长度,如果仍有要处理的项目,则设置排水功能。

.on('end', function()
  if(!q.length)
    callDone();
  
  else 
    q.drain = callDone;
  
);

function callDone()
  done(null, responseLines);

这实际上是在说“如果队列已被处理,则调用完成,如果没有,则在处理完毕时调用完成!”我确信有很多方法可以整理您的代码,但希望这能为您的特定问题提供解决方案。

【讨论】:

以上是关于异步队列,文件流结束如何知道两者何时完成的主要内容,如果未能解决你的问题,请参考以下文章

如何等待所有节点流完成/结束?

使用流异步读取文件时如何同步处理每一行/缓冲区[重复]

检测文件流已完成并准备好使用 avplayer 播放

节点zlib增量膨胀

iOS - Swift 我如何知道 copyItemAtPath 何时完成?

异步流与响应式扩展相比如何?