使用 websockets (Node, Socket.io) 读取 CSV 文件并间隔发送数据

Posted

技术标签:

【中文标题】使用 websockets (Node, Socket.io) 读取 CSV 文件并间隔发送数据【英文标题】:Reading CSV file and sending data in intervals with websockets (Node, Socket.io) 【发布时间】:2013-07-18 08:19:07 【问题描述】:

我对 Node 和 Express.js 比较陌生。我正在尝试创建一个 websocket 服务器以不规则的间隔推送 CSV 数据,这些数据存储在文件本身中,一行一行地。 CSV 结构是这样的: [超时[ms],data1,data2,data3 ...]

我已经成功创建了一个与客户端通信的 websocket 服务器。

我正在寻找有效执行以下操作的最佳解决方案: 1.读取CSV文件的一行 2.用WebSockets发送一行 3.暂停读取存储在行的第一个值的一段时间 4. 间隔结束后继续读取,返回步骤1。

到目前为止,我已经做到了这一点(请随意完全丢弃我的代码,因为它可能非常错误 - 正如我所说,我是新手。看起来 pause() 没有做任何事情。

var $    = require('jquery')
,csv = require('csv');

exports.index = function(server)
  var io   = require('socket.io').listen(server);

  io.sockets.on('connection', function (socket) 

  socket.on('startTransmission', function(msg) 
    csv()
    .from.path('C:/dev/node_express/csv/test.csv',  delimiter: ',', escape: '"' )
    .on('record', function(row,index)
      var rowArray = $.parseJSON(JSON.stringify(row));
      var json = ,
          that = this;
        $.each(rowArray, function(i,value)
          json[keys[i]] = value;
        );
        socket.emit('transmitDataData', json);
        //this.pause(); //I guess around here is where I'd like to pause 
        // setTimeout(function()
        //   that.resume();  //and resume here after the timeout, stored in the first value (rowArray[0])    
        // , rowArray[0]);

    );
);
);
;

不幸的是,注释掉的代码不起作用 - 所有数据都立即发送,逐行发送,函数不会暂停

【问题讨论】:

有什么理由使用jquery。您没有在此处操作任何 DOM 元素。 不,没有——我之前在做实验,忘记删除了。感谢您指出,但这并不能解决问题。 暂停和恢复不起作用吗?究竟是什么问题?该代码看起来应该可以工作(如果您取消注释暂停代码) 不幸的是,注释掉的代码不起作用。我会更新问题。 【参考方案1】:

我在另一个用例中遇到了同样的事情。问题是在流上调用 pause() 会暂停底层流读取,但不会暂停 csv 记录解析,因此可以使用构成最后读取流块的其余记录调用 record 事件。在我的情况下,我同步了它们,如下所示:

var rows=0, actions=0;

stream.on('record', function(row, index)                                                                 

    rows++;                                

    // pause here, but expect more record events until the raw read stream is exhausted
    stream.pause();

    runner.do(row, function(err, result)                                                  

        // when actions have caught up to rows read, read more rows.
        if (actions==rows) 
            stream.resume();
                            
    );
);

在您的情况下,我会缓冲行并使用计时器释放它们。这是一个未经测试的重构,只是为了让您了解我的意思:

var $ = require('jquery'),
    csv = require('csv');

exports.index = function(server)

  var io = require('socket.io').listen(server);
  io.sockets.on('connection', function (socket) 

      socket.on('startTransmission', function(msg) 

        var timer=null, buffered=[], stream=csv().from.path('C:/dev/node_express/csv/test.csv',  delimiter: ',', escape: '"' );

        function transmit(row)         
            socket.emit('transmitDataData', row);                                     
               

        function drain(timeout)                                                     
            if (!timer) 
                timer = setTimeout(function()                                     
                    timer = null;
                    if (buffered.length<=1)  // get more rows ahead of time so we don't run out. otherwise, we could skip a beat.
                        stream.resume(); // get more rows
                     else                         
                        var row = buffered.shift();
                        transmit(row);
                        drain(row[0]);                        
                    

                , timeout);               
                            
        

        stream.on('record', function(row,index)                        
            stream.pause();                                                                                   
            if (index == 0)                             
                transmit(row);                                               
             else                             
                buffered.push(row);                                   
                                                                   
            drain(row[0]); // assuming row[0] contains a timeout value.                                                                  
        );

        stream.on('end', function() 
            // no more rows. wait for buffer to empty, then cleanup.
        );

        stream.on('error', function() 
            // handle error.
        );

    );
;

【讨论】:

以上是关于使用 websockets (Node, Socket.io) 读取 CSV 文件并间隔发送数据的主要内容,如果未能解决你的问题,请参考以下文章

websocket 工作原理

你真的了解WebSocket吗?

nodejs socket 怎么检测客户端主动断开连接

websocket 原理

node socket onmessage

C 实现的 WebSocket 服务器握手失败