How to process each row/buffer synchronously when reading a file asynchronously using a stream [duplicate]

Posted: 2022-01-21 18:09:38

Question:

As you can see, I have a JS script that takes a .csv file and calls an async function for each row (cycling through 4 different functions).

The problem is that I need to wait for the function call in iteration i to finish before moving on to iteration i+1.

const csv = require('csv-parser');
const fs = require('fs');

var i = 1;

fs.createReadStream('table.csv')
  .pipe(csv())
  .on('data', (row) => {
      switch (i % 4) {
          case 1: org1createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
          case 2: org2createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
          case 3: org3createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
          case 0: org4createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      }
      i++;
  })
  .on('end', () => {
    console.log('CSV file successfully processed');
  });





  async function org1createPatient(patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
    ...
  }

  async function org2createPatient(patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
    ...
  }

  async function org3createPatient(patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
    ...
  }

  async function org4createPatient(patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
    ...
  }

How can I achieve what I want? I hope my question is clear enough!

Comments:

- await all the org[n]createPatient functions, and make the arrow function async (async (row) => { ... }).
- I think you need to queue the rows into an array or hash, then run an async busy-wait loop until you get all the results you expect... there may be a better way, which is why I'm not submitting this as an answer.
- var orgCreate = [org4createPatient, org1createPatient, org2createPatient, org3createPatient]; await orgCreate[i%4](row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease);
- Awaiting inside the callback is not enough, because .on(event, callback) does not invoke the callback synchronously. The stream always keeps running in parallel, independent of any callback. See the answer below for a full explanation.

Answer 1:
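One comment above suggests replacing the switch with a dispatch table indexed by i % 4. A minimal sketch of that idea; the handler bodies here are hypothetical stand-ins that take the whole row instead of the question's 15 positional arguments:

```javascript
// Dispatch-table version of the round-robin switch. org4createPatient sits
// at index 0 because i starts at 1, so the 4th, 8th, ... rows give i % 4 === 0.
const org1createPatient = async (row) => `org1:${row.patientId}`; // stand-in
const org2createPatient = async (row) => `org2:${row.patientId}`; // stand-in
const org3createPatient = async (row) => `org3:${row.patientId}`; // stand-in
const org4createPatient = async (row) => `org4:${row.patientId}`; // stand-in

const orgCreate = [org4createPatient, org1createPatient, org2createPatient, org3createPatient];

async function handleRow(i, row) {
  // pick the handler for this row and wait for it to finish
  return orgCreate[i % 4](row);
}
```

This removes the repetition of the switch, but on its own it still does not make the stream wait between rows; see the answer below for that.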

The readStream you are using here is asynchronous, meaning that .on(event, callback) fires every time new data is read, regardless of whether any earlier callback has finished. In other words, executing the callback does not affect the reading process: the stream keeps running in parallel, and the callback fires each time an event is received.

This means that if the callback runs a piece of asynchronous code, you can easily end up in a situation where several instances of that function are still running by the time the next data event is received.

Note: this applies to every event, including the 'end' event.

Using async/await on the callback only makes the callback's internal logic sequential; it still does not affect the rate at which data is read.

To get what you want, the callback must both use async/await (making it internally sequential) and manually pause and resume the read operation that is happening in parallel:

const csv = require('csv-parser');
const fs = require('fs');

let i = 1;

const stream = fs.createReadStream('table.csv').pipe(csv());

stream.on('data', async (row) => {
   // pause overall stream until this row is processed
   stream.pause();

   // process row
   switch (i % 4) {
      case 1: await org1createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      case 2: await org2createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      case 3: await org3createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      case 0: await org4createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
   }
   i++;

   // resume overall stream
   stream.resume();
});

stream.on('end', () => {
  // now guaranteed that no instances of `callback` are still running in parallel when this event fires
  console.log('CSV file successfully processed');
});

Comments:

- Also note: you may have heard of fs.readFileSync() as another way to avoid dealing with asynchronous data streams. I don't recommend it, because it reads and buffers the entire file (synchronously) before letting you process any of it. That is much heavier on memory than the solution presented above, which buffers and processes only one row at a time, especially when dealing with larger files.

Answer 2:

The solution below uses the iter-ops library, which is efficient in this case because pipe(csv()) returns an AsyncIterable, so it should be processed as one.

Since you don't care what those processing functions return, we can simply throttle the processing of each row:

const { pipe, throttle, onEnd, catchError } = require('iter-ops');
const csv = require('csv-parser');
const fs = require('fs');

const asyncIterable = fs.createReadStream('table.csv').pipe(csv());

const i = pipe(
    asyncIterable,
    throttle(async (row, index) => {
        switch (index % 4) {
            case 1: await org1createPatient(row.patientId, ...); break;
            case 2: await org2createPatient(row.patientId, ...); break;
            case 3: await org3createPatient(row.patientId, ...); break;
            case 0: await org4createPatient(row.patientId, ...); break;
            default: break;
        }
    }),
    onEnd(s => {
        console.log(`Completed ${s.count} rows, in ${s.duration}ms`);
    }),
    catchError((err, ctx) => {
        console.log(`Failed on row with index ${ctx.index}:`, err);
        throw err; // to stop the iteration
    })
);

async function processCSV() {
    // this will trigger the iteration:
    for await (const a of i) {
        // iterate and process the CSV
    }
}
P.S. I am the author of iter-ops.

