Lambda - 将 CSV 从 S3 导入 RDS MySQL
Posted
技术标签:
【中文标题】Lambda - 将 CSV 从 S3 导入 RDS MySQL【英文标题】:Lambda - Import CSV from S3 to RDS MySQL 【发布时间】:2020-05-11 00:03:27 【问题描述】:我有一个 Lambda 函数,可以将特定的 CSV 文件从 S3 导入 mysql。但是,CSV 的文件大小约为 1 GB。当我运行此代码时,它不会处理并超时。
//s3 to rds
const fs = require("fs");
const AWS = require('aws-sdk');
var mysql = require('mysql');
var config = require('./config.json');
const s3 = new AWS.S3(
accessKeyId: 'XXXXXXXXXXXXXXX',
secretAccessKey: 'XXXXXXXXXXXXXXXXXXXXXXXXXXxx'
);
var filePath = `localfilepath`;
var pool = mysql.createPool(
host: config.dbhost,
user: config.dbuser,
password: config.dbpassword,
database: config.dbname
);
pool.getConnection((err, connection) =>
if (err) throw err;
console.log("Connected!" + connection);
var s3Params =
Bucket: '<your_bucket_name>',
Key: '<your_key>'
;
s3.getObject(s3Params, function(err, result)
if (err)
throw new Error(err);
else
console.log('file stored successfully', result);
fs.createWriteStream(filePath).write(result.Body);
connection.query('TRUNCATE TABLE <table_name>', (err, result) =>
if (err)
throw new Error(err);
else
console.log('table truncated');
var query = `LOAD DATA LOCAL INFILE '<file_name>' INTO table <table_name> FIELDS TERMINATED BY ',' ENCLOSED BY '"' IGNORE 1 LINES `;
connection.query(query, function(err, result)
if (err) throw err;
console.log("Result: " + result);
connection.release();
fs.unlinkSync(filePath);
console.log('file deleted');
);
);
);
)
我怎样才能使它工作?
【问题讨论】:
1.我没有看到您实际上将文件保存在任何地方。它似乎正在将其加载到内存中。 2. 在 Lambda 运行时环境中可用于保存文件的总空间为半 Gig,因此您的文件太大而无法与 AWS Lambda 一起使用。 执行此操作的选项有哪些? 在上传到 S3 之前将文件拆分为更小的文件,或者使用 ECS、EKS 或 EC2 来运行导入而不是 Lambda。 是否可以使用 Lambda 拆分文件?恐怕我不能使用 EC2 或任何其他服务。只是 Lambda。 您可以尝试流式传输对象并一次写入一部分。请参阅此处的“检索对象的字节范围”示例:docs.aws.amazon.com/AWSjavascriptSDK/latest/AWS/… 【参考方案1】:您基本上有 2 个障碍需要克服:1) Lambda 上的本地存储只有 512mb 和 2) Lambda 的执行时间限制为 15 分钟(您必须在函数上明确配置)
要解决问题1,您可以使用S3 Select。它允许您对 S3 中的对象(CSV 和 JSON 文件)执行 SQL 查询。对您的 CSV 文件执行 S3 选择查询,对于您检索的每条记录,您都可以将其插入队列并让其他工作人员将它们插入数据库。您也可以直接插入您的 RDS,但它可能会更慢。
这是一个代码示例:
const AWS = require('aws-sdk');
var fs = require('fs');
const S3 = new AWS.S3();
exports.handler = async (event, context) =>
try
const query = "SELECT * FROM s3object s WHERE s.id > '0'";
const bucket = 'my-bucket';
const key = 'data.csv';
const params =
Bucket: bucket,
Key: key,
ExpressionType: 'SQL',
Expression: query,
InputSerialization: CSV: FileHeaderInfo: 'USE' ,
OutputSerialization: CSV:
const data = await getDataUsingS3Select(params);
context.succeed(data);
catch (error)
context.fail(error);
;
const getDataUsingS3Select = async (params) =>
return new Promise((resolve, reject) =>
S3.selectObjectContent(params, (err, data) =>
if (err) reject(err);
// This is a stream of events
data.Payload.on('data', (event) =>
// event, there is data inside it
if (event.Records)
// do what you want with payload: send to a queue or direct to db
console.log('Row:', event.Records.Payload.toString('utf8'));
).on('end', () =>
// we arrive here after processing everything
resolve();
);
);
)
如果您仍然超过 15 分钟的时间限制,那就是问题 2。首先在 SQL 中添加一个limit
子句。然后您可以在 Lambda 的/tmp
目录中创建一个“检查点”文件。您可以保存您在那里处理的最后一条记录的id
,这样当您重新运行您的 Lambda 函数时,它可以读取该文件,获取id
并在查询的where
子句中使用它,例如:
select * from s3object s where s.id > '99' limit 50000
【讨论】:
限时15分钟,你有可以使用的代码吗?【参考方案2】:如果您的主要目标是将数据从 S3 上的 CSV 文件导入 RDS MySQL,请查看AWS Data Pipeline。它已经在Load S3 Data into Amazon RDS MySQL Table 中拥有此常见任务所需的所有已定义资源,但它使用 EC2 实例。但同时它更容易扩展和维护解决方案。
【讨论】:
【参考方案3】:根据this 线程,他们确实希望在某个时候实现,但是什么时候是最好的猜测方案。
AWS Lambda 当前在 /tmp 目录中有 512mb 磁盘空间的“硬限制”(如 here 所述),因此由于文件大小为 1GB,fs.createWriteStream(filePath).write(result.Body);
行不应该在这里工作。该错误类似于"no space left on device"
(来自审查现有线程)。
但是,在这种情况下,从 S3 加载文件应该可以工作。 Lambda 会按比例缩放内存和 CPU 大小,因此它可能会由于这里的内存不足而超时(取决于您设置的内容)。 This link 很好地指示了您需要为此设置什么(与您加载到内存和磁盘空间的内容有关)。
我建议在此阶段将流拆分为 512mb 块(this 包可能会有所帮助)并将它们分别存储在 S3 中,这样您就可以将此操作拆分为 2 个函数:
-
获取数据并拆分为单独的 s3 文件(同时截断您的表)。
将 CSV 数据从 S3 加载回您的 RDS
(您可以为此使用Cloudwatch Events)
【讨论】:
你有我可以参考的代码示例吗?以上是关于Lambda - 将 CSV 从 S3 导入 RDS MySQL的主要内容,如果未能解决你的问题,请参考以下文章
使用 lambda 中的 pandas 从 s3 读取 excel 文件并转换为 csv
如何在使用 EMR/Hive 将数据从 S3 导入 DynamoDB 时处理包含在引号 (CSV) 中的字段
将带逗号的双引号作为分隔符从 S3 导入 Amazon Redshift