How to process each row/buffer synchronously when reading a file asynchronously using a stream [duplicate]

Posted: 2022-01-21 18:09:38

Question: As you can see, I have a JS script that takes a .csv file and calls an asynchronous function for each row (rotating through 4 different functions).
The problem is that I need to wait for the function at iteration i to finish before moving on to iteration i+1.
const csv = require('csv-parser');
const fs = require('fs');

var i = 1;

fs.createReadStream('table.csv')
  .pipe(csv())
  .on('data', (row) => {
    switch (i % 4) {
      case 1: org1createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      case 2: org2createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      case 3: org3createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      case 0: org4createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
    }
    i++;
  })
  .on('end', () => {
    console.log('CSV file successfully processed');
  });
async function org1createPatient(patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
  // ...
}

async function org2createPatient(patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
  // ...
}

async function org3createPatient(patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
  // ...
}

async function org4createPatient(patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
  // ...
}
How can I achieve this? I hope my question is clear enough!
Comments:

await all the org[n]createPatient functions, and make the arrow function async (async (row) => { ... }).
I think you would need to queue the rows into an array or hash and then run an async busy-wait loop until you get all the results you expect... maybe there's a better way, which is why I'm not submitting this as an answer.
var orgCreate = [org4createPatient, org1createPatient, org2createPatient, org3createPatient]; await orgCreate[i%4](row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease);
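The dispatch-table suggestion from the comment above can be sketched as a runnable example. The four org functions here are hypothetical stubs standing in for the real ones, just to show how the array indexing lines up with the original switch:

```javascript
// Hypothetical stand-ins for the four org creation functions
const org1createPatient = async (id) => `org1:${id}`;
const org2createPatient = async (id) => `org2:${id}`;
const org3createPatient = async (id) => `org3:${id}`;
const org4createPatient = async (id) => `org4:${id}`;

// Ordered so that orgCreate[i % 4] matches the original switch:
// i=1 -> org1, i=2 -> org2, i=3 -> org3, i%4===0 -> org4
const orgCreate = [org4createPatient, org1createPatient, org2createPatient, org3createPatient];

const results = [];

(async () => {
  for (let i = 1; i <= 4; i++) {
    // one array lookup replaces the four-branch switch
    results.push(await orgCreate[i % 4](i));
  }
  console.log(results.join(','));
})();
```

Note that this alone does not solve the ordering problem; the await only helps once the stream itself is made to wait, as the answers below explain.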
Awaiting inside the callback is not enough, because .on(event, callback) does not invoke callback synchronously. The stream always keeps running in parallel, independent of any callback. See the answer below for a full explanation.
Answer 1:

The readStream you are using here is asynchronous, meaning .on(event, callback) fires every time new data is read, independent of when any callback fires. In other words, the execution of the callback function does not affect this process; it runs in parallel, once per event received.

This means that if the callback executes a piece of asynchronous code, you can easily end up in a situation where multiple instances of that function are still running by the time the next read event is received.

Note: this applies to any event, including the 'end' event.
Using async/await in the callback only makes the callback's own internal logic sequential. It still does not affect the rate at which data is read.
To achieve what you want, you need to both use async/await in the callback (making it internally sequential) and have the callback manually pause and resume the read operation that is happening in parallel.
const csv = require('csv-parser');
const fs = require('fs');

let i = 1;
const stream = fs.createReadStream('table.csv').pipe(csv());

stream.on('data', async (row) => {
  // pause overall stream until this row is processed
  stream.pause();
  // process row
  switch (i % 4) {
    case 1: await org1createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
    case 2: await org2createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
    case 3: await org3createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
    case 0: await org4createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
  }
  i++;
  // resume overall stream
  stream.resume();
});

stream.on('end', () => {
  // now guaranteed that no instances of `callback` are still running in parallel when this event fires
  console.log('CSV file successfully processed');
});
Comments:
Also note: you may have heard of fs.readFileSync() as another option for avoiding asynchronous data streams. I don't recommend it, because it essentially reads and buffers the entire file at once (synchronously) before letting you process it. That is much heavier on memory than the solution presented above, which buffers and processes only one row at a time, especially when dealing with larger files.

Answer 2:
The solution below uses the iter-ops library, which is quite efficient in this case, because pipe(csv()) returns an AsyncIterable, so it should be processed accordingly.

Since you don't care about what those processing functions return, we can simply throttle the processing of each row:
const { pipe, throttle, onEnd, catchError } = require('iter-ops');
const csv = require('csv-parser');
const fs = require('fs');

const asyncIterable = fs.createReadStream('table.csv').pipe(csv());

const i = pipe(
  asyncIterable,
  throttle(async (row, index) => {
    switch (index % 4) {
      case 1: await org1createPatient(row.patientId, ...); break;
      case 2: await org2createPatient(row.patientId, ...); break;
      case 3: await org3createPatient(row.patientId, ...); break;
      case 0: await org4createPatient(row.patientId, ...); break;
      default: break;
    }
  }),
  onEnd((s) => {
    console.log(`Completed ${s.count} rows, in ${s.duration}ms`);
  }),
  catchError((err, ctx) => {
    console.log(`Failed on row with index ${ctx.index}:`, err);
    throw err; // to stop the iteration
  })
);
async function processCSV() {
  // this will trigger the iteration:
  for await (const a of i) {
    // iterate and process the CSV
  }
}

processCSV(); // nothing runs until the iterable is actually consumed
P.S. I am the author of iter-ops.
Comments: