Mongoose 使用异步迭代器流式传输聚合查询

Posted

技术标签:

【中文标题】Mongoose 使用异步迭代器流式传输聚合查询【英文标题】:Mongoose stream an aggregation query using async iterators 【发布时间】:2021-07-30 16:56:11 【问题描述】:

我想使用 Mongoose 流式传输聚合查询的结果,以便允许客户端处理巨大的 JSON 响应(最终通过管道传输到 CSV 转换器)。

到目前为止我的代码:

const pipeline = [
    
      $unwind: 
        path: '$samples',
        // The name of a new field to hold the array index of the element.
        includeArrayIndex: 'num_sample',
        preserveNullAndEmptyArrays: true,
      ,
    ,
   
       $limit: 10000,
    ,
    
      $project: 
        _id: 0,
        t: '$samples.t',
        station: '$station.name',
        loc: '$station.location',
        data: '$samples.data',
      ,
    ,
  ];
  // const samples = await fixed.aggregate([pipeline]);

  const cursor = fixed
    .aggregate(pipeline)
    .cursor( batchSize: 1000 )
    .exec();
 res.writeHead(200,  'content-type': 'application/json' );
 res.write('[');
 await cursor.eachAsync(async (doc, i) => 
    res.write(JSON.stringify(doc));
    res.write(',');
  );
  res.write(']');
  res.end();

但是如何将响应以json2csv 传递给 CSV 转换器? 上面的代码在功能上是正确的吗? 我必须在响应流中写入额外的字符才能正确格式化 JSON,但找到的解决方案(带有最终 )在最终 JSON 中引入了一个空记录(我还没有找到在每个文档之后写“,”的方法从猫鼬游标返回,除了最后一个,因此我不得不引入一条空记录。

【问题讨论】:

【参考方案1】:

它是一个流,这意味着您必须将 pipe 该流发送到 res,如下所示:

cursor.pipe(JSONStream.stringify()).pipe(res)

查看question了解更多详情。

【讨论】:

【参考方案2】:

我终于用下面的代码解决了。

欢迎提出任何改进所提供解决方案的建议。

exports.get_FIXED_Samples_CSV = catchAsync(async (req, res, next) => 
  // retrieve start and end dates
  // if start is not provided then set it to current date - 30 days
  const startDate =
    moment(req.query.start).isValid() && req.query.start
      ? moment(new Date(req.query.start))
      : moment(new Date()).subtract(30, 'd');

  // if end is not provided or invalid set it to current date
  const endDate =
    moment(req.query.end).isValid() && req.query.end
      ? moment(new Date(req.query.end))
      : moment(new Date());

  // retrieve station name and check if valid,if not returns null
  const station = [
    'AQ101',
    'AQ102',
    'AQ103',
    'AQ104',
    'AQ105',
    'AQ106',
    'AQ107',
  ].includes(req.query.station)
    ? req.query.station
    : null;

  // eslint-disable line no-unused-vars
  const pipeline = [
    // sort by date DESC adn station.name ASC
    
      $sort: 
        'station.name': 1,
        date: -1,
      ,
    ,
    // unwind by samples array adding num_sample counting
    
      $unwind: 
        path: '$samples',
        // The name of a new field to hold the array index of the element.
        includeArrayIndex: 'num_sample',
        preserveNullAndEmptyArrays: true,
      ,
    ,
    // ~~~~~~~~~~~~ set limit to returning docs ~~~~~~~~~~~~~~~~~~~
    
      $limit: 216000,
    ,
    // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    // select fields to show
    
      $project: 
        _id: 0,
        t: '$samples.t',
        station: '$station.name',
        loc: '$station.location',
        data: '$samples.data',
      ,
    ,
  ];

  // add as first stage to the pipeline a match aggregation
  if (station) 
    // if valid station  is provided
    pipeline.unshift(
      $match: 
        'station.name': station,
        date: 
          $gte: new Date(startDate.format('YYYY-MM-DD')),
          $lte: new Date(endDate.format('YYYY-MM-DD')),
        ,
      ,
    );
   else 
    // if station is INVALID OR NOT provided
    pipeline.unshift(
      $match: 
        date: 
          $gte: new Date(startDate.format('YYYY-MM-DD')),
          $lte: new Date(endDate.format('YYYY-MM-DD')),
        ,
      ,
    );
  

  // transform to apply to generate CSV
  const custTransf = (item, strFormat = 'DD/MM/YYYY HH:mm:ss') => (
    utc: item.t,
    t: moment(item.t).format(strFormat),
    station: item.station,
    ...item.data,
  );
  // Unwind Samples properties and flatten arrays
  const transforms = [
    // flatten( objects: false, arrays: true ),
    custTransf,
  ];
  // const fields = ['t', 'station', 'data'];
  const opts =  transforms ;
  const transformOpts =  highWaterMark: 8192 ;
  const pipeTransf = new Transform(opts, transformOpts);
  // remove data prefix from fields
  const regex = /(data.)/gi;
  const filename = 'FixedStations';
  const strAtt = `attachment;filename=$filename-$startDate.format(
    'YYYY-MMM-DD'
  )-$endDate.format('YYYY-MMM-DD').csv`;
  res.header('Content-Type', 'text/csv');
  res.setHeader('Content-Type', 'text/csv');
  res.setHeader(
    'Content-Disposition',
    strAtt
    // 'attachment;filename=download.csv'
  );
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Pragma', 'no-cache');

  const cursor = fixed
    .aggregate(pipeline)
    .allowDiskUse(true)
    .cursor( transfor: JSON.stringify, batchSize: 1000 )
    .exec()
    .pipe(JSONStream.stringify())
    .pipe(pipeTransf)
    .pipe(replace(regex, ''))
    .pipe(res);
);

【讨论】:

以上是关于Mongoose 使用异步迭代器流式传输聚合查询的主要内容,如果未能解决你的问题,请参考以下文章

处理文件行流迭代器

MyBatis 流式查询

流式传输到 BigQuery 表的数据何时可用于 Query 操作?

还在用分页?太Low !试试 MyBatis 流式查询,真心强大!

还在用分页?太Low !试试 MyBatis 流式查询,真心强大!

还在用分页?太Low !试试 MyBatis 流式查询,真心强大!