@aws-sdk/lib-storage 使用 JSONStream.stringify() 将 JSON 从 MongoDB 流式传输到 S3

Posted

技术标签:

【中文标题】@aws-sdk/lib-storage 使用 JSONStream.stringify() 将 JSON 从 MongoDB 流式传输到 S3【英文标题】:@aws-sdk/lib-storage to Stream JSON from MongoDB to S3 with JSONStream.stringify() 【发布时间】:2021-11-24 04:34:10 【问题描述】:

我正在尝试使用新版本的 @aws-sdk/lib-storage 将 JSON 从 MongoDB 流式传输到 S3:

"@aws-sdk/client-s3": "^3.17.0"
"@aws-sdk/lib-storage": "^3.34.0"
"JSONStream": "^1.3.5",

尝试 #1:我似乎没有正确使用 JSONStream.stringify():

import  MongoClient  from 'mongodb';
import  S3Client  from '@aws-sdk/client-s3';
import  Upload  from '@aws-sdk/lib-storage';
const s3Client = new S3Client( region: env.AWS_REGION );

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => 
  let client;

  try 
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('').limit(5).stream();
    readStream.pipe(JSONStream.stringify());
 
    const upload = new Upload(
      client: s3Client,
      params: 
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: readStream,
      ,
    );
    
    await upload.done(); 
  
  catch (err) 
    log.error(err);
    throw err.name;
  
  finally 
    if (client) 
      client.close();
    
  

;

错误 #1:

TypeError [ERR_INVALID_ARG_TYPE]:第一个参数必须是 类型字符串、缓冲区、ArrayBuffer、数组或类似数组的对象。 接收到的类型对象 在 Function.from (buffer.js:305:9) 在 getDataReadable (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getDataReadable.ts:6:18) 在 processTicksAndRejections (internal/process/task_queues.js:94:5) 在 Object.getChunkStream (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getChunkStream.ts:17:20) 在 Upload.__doConcurrentUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:121:22) 在异步 Promise.all(索引 0) 在 Upload.__doMultipartUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:196:5) 在 Upload.done (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:12)

尝试#2,使用变量jsonStream

  const readStream = db.collection(collectionName).find('').limit(5).stream();
    const jsonStream = readStream.pipe(JSONStream.stringify());
 
    const upload = new Upload(
      client: s3Client,
      params: 
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: jsonStream,
      ,
    );

错误 #2:

ReferenceError: ReadableStream 未定义 在 Object.getChunk (/.../node_modules/@aws-sdk/lib-storage/src/chunker.ts:22:30) 在 Upload.__doMultipartUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:187:24) 在 Upload.done (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:37)

尝试#3:使用stream.PassThrough

    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('').limit(5).stream();
    readStream.pipe(JSONStream.stringify()).pipe(uploadStreamFile('benda_mongo.json'));

...

const stream = require('stream');
export const uploadStreamFile = async(fileName) => 
  try

    const pass = new stream.PassThrough();
    const upload = new Upload(
      client: s3Client,
      params: 
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: pass,
      ,
    );
    const res = await upload.done();
    
    log.info('finished uploading file', fileName);
    return res;
  
  catch(err)
    return;
  
;

错误 #3:

'dest.on 不是 Stream.pipe 中的函数(internal/streams/legacy.js:30:8'

尝试 #4:mongodb.stream(transform: doc => JSON.stringify...) 而不是 JSONStream:

import  S3Client  from '@aws-sdk/client-s3';
import  Upload  from '@aws-sdk/lib-storage';
import  env  from '../../../env';
const s3Client = new S3Client( region: env.AWS_REGION );

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => 
  let client;

  try 
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName)
      .find('')
      .limit(5)
      .stream( transform: doc => JSON.stringify(doc) + '\n' );
  
    const upload = new Upload(
      client: s3Client,
      params: 
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: readStream,
      ,
    );
  
    await upload.done(); 
  
  catch (err) 
    log.error('waaaaa', err);
    throw err.name;
  
  finally 
    if (client) 
      client.close();
    
  
;

错误:#4:

TypeError [ERR_INVALID_ARG_TYPE]:第一个参数必须是 类型字符串、缓冲区、ArrayBuffer、数组或类似数组的对象。 接收到的类型对象 在 Function.from (buffer.js:305:9) 在 getDataReadable (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getDataReadable.ts:6:18) 在 processTicksAndRejections (internal/process/task_queues.js:94:5) 在 Object.getChunkStream (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getChunkStream.ts:17:20) 在 Upload.__doConcurrentUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:121:22) 在异步 Promise.all(索引 0) 在 Upload.__doMultipartUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:196:5) 在 Upload.done (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:12)

尝试#5:使用stream.PassThrough()并将pass返回到pipe

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => 
  let client;

  try 
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('').limit(5).stream( transform: doc => JSON.stringify(doc) + '\n' );
    readStream.pipe(uploadStreamFile());
  
  catch (err) 
    log.error('waaaaa', err);
    throw err.name;
  
  finally 
    if (client) 
      client.close();
    
  
;


const stream = require('stream');

export const uploadStreamFile = async() => 
  try
    const pass = new stream.PassThrough();
    const upload = new Upload(
      client: s3Client,
      params: 
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: pass,
      ,
    );
    await upload.done();
    return pass;
  
  catch(err)
    log.error('pawoooooo', err);
    return;
  
;

错误 #5:

TypeError: dest.on 不是函数 在 Cursor.pipe (_stream_readable.js:680:8)

【问题讨论】:

谢谢,但遇到了同样的错误。我成功地通过管道传输到 fs.createWriteStream 并写入文件,稍后我可以使用 fs.createReadStream 并将其传输到我的“uploadStreamFile”,它将起作用。但我不喜欢这个解决方案,因为它导致我的服务器写入临时文件,而不是直接将 mongoDb res 流式传输到 s3。 谢谢,完整的堆栈跟踪是:'dest.on is not a function at Stream.pipe (internal/streams/legacy.js:30:8' 我已经用所有相关的错误信息更新了问题 我删除了 cmets,因为它们已经包含在答案中。我希望任何建议的替代方案都对您有用。 【参考方案1】:

查看您的错误堆栈跟踪后,问题可能与 MongoDB 驱动程序 provides a cursor in object mode 而 UploadBody 参数需要 传统 流,适用于在这种情况下由Buffer 处理。

以您的原始代码为参考,您可以尝试提供Transform 流来处理这两个要求。

请考虑以下代码:

import  Transform  from 'stream';
import  MongoClient  from 'mongodb';
import  S3Client  from '@aws-sdk/client-s3';
import  Upload  from '@aws-sdk/lib-storage';
const s3Client = new S3Client( region: env.AWS_REGION );

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => 
  let client;

  try 
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('').limit(5).stream();
    // We are creating here a Transform to adapt both sides
    const toJSONTransform = new Transform(
      writableObjectMode: true,
      transform(chunk, encoding, callback) 
        this.push(JSON.stringify(chunk) + '\n');
        callback();  
        
    );

    readStream.pipe(toJSONTransform);
 
    const upload = new Upload(
      client: s3Client,
      params: 
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: toJSONTransform,
      ,
    );
    
    await upload.done(); 
  
  catch (err) 
    log.error(err);
    throw err.name;
  
  finally 
    if (client) 
      client.close();
    
  

;

在代码中,toJSONTransform 我们将流的可写部分定义为对象模式;相比之下,可读部分将适合从 S3 Upload 方法中读取...至少,我希望如此。

关于您报告的第二个错误,即与dest.on 相关的错误,我最初认为,我写信给您的可能性是,该错误是有动机的,因为在uploadStreamFile 中您返回的是Promise,而不是流,并且您将 Promise 传递给需要流的 pipe 方法,基本上是您返回了错误的变量。但是我没有意识到您正在尝试将 PassThrough 流作为参数传递给 Upload 方法:请注意,此流不包含任何信息,因为您没有将任何信息传递给它,从 MongoDB 查询获得的可读流的内容永远不会传递给回调,也不会传递给 Upload 本身。

【讨论】:

谢谢,我也试过了。使用以下语法运行代码:'Body: readStream.pipe(JSONStream.stringify()),' 将返回以下错误:' ReferenceError: ReadableStream is not defined' 很遗憾听到这个消息。请问,您能否提供有关问题中错误堆栈跟踪的更多信息?我认为这可能是相关的。 谢谢,我已经用所有相关的错误信息更新了问题 非常感谢@OronBen-David。我用更多的选择更新了答案。关于ReferenceError: ReadableStream is not defined,我认为这与使用JSONStream.stringify 获得的流仅在我们的用例中是可写的,但不能同时读取的事实有关:我想我误解了库文档,它指出它可以是既可写又可读,就像一个变换,但恐怕不能同时进行。这个错误是有道理的。 这很棒@OronBen-David。我很高兴听到它工作正常。非常感谢您分享基于stream.PassThrough 的解决方案。【参考方案2】:

我找到了使用 stream.PassThrough 的其他解决方案,使用 JSONStream 将流式传输对象数组而不是一个接一个:

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => 
  let client;

  try 
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const passThroughStream = new stream.PassThrough();
    const readStream = db.collection(collectionName)
      .find('')
      .stream();

    readStream.on('end', () => passThroughStream.end());

    readStream.pipe(JSONStream.stringify()).pipe(passThroughStream);
    await uploadStreamFile('benda_mongo.json', passThroughStream);
  
  catch (err) 
    log.error(err);
    throw err.name;
  
  finally 
    if (client) 
      client.close();
    
  
;


export const uploadStreamFile = async(fileName, stream) => 
  try
    log.info('start uploading file', fileName);
    const upload = new Upload(
      client: s3Client,
      params: 
        Bucket: 'test-bucket',
        Key: `$fileName`,
        Body: stream,
      ,
    );

    const res = await upload.done();
    log.info('finished uploading file', fileName);
    return res;
  
  catch(err)
    log.error(err);
    return;
  
;

【讨论】:

以上是关于@aws-sdk/lib-storage 使用 JSONStream.stringify() 将 JSON 从 MongoDB 流式传输到 S3的主要内容,如果未能解决你的问题,请参考以下文章

为啥在 Log4j2 上使用 Slf4j 时 log4j-slf4j-impl 不依赖于 log4j-core

如何使用循环转换优化 C 代码? [关闭]

使用PHP几种写99乘法表的方式

J-Link固件烧录以及使用J-Flash向arm硬件板下载固件程序

JavaScript break 使用

如何使用J-Link远程调试?