S3 事件触发器是不是可扩展?

Posted

技术标签:

【中文标题】S3 事件触发器是不是可扩展?【英文标题】:Is S3 Event Trigger Scalable?S3 事件触发器是否可扩展? 【发布时间】:2018-04-19 04:03:29 【问题描述】:

将大约 3K 对象(文件)加载到 S3。加载到该 S3 存储桶的每个文件都有一个事件触发器。

Lambda 仅接收大约 300 个对象的事件触发器。如果我重试(从 S3 移回并放回 S3),它会为另外 400 个对象生成事件,其余事件甚至没有达到 lambda。

我在这里遗漏了什么,如何针对创建的任意数量的对象进行缩放?

var async = require('async');                                                                                                                                                                                
var aws = require('aws-sdk');                                                                                                                                                                                
var s3 = new aws.S3();                                                                                                                                                                                       
var kinesis = new aws.Kinesis();                                                                                                                                                                             
var sns = new aws.SNS();                                                                                                                                                                                     
var config = require('./config.js');                                                                                                                                                                         


var logError = function(errormsg)                                                                                                                                                                           
    sns.publish(                                                                                                                                                                                            
        TopicArn: config.TopicArn,                                                                                                                                                                           
        Message: errormsg                                                                                                                                                                                    
    , function(err, data)                                                                                                                                                                                  
        if (err)                                                                                                                                                                                            
            console.log(errormsg);                                                                                                                                                                           
                                                                                                                                                                                                            
    );                                                                                                                                                                                                      
;                                                                                                                                                                                                           


exports.handler = function(event, context, callback)                                                                                                                                                        

    var readS3andSendtoKinesis = function(record, index, cb)                                                                                                                                                
        var params =                                                                                                                                                                                        
            Bucket: record.s3.bucket.name,                                                                                                                                                                   
            Key: record.s3.object.key                                                                                                                                                                        
        ; 
        console.log('Received File: ' +  record.s3.object.key);                                                                                                                                                                                                 
        s3.getObject(params, function(err, data)                                                                                                                                                            
            if (!err)                                                                                                                                                                                       
                var kinesisParams =                                                                                                                                                                         
                    Data: data.Body.toString('utf8'),                                                                                                                                                        
                    PartitionKey: config.PartitionKey,                                                                                                                                                       
                    StreamName: config.StreamName                                                                                                                                                            
                ;                                                                                                                                                                                           
                kinesis.putRecord(kinesisParams, function(err, data)                                                                                                                                        
                    if (err)                                                                                                                                                                                
                        // Handle Kinesis Failures                                                                                                                                                           
                        logError(JSON.stringify(err, null, 2));                                                                                                                                              
                                                                                                                                                                                                            
                    cb(null, 'done');                                                                                                                                                                        
                );                                                                                                                                                                                          
             else                                                                                                                                                                                          
                // Handle S3 Failures                                                                                                                                                                        
                logError(JSON.stringify(err, null, 2));                                                                                                                                                      
                cb(null, 'done');                                                                                                                                                                            
                                                                                                                                                                                                            
        );                                                                                                                                                                                                  
    ;                                                                                                                                                                                                       

    async.eachOfLimit(event.Records, 1, readS3andSendtoKinesis, function(err)                                                                                                                               
        callback(null, 'Done');                                                                                                                                                                              
    );                                                                                                                                                                                                      
; 

由于每个人都建议查看 cloudwatch,因此在此处分享相关 lambda 的 cloudwatch 指标,

【问题讨论】:

【参考方案1】:

AWS Lambda 具有节流配置,可避免出现失控情况。

对于 S3,Lambda 调用也依赖于 permissions,因此您应该检查这些权限。

由于 S3 不是基于流的源,因此您可能会看到同步场景,其中 throttling 达到限制并且 S3 没有重试。检查 lambdas 中的限制和错误 429。

【讨论】:

【参考方案2】:

我们发现根本原因似乎在资源的另一端失败。 S3 触发器发生并且无法扩展到它接收到的巨大触发器。

解决,

尽快返回 S3 Lambda 触发器,延迟会导致问题。

如果您花太多时间处理触发器内部的业务逻辑,在我们的例子中,我们是从 S3 读取并写入流。相反,我们只写了 S3 的位置,然后在接收端从 S3 读取。

希望对你有帮助。

【讨论】:

以上是关于S3 事件触发器是不是可扩展?的主要内容,如果未能解决你的问题,请参考以下文章

PrimeFaces 可扩展和可选行

React Flux 调度程序与 Node.js EventEmitter - 可扩展?

由 Chrome 扩展触发的 DOM 事件是不是在主页的上下文中传播?

系统可扩展设计方法之消息队列

如何清除 ExpandableListView?

RxJava入门