在现有 S3 对象上运行 S3-put 触发的 Lambda 函数?

Posted

技术标签:

【中文标题】在现有 S3 对象上运行 S3-put 触发的 Lambda 函数?【英文标题】:Running S3-put-triggered Lambda function on existing S3 objects? 【发布时间】:2017-02-21 13:32:06 【问题描述】:

我在 Node.js 中有一个 Lambda 函数,用于处理添加到我的存储桶中的新图像。我想为所有现有对象运行该函数。我怎样才能做到这一点?我认为最简单的方法是“重新放置”每个对象,以触发该功能,但我不知道该怎么做。

说清楚 - 我想在每个 现有 对象上运行一次。触发器已经为 new 对象工作,我只需要在创建 lambda 函数之前 插入的对象上运行它。

【问题讨论】:

请让我正确理解您的要求,您需要的正是每次将图像添加到存储桶时,您都想重新处理所有现有图像?或者您想对每张图像处理多少次? @TachúSalamanca 更新了 OP 【参考方案1】:

以下 Lambda 函数将满足您的要求。

它将遍历目标 S3 存储桶中的每个文件,并对每个文件执行所需的 lambda 函数,以模拟 put 操作。

你可能会想要为这个函数设置一个很长的执行时间限制

var TARGET_BUCKET="my-bucket-goes-here";
var TARGET_LAMBDA_FUNCTION_NAME="TestFunct";
var S3_PUT_SIMULATION_PARAMS=
  "Records": [
    
      "eventVersion": "2.0",
      "eventTime": "1970-01-01T00:00:00.000Z",
      "requestParameters": 
        "sourceIPAddress": "127.0.0.1"
      ,
      "s3": 
        "configurationId": "testConfigRule",
        "object": 
          "eTag": "0123456789abcdef0123456789abcdef",
          "sequencer": "0A1B2C3D4E5F678901",
          "key": "HappyFace.jpg",
          "size": 1024
        ,
        "bucket": 
          "arn": "arn:aws:s3:::mybucket",
          "name": "sourcebucket",
          "ownerIdentity": 
            "principalId": "EXAMPLE"
          
        ,
        "s3SchemaVersion": "1.0"
      ,
      "responseElements": 
        "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH",
        "x-amz-request-id": "EXAMPLE123456789"
      ,
      "awsRegion": "us-east-1",
      "eventName": "ObjectCreated:Put",
      "userIdentity": 
        "principalId": "EXAMPLE"
      ,
      "eventSource": "aws:s3"
    
  ]
;

var aws = require('aws-sdk');
var s3 = new aws.S3();
var lambda = new aws.Lambda();


exports.handler = (event, context, callback) => 
    retrieveS3BucketContents(TARGET_BUCKET, function(s3Objects)
        simulateS3PutOperation(TARGET_BUCKET, s3Objects, simulateS3PutOperation, function() 
            console.log("complete."); 
        );
    );
;

function retrieveS3BucketContents(bucket, callback)
    s3.listObjectsV2(
        Bucket: TARGET_BUCKET
    , function(err, data) 
        callback(data.Contents);
    );


function simulateS3PutOperation(bucket, s3ObjectStack, callback, callbackEmpty)
    var params = 
      FunctionName: TARGET_LAMBDA_FUNCTION_NAME, 
      Payload: ""
    ;

    if(s3ObjectStack.length > 0)
        var s3Obj = s3ObjectStack.pop();
        var p = S3_PUT_SIMULATION_PARAMS;
        p.Records[0].s3.bucket.name = bucket;
        p.Records[0].s3.object.key = s3Obj.Key;
        params.Payload = JSON.stringify(p, null, 2);
        lambda.invoke(params, function(err, data) 
          if (err) console.log(err, err.stack); // an error occurred
          else
              callback(bucket, s3ObjectStack, callback, callbackEmpty);
          
        );
    
    else
        callbackEmpty();   
    

以下是执行此方法所需的 lambda 查询的完整策略,它允许 R/W 对 CloudWatch 日志和 ListObject 对 S3 的访问。您需要在您看到 MY-BUCKET-GOES-HERE 的地方填写您的存储桶详细信息


    "Version": "2012-10-17",
    "Statement": [
        
            "Sid": "Stmt1477382207000",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::MY-BUCKET-GOES-HERE/*"
            ]
        ,
        
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    
    ]

【讨论】:

正是我的想法,谢谢!附言。您的网站已关闭。 感谢罗斯柴尔德!我不知道 :) 干杯! 嘿 Xavier,我的访问被拒绝了,没有进一步的解释。我需要什么样的角色政策?我是一个 heroku 人,所以很抱歉,但是 lambda 太好了,不能放弃。 我有一个 lambda 基本执行策略,它摆脱了 lambda 执行权限错误,但现在我只是被拒绝访问。更改了 JSON 中的存储桶、函数名称和区域。 您需要将 s3 的读取权限授予 lambda 正在执行的 IAM 角色【参考方案2】:

这个线程帮助我朝着正确的方向前进,因为我需要为两个存储桶中现有的 50k 个文件为每个文件调用一个 lambda 函数。我决定用 python 编写它并将同时运行的 lambda 函数的数量限制为 500(许多 aws 区域的并发限制为 1000)。

该脚本创建了一个由 500 个线程组成的工作池,这些线程从存储桶键队列中获取信息。每个工人在拿起另一个之前等待他们的 lambda 完成。由于对我的 50k 文件执行此脚本需要几个小时,所以我只是在本地计算机上运行它。希望这对某人有帮助!

#!/usr/bin/env python

# Proper imports
import json
import time
import base64
from queue import Queue
from threading import Thread
from argh import dispatch_command

import boto3
from boto.s3.connection import S3Connection

client = boto3.client('lambda')

def invoke_lambdas():
    try:
        # replace these with your access keys
        s3 = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
        buckets = [s3.get_bucket('bucket-one'), s3.get_bucket('bucket-two')]

        queue = Queue()
        num_threads = 500

        # create a worker pool
        for i in range(num_threads):
            worker = Thread(target=invoke, args=(queue,))
            worker.setDaemon(True)
            worker.start()

        for bucket in buckets:
            for key in bucket.list():
                queue.put((bucket.name, key.key))

        queue.join()

    except Exception as e:
        print(e)

def invoke(queue):
    while True:
        bucket_key = queue.get()

        try:
            print('Invoking lambda with bucket %s key %s. Remaining to process: %d'
                % (bucket_key[0], bucket_key[1], queue.qsize()))
            trigger_event = 
                'Records': [
                    's3': 
                        'bucket': 
                            'name': bucket_key[0]
                        ,
                        'object': 
                            'key': bucket_key[1]
                        
                    
                ]
            

            # replace lambda_function_name with the actual name
            # InvocationType='RequestResponse' means it will wait until the lambda fn is complete
            response = client.invoke(
                FunctionName='lambda_function_name',
                InvocationType='RequestResponse',
                LogType='None',
                ClientContext=base64.b64encode(json.dumps().encode()).decode(),
                Payload=json.dumps(trigger_event).encode()
            )
            if response['StatusCode'] != 200:
                print(response)

        except Exception as e:
            print(e)
            print('Exception during invoke_lambda')

        queue.task_done()

if __name__ == '__main__':
    dispatch_command(invoke_lambdas)

【讨论】:

【参考方案3】:

因为我必须在 非常大 存储桶上执行此操作,并且 lambda 函数有一个最大值。 10 分钟的执行时间,我最终用 Ruby AWS-SDK 编写了一个脚本。

require 'aws-sdk-v1'

class LambdaS3Invoker

  BUCKET_NAME = "HERE_YOUR_BUCKET"
  FUNCTION_NAME = "HERE_YOUR_FUNCTION_NAME"
  AWS_KEY = "HERE_YOUR_AWS_KEY"
  AWS_SECRET = "HERE_YOUR_AWS_SECRET"
  REGION = "HERE_YOUR_REGION"

  def execute
    bucket.objects( prefix: 'products').each do |o|
      lambda_invoke(o.key)
    end
  end

  private

  def lambda_invoke(key)
    lambda.invoke(
      function_name: FUNCTION_NAME,
      invocation_type: 'Event',
      payload: JSON.generate(
        Records: [
          s3: 
            object: 
              key: key,
            ,
            bucket: 
              name: BUCKET_NAME,
            
          
        ]
      )
    )
  end

  def lambda
    @lambda ||= Aws::Lambda::Client.new(
      region: REGION,
      access_key_id: AWS_KEY,
      secret_access_key: AWS_SECRET
    )
  end

  def resource
    @resource ||= Aws::S3::Resource.new(
      access_key_id: AWS_KEY,
      secret_access_key: AWS_SECRET
    )
  end

  def bucket
    @bucket ||= resource.bucket(BUCKET_NAME)
  end
end

然后你可以这样称呼它:

LambdaS3Invoker.new.execute

【讨论】:

【参考方案4】:

您需要做的是创建一个一次性脚本,该脚本使用 AWS 开发工具包来调用您的 lambda 函数。此解决方案不需要您“重新放置”对象。

我的答案将基于 AWS JS SDK。

明确一点 - 我想一次性运行每个现有的 对象。触发器已经为新对象工作了,我只需要 在 lambda 函数之前插入的对象上运行它 已创建。

由于您有一个接受 S3 put 事件的有效 lambda 函数,您需要做的是在 S3 中找到所有未处理的对象(如果您有每个 S3 对象的 DB 条目,如果没有,上面应该很容易,那么您可能会找到S3列表对象函数好用http://docs.aws.amazon.com/AWSjavascriptSDK/latest/AWS/S3.html#listObjectsV2-property)。

然后为获得的每个未处理的 S3 对象创建一个类似于 S3 Put Event Message(如下所示)的 JSON 对象,并使用上述 JSON 对象作为负载调用 Lambda 调用函数。

您可以在 http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Lambda.html#invoke-property 找到 lambda 调用函数文档

为您的 lambda 函数创建虚假 S3 Put 事件消息对象时,您可以忽略大部分实际对象属性取决于您的 lambda 函数。我想您至少需要设置存储桶名称和对象键。

S3 Put 事件消息结构http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html

  
   "Records":[  
        
         "eventVersion":"2.0",
         "eventSource":"aws:s3",
         "awsRegion":"us-east-1",
         "eventTime":"1970-01-01T00:00:00.000Z",
         "eventName":"ObjectCreated:Put",
         "userIdentity":  
            "principalId":"AIDAJDPLRKLG7UEXAMPLE"
         ,
         "requestParameters":  
            "sourceIPAddress":"127.0.0.1"
         ,
         "responseElements":  
            "x-amz-request-id":"C3D13FE58DE4C810",
            "x-amz-id-2":"FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD"
         ,
         "s3":  
            "s3SchemaVersion":"1.0",
            "configurationId":"testConfigRule",
            "bucket":  
               "name":"mybucket",
               "ownerIdentity":  
                  "principalId":"A3NL1KOZZKExample"
               ,
               "arn":"arn:aws:s3:::mybucket"
            ,
            "object":  
               "key":"HappyFace.jpg",
               "size":1024,
               "eTag":"d41d8cd98f00b204e9800998ecf8427e",
               "versionId":"096fKKXTRTtl3on89fVO.nfljtsv6qko",
               "sequencer":"0055AED6DCD90281E5"
            
         
      
   ]

【讨论】:

【参考方案5】:

基本上你需要的是使用一些 api 调用(例如,如果你使用 python)并列出所有新对象或 s3 存储桶中的所有对象,然后处理这些对象

这是一个sn-p:

from boto.s3.connection import S3Connection

conn = S3Connection()
source = conn.get_bucket(src_bucket)
src_list = set([key.name for key in source.get_all_keys(headers=None, prefix=prefix)])
//and then you can go over this src list
for entry in src_list:
   do something

【讨论】:

我不确定这是否准确地回答了这个问题。我坚持的一点是在对象上运行 Lambda 函数。它似乎只使用触发器运行,即插入对象。是否可以获取所有对象,然后在每个对象上调用 lambda 函数?顺便说一句,我正在使用 Node。 为了澄清,该函数将图像转换为各种尺寸。在我实现该功能之前,存储桶中已经有照片,我需要在这些对象上运行它。 所以你想触发插入对象的功能,然后在桶中的所有对象上运行它? 目前我设置了一个 S3 触发器,它适用于新对象,因为它是由 put 调用触发的。对于新对象来说一切都很好。但是,已经在其中的对象显然不会触发 put,因为它们 已经 在存储桶中。我想在这些对象上运行该函数/触发它。在我看来,最干净的方法似乎是重新插入具有完全相同名称的对象,或“reput”它们,以便触发,但我相对初级,我不确定这是否不必要。 您可以通过两种方式执行此操作,此处触发一个检查未转换图像的功能,然后将这些图像添加到将转换图片的列表/队列中。或调用 lambda 测试并配置一个测试事件以进行手动调用,这将强制手动转换

以上是关于在现有 S3 对象上运行 S3-put 触发的 Lambda 函数?的主要内容,如果未能解决你的问题,请参考以下文章

将对象上传到 S3 存储桶时如何触发 AWS Cloudformation 堆栈的更新?

如何使用Coldfusion fileExist检查Amazon S3上是否存在文件?

Rails ActiveStorage 附加到现有 S3 文件

Amazon S3 在另一个账户中触发另一个 Lambda 函数

使用 cloudformation yaml 在 Lambda 函数上添加 S3 触发器

使用 AWS Java 开发工具包为现有 S3 对象设置 Expires 标头