使用 Node.js 和 AWS Lambda 将 S3 文件的内容记录到 postgres 表

Posted

技术标签:

【中文标题】使用 Node.js 和 AWS Lambda 将 S3 文件的内容记录到 postgres 表【英文标题】:Logging contents of S3 file to postgres table with Node.js and AWS Lambda 【发布时间】:2015-06-16 19:01:47 【问题描述】:

我正在 node.js 中编写一个 AWS Lambda 执行脚本,它响应 S3 put 事件(日志文件),读取文件并通过 COPY 命令插入 Postgres 表。似乎除了写入数据库(下面脚本中的logToPostgres)之外的所有内容都按预期工作。

一些注意事项:

我已经删除了它检查以验证它是 S3 put 事件的部分,以及其他错误处理代码,因为它与这里无关。 数据库用户对表拥有INSERT 权限,对数据库拥有ALL 权限,可以从任何IP 访问(均已验证)。 secrets.js 是同一目录中的一个模块,用于导出数据库凭据 在本地运行脚本时,我可以正常写入数据库。 我没有达到任何 AWS Lambda 的限制 — 从 S3 下载的文件为 521 字节,超时设置为最长 60 秒(在测试和写入同一个数据库时,它在 300 毫秒内运行)。

cloud watch 中没有错误,通过在每一步添加日志记录,我能够将其缩小到代码的 stream.pipe(query)... 部分。由于某种原因,这部分没有被 AWS Lambda 执行,但它在本地运行良好。它没有发出'finished''end' 事件,所以我猜它仍然未执行。

对问题可能出在哪里有任何想法?

var async = require('async');
var fs = require('fs');
var aws = require('aws-sdk');
var s3 = new aws.S3();
var pg = require('pg');
var pgCopy = require('pg-copy-streams').from;
var secrets = require('./secrets.js');

exports.handler = function(event, context) 
    var bucket = event.Records[0].s3.bucket.name;
    var key = event.Records[0].s3.object.key;

    async.waterfall(
        [
            function downloadWebhook(next) 
                s3.getObject(Bucket: bucket, Key: key, next);
            ,
            function saveToDisk(response, next) 
                var file = fs.createWriteStream('/tmp/foo_' + Date.now());
                file.write(response.Body);
                file.close();
                next(null, file.path);
            ,
            function createStdinStream(path, next) 
                next(null, fs.createReadStream(path));
            ,
            function logToPostgres(stream, next) 
                var client = new pg.Client('pg://' + secrets.user + ':' +
                    secrets.password + '@' + secrets.host + ':' +
                    secrets.port + '/' + secrets.database);
                client.connect(function (error) 
                    if (error) console.error(error);
                    var query = client.query(pgCopy('COPY my_table FROM STDIN'));
                    stream.pipe(query)
                        .on('finish', function () 
                            client.end();
                            next(null, null);
                        );
                );
            
        ],
        function (error) 
            if (error) console.error(error);
            context.done();
        
    );
;

更新:

事实证明,可写流现在会发出 'finish' 事件,因此将其更改为 'finish' 并包含答案中的建议使其运行时不会出错。但是,在 lambda 运行后,数据库中仍然没有一行。我怀疑事务正在回滚,但无法查明原因或位置。我什至尝试过明确开始并提交事务,但没有骰子。

【问题讨论】:

检查Lambda Limits 以确保您没有达到这些目标,并将query.on('error', function(error) ...); error handler 添加到您的logToPostgres 方法中,因为它可能会在那里崩溃。 @TristanFoureur 我没有达到任何这些限制。文件大小为 520 字节,在本地机器上,一切都在 200 毫秒内完成(我的函数中的 lambda 超时设置为 60 秒)。我确实在管道之后有on('error'... ) 用于query,但是该事件要么没有被触发(因为没有执行任何操作),要么没有错误。请注意,AWS lambda 完全运行干净,没有报告错误。数据库表中没有任何内容。 @TristanFoureur 请参阅下面我对 William Gaul 建议的回复。 【参考方案1】:

不确定您的解决方案出了什么问题。这是我经过测试且更紧凑的解决方案。请让我知道这是否有效!

var aws = require('aws-sdk');
var s3 = new aws.S3();
var S3S = require('s3-streams');
var pg = require('pg');
var pgCopy = require('pg-copy-streams').from;
var secrets = require('./secrets.js');

exports.handler = function(event, context) 
    var bucket = event.Records[0].s3.bucket.name;
    var key = event.Records[0].s3.object.key;

    var stream = S3S.ReadStream(s3, Bucket: bucket, Key: key);
    pg.connect(secrets.connector, function(err, client) 
        if (err) console.log(err);
        var query = client.query(pgCopy(
            "COPY event_log(user_id, event, ...) FROM STDIN CSV"
        ));
        stream.pipe(query)
            .on('end', function () 
                client.end();
                context.done();
            )
            .on('error', function(error) 
                console.log(error);
            );
    );
;

【讨论】:

顺便说一句,你应该使用 end 而不是 finish:nodejs.org/api/stream.html#stream_events_finish_and_end【参考方案2】:

context.done() 立即停止您的 Lambda 函数的执行。因此,当您调用 next(null, null) 时,瀑布流完成,并且因为您的 Postgres 查询是异步的,所以它不会运行到完成。

试试吧:

stream.pipe(query)
    .on('end', function() 
        client.end();
        next(null, null);
    )
    .on('error', context.fail);

请注意这里的方法,我们仅在流结束后才解决瀑布。

【讨论】:

谢谢,这是有道理的。现在看来,流不会终止,因此永远不会到达 on('end'...) 部分。由于超时,该函数最终被杀死。文件大小为 521 字节。更令人困惑的是,在 lambda 执行期间,我没有在 postgres 中看到使用 select * from pg_stat_activity; 的连接。在本地运行时,相同的代码显示了执行期间的连接(我还在本地和 AWS 上通过注释掉 client.end() 部分进行了测试,因此连接仍然有效)。 嗯,即使最大超时为 60 秒?您是否已将数据库配置为侦听 TCP/IP 上的连接? 是的,数据库通过 TCP/IP 侦听连接,并且该用户被授权从任何 IP 进行连接。当我在本地测试时,数据库仍然是远程的(凭证、表、调用,都是一样的)并且它工作。是的,我的 lambda 使用最大超时。

以上是关于使用 Node.js 和 AWS Lambda 将 S3 文件的内容记录到 postgres 表的主要内容,如果未能解决你的问题,请参考以下文章

在异步 AWS Lambda 函数中使用带有 node-fetch 模块的 node.js 时遇到问题

错误:“无法从上下文中获取当前子/段”在 Lambda 中使用 AWS X-ray 和 node.js

Node.js中的代码AWS Lambda Package不会调用putRecord()来将数据添加到AWS Kinesis Firehose Stream中

删除文件路径和文件名,使用 Node 将文件名留在 AWS Lambda 中

使用 Node.JS 调用 AWS 胶水的 lambda 函数不使用 console.log 的原因是啥?

如何从 AWS lambda 发布到 Node.js 中的云观察指标