使用 Node.js 和 AWS Lambda 将 S3 文件的内容记录到 postgres 表
Posted
技术标签:
【中文标题】使用 Node.js 和 AWS Lambda 将 S3 文件的内容记录到 postgres 表【英文标题】:Logging contents of S3 file to postgres table with Node.js and AWS Lambda 【发布时间】:2015-06-16 19:01:47 【问题描述】:我正在 node.js 中编写一个 AWS Lambda 执行脚本,它响应 S3 put 事件(日志文件),读取文件并通过 COPY 命令插入 Postgres 表。似乎除了写入数据库(下面脚本中的logToPostgres
)之外的所有内容都按预期工作。
一些注意事项:
我已经删除了它检查以验证它是 S3 put 事件的部分,以及其他错误处理代码,因为它与这里无关。 数据库用户对表拥有INSERT
权限,对数据库拥有ALL
权限,可以从任何IP 访问(均已验证)。
secrets.js
是同一目录中的一个模块,用于导出数据库凭据
在本地运行脚本时,我可以正常写入数据库。
我没有达到任何 AWS Lambda 的限制 — 从 S3 下载的文件为 521 字节,超时设置为最长 60 秒(在测试和写入同一个数据库时,它在 300 毫秒内运行)。
cloud watch 中没有错误,通过在每一步添加日志记录,我能够将其缩小到代码的 stream.pipe(query)...
部分。由于某种原因,这部分没有被 AWS Lambda 执行,但它在本地运行良好。它没有发出'finished'
和'end'
事件,所以我猜它仍然未执行。
对问题可能出在哪里有任何想法?
var async = require('async');
var fs = require('fs');
var aws = require('aws-sdk');
var s3 = new aws.S3();
var pg = require('pg');
var pgCopy = require('pg-copy-streams').from;
var secrets = require('./secrets.js');
exports.handler = function(event, context)
var bucket = event.Records[0].s3.bucket.name;
var key = event.Records[0].s3.object.key;
async.waterfall(
[
function downloadWebhook(next)
s3.getObject(Bucket: bucket, Key: key, next);
,
function saveToDisk(response, next)
var file = fs.createWriteStream('/tmp/foo_' + Date.now());
file.write(response.Body);
file.close();
next(null, file.path);
,
function createStdinStream(path, next)
next(null, fs.createReadStream(path));
,
function logToPostgres(stream, next)
var client = new pg.Client('pg://' + secrets.user + ':' +
secrets.password + '@' + secrets.host + ':' +
secrets.port + '/' + secrets.database);
client.connect(function (error)
if (error) console.error(error);
var query = client.query(pgCopy('COPY my_table FROM STDIN'));
stream.pipe(query)
.on('finish', function ()
client.end();
next(null, null);
);
);
],
function (error)
if (error) console.error(error);
context.done();
);
;
更新:
事实证明,可写流现在会发出 'finish'
事件,因此将其更改为 'finish'
并包含答案中的建议使其运行时不会出错。但是,在 lambda 运行后,数据库中仍然没有一行。我怀疑事务正在回滚,但无法查明原因或位置。我什至尝试过明确开始并提交事务,但没有骰子。
【问题讨论】:
检查Lambda Limits 以确保您没有达到这些目标,并将query.on('error', function(error) ...);
error handler 添加到您的logToPostgres
方法中,因为它可能会在那里崩溃。
@TristanFoureur 我没有达到任何这些限制。文件大小为 520 字节,在本地机器上,一切都在 200 毫秒内完成(我的函数中的 lambda 超时设置为 60 秒)。我确实在管道之后有on('error'... )
用于query
,但是该事件要么没有被触发(因为没有执行任何操作),要么没有错误。请注意,AWS lambda 完全运行干净,没有报告错误。数据库表中没有任何内容。
@TristanFoureur 请参阅下面我对 William Gaul 建议的回复。
【参考方案1】:
不确定您的解决方案出了什么问题。这是我经过测试且更紧凑的解决方案。请让我知道这是否有效!
var aws = require('aws-sdk');
var s3 = new aws.S3();
var S3S = require('s3-streams');
var pg = require('pg');
var pgCopy = require('pg-copy-streams').from;
var secrets = require('./secrets.js');
exports.handler = function(event, context)
var bucket = event.Records[0].s3.bucket.name;
var key = event.Records[0].s3.object.key;
var stream = S3S.ReadStream(s3, Bucket: bucket, Key: key);
pg.connect(secrets.connector, function(err, client)
if (err) console.log(err);
var query = client.query(pgCopy(
"COPY event_log(user_id, event, ...) FROM STDIN CSV"
));
stream.pipe(query)
.on('end', function ()
client.end();
context.done();
)
.on('error', function(error)
console.log(error);
);
);
;
【讨论】:
顺便说一句,你应该使用 end 而不是 finish:nodejs.org/api/stream.html#stream_events_finish_and_end【参考方案2】:context.done()
立即停止您的 Lambda 函数的执行。因此,当您调用 next(null, null)
时,瀑布流完成,并且因为您的 Postgres 查询是异步的,所以它不会运行到完成。
试试吧:
stream.pipe(query)
.on('end', function()
client.end();
next(null, null);
)
.on('error', context.fail);
请注意这里的方法,我们仅在流结束后才解决瀑布。
【讨论】:
谢谢,这是有道理的。现在看来,流不会终止,因此永远不会到达on('end'...)
部分。由于超时,该函数最终被杀死。文件大小为 521 字节。更令人困惑的是,在 lambda 执行期间,我没有在 postgres 中看到使用 select * from pg_stat_activity;
的连接。在本地运行时,相同的代码显示了执行期间的连接(我还在本地和 AWS 上通过注释掉 client.end()
部分进行了测试,因此连接仍然有效)。
嗯,即使最大超时为 60 秒?您是否已将数据库配置为侦听 TCP/IP 上的连接?
是的,数据库通过 TCP/IP 侦听连接,并且该用户被授权从任何 IP 进行连接。当我在本地测试时,数据库仍然是远程的(凭证、表、调用,都是一样的)并且它工作。是的,我的 lambda 使用最大超时。以上是关于使用 Node.js 和 AWS Lambda 将 S3 文件的内容记录到 postgres 表的主要内容,如果未能解决你的问题,请参考以下文章
在异步 AWS Lambda 函数中使用带有 node-fetch 模块的 node.js 时遇到问题
错误:“无法从上下文中获取当前子/段”在 Lambda 中使用 AWS X-ray 和 node.js
Node.js中的代码AWS Lambda Package不会调用putRecord()来将数据添加到AWS Kinesis Firehose Stream中
删除文件路径和文件名,使用 Node 将文件名留在 AWS Lambda 中