Lambda - 将 CSV 从 S3 导入 RDS MySQL

Posted

技术标签:

【中文标题】Lambda - 将 CSV 从 S3 导入 RDS MySQL【英文标题】:Lambda - Import CSV from S3 to RDS MySQL 【发布时间】:2020-05-11 00:03:27 【问题描述】:

我有一个 Lambda 函数,可以将特定的 CSV 文件从 S3 导入 mysql。但是,CSV 的文件大小约为 1 GB。当我运行此代码时,它不会处理并超时。

//s3 to rds
const fs = require("fs");
const AWS = require('aws-sdk');
var mysql = require('mysql');
var config = require('./config.json');
const s3 = new AWS.S3(
  accessKeyId: 'XXXXXXXXXXXXXXX',
  secretAccessKey: 'XXXXXXXXXXXXXXXXXXXXXXXXXXxx'
);
var filePath = `localfilepath`;

var pool = mysql.createPool(
  host: config.dbhost,
  user: config.dbuser,
  password: config.dbpassword,
  database: config.dbname
);
pool.getConnection((err, connection) => 
  if (err) throw err;
  console.log("Connected!" + connection);

  var s3Params = 
    Bucket: '<your_bucket_name>',
    Key: '<your_key>'
  ;
  s3.getObject(s3Params, function(err, result) 
    if (err) 
      throw new Error(err);
     else 
      console.log('file stored successfully', result);
      fs.createWriteStream(filePath).write(result.Body);
      connection.query('TRUNCATE TABLE <table_name>', (err, result) => 
        if (err) 
         throw new Error(err);
         else 
          console.log('table truncated');
          var query = `LOAD DATA LOCAL INFILE '<file_name>' INTO table <table_name> FIELDS TERMINATED BY ','  ENCLOSED BY '"' IGNORE 1 LINES `;
          connection.query(query, function(err, result) 
            if (err) throw err;
            console.log("Result: " + result);
            connection.release();
            fs.unlinkSync(filePath);
            console.log('file deleted');
          );
        
      );
    

  );
)

我怎样才能使它工作?

【问题讨论】:

1.我没有看到您实际上将文件保存在任何地方。它似乎正在将其加载到内存中。 2. 在 Lambda 运行时环境中可用于保存文件的总空间为半 Gig,因此您的文件太大而无法与 AWS Lambda 一起使用。 执行此操作的选项有哪些? 在上传到 S3 之前将文件拆分为更小的文件,或者使用 ECS、EKS 或 EC2 来运行导入而不是 Lambda。 是否可以使用 Lambda 拆分文件?恐怕我不能使用 EC2 或任何其他服务。只是 Lambda。 您可以尝试流式传输对象并一次写入一部分。请参阅此处的“检索对象的字节范围”示例:docs.aws.amazon.com/AWSjavascriptSDK/latest/AWS/… 【参考方案1】:

您基本上有 2 个障碍需要克服:1) Lambda 上的本地存储只有 512mb 和 2) Lambda 的执行时间限制为 15 分钟(您必须在函数上明确配置)

要解决问题1,您可以使用S3 Select。它允许您对 S3 中的对象(CSV 和 JSON 文件)执行 SQL 查询。对您的 CSV 文件执行 S3 选择查询,对于您检索的每条记录,您都可以将其插入队列并让其他工作人员将它们插入数据库。您也可以直接插入您的 RDS,但它可能会更慢。

这是一个代码示例:

const AWS = require('aws-sdk');
var fs = require('fs');

const S3 = new AWS.S3();

exports.handler = async (event, context) => 
    try 
        const query = "SELECT * FROM s3object s WHERE s.id > '0'";
        const bucket = 'my-bucket';
        const key = 'data.csv';

        const params = 
            Bucket: bucket,
            Key: key,
            ExpressionType: 'SQL',
            Expression: query,
            InputSerialization:  CSV:  FileHeaderInfo: 'USE'  ,
            OutputSerialization:  CSV:  
        

        const data = await getDataUsingS3Select(params);
        context.succeed(data);
     catch (error) 
        context.fail(error);
    
;

const getDataUsingS3Select = async (params) => 
    return new Promise((resolve, reject) => 
        S3.selectObjectContent(params, (err, data) => 
            if (err)  reject(err); 

            // This is a stream of events
            data.Payload.on('data', (event) => 
                // event, there is data inside it
                if (event.Records) 
                    // do what you want with payload: send to a queue or direct to db
                    console.log('Row:', event.Records.Payload.toString('utf8'));
                
            ).on('end', () => 
                // we arrive here after processing everything
                resolve();
            );
        );
    )

如果您仍然超过 15 分钟的时间限制,那就是问题 2。首先在 SQL 中添加一个limit 子句。然后您可以在 Lambda 的/tmp 目录中创建一个“检查点”文件。您可以保存您在那里处理的最后一条记录的id,这样当您重新运行您的 Lambda 函数时,它可以读取该文件,获取id 并在查询的where 子句中使用它,例如:

select * from s3object s where s.id > '99' limit 50000

【讨论】:

限时15分钟,你有可以使用的代码吗?【参考方案2】:

如果您的主要目标是将数据从 S3 上的 CSV 文件导入 RDS MySQL,请查看AWS Data Pipeline。它已经在Load S3 Data into Amazon RDS MySQL Table 中拥有此常见任务所需的所有已定义资源,但它使用 EC2 实例。但同时它更容易扩展和维护解决方案。

【讨论】:

【参考方案3】:

根据this 线程,他们确实希望在某个时候实现,但是什么时候是最好的猜测方案。

AWS Lambda 当前在 /tmp 目录中有 512mb 磁盘空间的“硬限制”(如 here 所述),因此由于文件大小为 1GB,fs.createWriteStream(filePath).write(result.Body); 行不应该在这里工作。该错误类似于"no space left on device"(来自审查现有线程)。

但是,在这种情况下,从 S3 加载文件应该可以工作。 Lambda 会按比例缩放内存和 CPU 大小,因此它可能会由于这里的内存不足而超时(取决于您设置的内容)。 This link 很好地指示了您需要为此设置什么(与您加载到内存和磁盘空间的内容有关)。

我建议在此阶段将流拆分为 512mb 块(this 包可能会有所帮助)并将它们分别存储在 S3 中,这样您就可以将此操作拆分为 2 个函数:

    获取数据并拆分为单独的 s3 文件(同时截断您的表)。 将 CSV 数据从 S3 加载回您的 RDS

(您可以为此使用Cloudwatch Events)

【讨论】:

你有我可以参考的代码示例吗?

以上是关于Lambda - 将 CSV 从 S3 导入 RDS MySQL的主要内容,如果未能解决你的问题,请参考以下文章

使用 lambda 中的 pandas 从 s3 读取 excel 文件并转换为 csv

如何在使用 EMR/Hive 将数据从 S3 导入 DynamoDB 时处理包含在引号 (CSV) 中的字段

AWS lambda 函数从 S3 文件夹中删除文件

将带逗号的双引号作为分隔符从 S3 导入 Amazon Redshift

最佳方法:如何将 dynamodb 表导出到 csv 并将其存储在 s3

尝试将对象放入 s3 时访问被拒绝