Mongoose 使用异步迭代器流式传输聚合查询
Posted
技术标签:
【中文标题】Mongoose 使用异步迭代器流式传输聚合查询【英文标题】:Mongoose stream an aggregation query using async iterators 【发布时间】:2021-07-30 16:56:11 【问题描述】:我想使用 Mongoose 流式传输聚合查询的结果,以便允许客户端处理巨大的 JSON 响应(最终通过管道传输到 CSV 转换器)。
到目前为止我的代码:
const pipeline = [
$unwind:
path: '$samples',
// The name of a new field to hold the array index of the element.
includeArrayIndex: 'num_sample',
preserveNullAndEmptyArrays: true,
,
,
$limit: 10000,
,
$project:
_id: 0,
t: '$samples.t',
station: '$station.name',
loc: '$station.location',
data: '$samples.data',
,
,
];
// const samples = await fixed.aggregate([pipeline]);
const cursor = fixed
.aggregate(pipeline)
.cursor( batchSize: 1000 )
.exec();
res.writeHead(200, 'content-type': 'application/json' );
res.write('[');
await cursor.eachAsync(async (doc, i) =>
res.write(JSON.stringify(doc));
res.write(',');
);
res.write(']');
res.end();
但是如何将响应以json2csv
传递给 CSV 转换器?
上面的代码在功能上是正确的吗?
我必须在响应流中写入额外的字符才能正确格式化 JSON,但找到的解决方案(带有最终 )在最终 JSON 中引入了一个空记录(我还没有找到在每个文档之后写“,”的方法从猫鼬游标返回,除了最后一个,因此我不得不引入一条空记录。
【问题讨论】:
【参考方案1】:它是一个流,这意味着您必须将 pipe
该流发送到 res
,如下所示:
cursor.pipe(JSONStream.stringify()).pipe(res)
查看question了解更多详情。
【讨论】:
【参考方案2】:我终于用下面的代码解决了。
欢迎提出任何改进所提供解决方案的建议。
exports.get_FIXED_Samples_CSV = catchAsync(async (req, res, next) =>
// retrieve start and end dates
// if start is not provided then set it to current date - 30 days
const startDate =
moment(req.query.start).isValid() && req.query.start
? moment(new Date(req.query.start))
: moment(new Date()).subtract(30, 'd');
// if end is not provided or invalid set it to current date
const endDate =
moment(req.query.end).isValid() && req.query.end
? moment(new Date(req.query.end))
: moment(new Date());
// retrieve station name and check if valid,if not returns null
const station = [
'AQ101',
'AQ102',
'AQ103',
'AQ104',
'AQ105',
'AQ106',
'AQ107',
].includes(req.query.station)
? req.query.station
: null;
// eslint-disable line no-unused-vars
const pipeline = [
// sort by date DESC adn station.name ASC
$sort:
'station.name': 1,
date: -1,
,
,
// unwind by samples array adding num_sample counting
$unwind:
path: '$samples',
// The name of a new field to hold the array index of the element.
includeArrayIndex: 'num_sample',
preserveNullAndEmptyArrays: true,
,
,
// ~~~~~~~~~~~~ set limit to returning docs ~~~~~~~~~~~~~~~~~~~
$limit: 216000,
,
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// select fields to show
$project:
_id: 0,
t: '$samples.t',
station: '$station.name',
loc: '$station.location',
data: '$samples.data',
,
,
];
// add as first stage to the pipeline a match aggregation
if (station)
// if valid station is provided
pipeline.unshift(
$match:
'station.name': station,
date:
$gte: new Date(startDate.format('YYYY-MM-DD')),
$lte: new Date(endDate.format('YYYY-MM-DD')),
,
,
);
else
// if station is INVALID OR NOT provided
pipeline.unshift(
$match:
date:
$gte: new Date(startDate.format('YYYY-MM-DD')),
$lte: new Date(endDate.format('YYYY-MM-DD')),
,
,
);
// transform to apply to generate CSV
const custTransf = (item, strFormat = 'DD/MM/YYYY HH:mm:ss') => (
utc: item.t,
t: moment(item.t).format(strFormat),
station: item.station,
...item.data,
);
// Unwind Samples properties and flatten arrays
const transforms = [
// flatten( objects: false, arrays: true ),
custTransf,
];
// const fields = ['t', 'station', 'data'];
const opts = transforms ;
const transformOpts = highWaterMark: 8192 ;
const pipeTransf = new Transform(opts, transformOpts);
// remove data prefix from fields
const regex = /(data.)/gi;
const filename = 'FixedStations';
const strAtt = `attachment;filename=$filename-$startDate.format(
'YYYY-MMM-DD'
)-$endDate.format('YYYY-MMM-DD').csv`;
res.header('Content-Type', 'text/csv');
res.setHeader('Content-Type', 'text/csv');
res.setHeader(
'Content-Disposition',
strAtt
// 'attachment;filename=download.csv'
);
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Pragma', 'no-cache');
const cursor = fixed
.aggregate(pipeline)
.allowDiskUse(true)
.cursor( transfor: JSON.stringify, batchSize: 1000 )
.exec()
.pipe(JSONStream.stringify())
.pipe(pipeTransf)
.pipe(replace(regex, ''))
.pipe(res);
);
【讨论】:
以上是关于Mongoose 使用异步迭代器流式传输聚合查询的主要内容,如果未能解决你的问题,请参考以下文章
流式传输到 BigQuery 表的数据何时可用于 Query 操作?
还在用分页?太Low !试试 MyBatis 流式查询,真心强大!