在现有 S3 对象上运行 S3-put 触发的 Lambda 函数?
Posted
技术标签:
【中文标题】在现有 S3 对象上运行 S3-put 触发的 Lambda 函数?【英文标题】:Running S3-put-triggered Lambda function on existing S3 objects? 【发布时间】:2017-02-21 13:32:06 【问题描述】:我在 Node.js 中有一个 Lambda 函数,用于处理添加到我的存储桶中的新图像。我想为所有现有对象运行该函数。我怎样才能做到这一点?我认为最简单的方法是“重新放置”每个对象,以触发该功能,但我不知道该怎么做。
说清楚 - 我想在每个 现有 对象上运行一次。触发器已经为 new 对象工作,我只需要在创建 lambda 函数之前 插入的对象上运行它。
【问题讨论】:
请让我正确理解您的要求,您需要的正是每次将图像添加到存储桶时,您都想重新处理所有现有图像?或者您想对每张图像处理多少次? @TachúSalamanca 更新了 OP 【参考方案1】:以下 Lambda 函数将满足您的要求。
它将遍历目标 S3 存储桶中的每个文件,并对每个文件执行所需的 lambda 函数,以模拟 put 操作。
你可能会想要为这个函数设置一个很长的执行时间限制
var TARGET_BUCKET="my-bucket-goes-here";
var TARGET_LAMBDA_FUNCTION_NAME="TestFunct";
var S3_PUT_SIMULATION_PARAMS=
"Records": [
"eventVersion": "2.0",
"eventTime": "1970-01-01T00:00:00.000Z",
"requestParameters":
"sourceIPAddress": "127.0.0.1"
,
"s3":
"configurationId": "testConfigRule",
"object":
"eTag": "0123456789abcdef0123456789abcdef",
"sequencer": "0A1B2C3D4E5F678901",
"key": "HappyFace.jpg",
"size": 1024
,
"bucket":
"arn": "arn:aws:s3:::mybucket",
"name": "sourcebucket",
"ownerIdentity":
"principalId": "EXAMPLE"
,
"s3SchemaVersion": "1.0"
,
"responseElements":
"x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH",
"x-amz-request-id": "EXAMPLE123456789"
,
"awsRegion": "us-east-1",
"eventName": "ObjectCreated:Put",
"userIdentity":
"principalId": "EXAMPLE"
,
"eventSource": "aws:s3"
]
;
var aws = require('aws-sdk');
var s3 = new aws.S3();
var lambda = new aws.Lambda();
exports.handler = (event, context, callback) =>
retrieveS3BucketContents(TARGET_BUCKET, function(s3Objects)
simulateS3PutOperation(TARGET_BUCKET, s3Objects, simulateS3PutOperation, function()
console.log("complete.");
);
);
;
function retrieveS3BucketContents(bucket, callback)
s3.listObjectsV2(
Bucket: TARGET_BUCKET
, function(err, data)
callback(data.Contents);
);
function simulateS3PutOperation(bucket, s3ObjectStack, callback, callbackEmpty)
var params =
FunctionName: TARGET_LAMBDA_FUNCTION_NAME,
Payload: ""
;
if(s3ObjectStack.length > 0)
var s3Obj = s3ObjectStack.pop();
var p = S3_PUT_SIMULATION_PARAMS;
p.Records[0].s3.bucket.name = bucket;
p.Records[0].s3.object.key = s3Obj.Key;
params.Payload = JSON.stringify(p, null, 2);
lambda.invoke(params, function(err, data)
if (err) console.log(err, err.stack); // an error occurred
else
callback(bucket, s3ObjectStack, callback, callbackEmpty);
);
else
callbackEmpty();
以下是执行此方法所需的 lambda 查询的完整策略,它允许 R/W 对 CloudWatch 日志和 ListObject 对 S3 的访问。您需要在您看到 MY-BUCKET-GOES-HERE 的地方填写您的存储桶详细信息
"Version": "2012-10-17",
"Statement": [
"Sid": "Stmt1477382207000",
"Effect": "Allow",
"Action": [
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::MY-BUCKET-GOES-HERE/*"
]
,
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:*:*:*"
]
【讨论】:
正是我的想法,谢谢!附言。您的网站已关闭。 感谢罗斯柴尔德!我不知道 :) 干杯! 嘿 Xavier,我的访问被拒绝了,没有进一步的解释。我需要什么样的角色政策?我是一个 heroku 人,所以很抱歉,但是 lambda 太好了,不能放弃。 我有一个 lambda 基本执行策略,它摆脱了 lambda 执行权限错误,但现在我只是被拒绝访问。更改了 JSON 中的存储桶、函数名称和区域。 您需要将 s3 的读取权限授予 lambda 正在执行的 IAM 角色【参考方案2】:这个线程帮助我朝着正确的方向前进,因为我需要为两个存储桶中现有的 50k 个文件为每个文件调用一个 lambda 函数。我决定用 python 编写它并将同时运行的 lambda 函数的数量限制为 500(许多 aws 区域的并发限制为 1000)。
该脚本创建了一个由 500 个线程组成的工作池,这些线程从存储桶键队列中获取信息。每个工人在拿起另一个之前等待他们的 lambda 完成。由于对我的 50k 文件执行此脚本需要几个小时,所以我只是在本地计算机上运行它。希望这对某人有帮助!
#!/usr/bin/env python
# Proper imports
import json
import time
import base64
from queue import Queue
from threading import Thread
from argh import dispatch_command
import boto3
from boto.s3.connection import S3Connection
client = boto3.client('lambda')
def invoke_lambdas():
try:
# replace these with your access keys
s3 = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
buckets = [s3.get_bucket('bucket-one'), s3.get_bucket('bucket-two')]
queue = Queue()
num_threads = 500
# create a worker pool
for i in range(num_threads):
worker = Thread(target=invoke, args=(queue,))
worker.setDaemon(True)
worker.start()
for bucket in buckets:
for key in bucket.list():
queue.put((bucket.name, key.key))
queue.join()
except Exception as e:
print(e)
def invoke(queue):
while True:
bucket_key = queue.get()
try:
print('Invoking lambda with bucket %s key %s. Remaining to process: %d'
% (bucket_key[0], bucket_key[1], queue.qsize()))
trigger_event =
'Records': [
's3':
'bucket':
'name': bucket_key[0]
,
'object':
'key': bucket_key[1]
]
# replace lambda_function_name with the actual name
# InvocationType='RequestResponse' means it will wait until the lambda fn is complete
response = client.invoke(
FunctionName='lambda_function_name',
InvocationType='RequestResponse',
LogType='None',
ClientContext=base64.b64encode(json.dumps().encode()).decode(),
Payload=json.dumps(trigger_event).encode()
)
if response['StatusCode'] != 200:
print(response)
except Exception as e:
print(e)
print('Exception during invoke_lambda')
queue.task_done()
if __name__ == '__main__':
dispatch_command(invoke_lambdas)
【讨论】:
【参考方案3】:因为我必须在 非常大 存储桶上执行此操作,并且 lambda 函数有一个最大值。 10 分钟的执行时间,我最终用 Ruby AWS-SDK 编写了一个脚本。
require 'aws-sdk-v1'
class LambdaS3Invoker
BUCKET_NAME = "HERE_YOUR_BUCKET"
FUNCTION_NAME = "HERE_YOUR_FUNCTION_NAME"
AWS_KEY = "HERE_YOUR_AWS_KEY"
AWS_SECRET = "HERE_YOUR_AWS_SECRET"
REGION = "HERE_YOUR_REGION"
def execute
bucket.objects( prefix: 'products').each do |o|
lambda_invoke(o.key)
end
end
private
def lambda_invoke(key)
lambda.invoke(
function_name: FUNCTION_NAME,
invocation_type: 'Event',
payload: JSON.generate(
Records: [
s3:
object:
key: key,
,
bucket:
name: BUCKET_NAME,
]
)
)
end
def lambda
@lambda ||= Aws::Lambda::Client.new(
region: REGION,
access_key_id: AWS_KEY,
secret_access_key: AWS_SECRET
)
end
def resource
@resource ||= Aws::S3::Resource.new(
access_key_id: AWS_KEY,
secret_access_key: AWS_SECRET
)
end
def bucket
@bucket ||= resource.bucket(BUCKET_NAME)
end
end
然后你可以这样称呼它:
LambdaS3Invoker.new.execute
【讨论】:
【参考方案4】:您需要做的是创建一个一次性脚本,该脚本使用 AWS 开发工具包来调用您的 lambda 函数。此解决方案不需要您“重新放置”对象。
我的答案将基于 AWS JS SDK。
明确一点 - 我想一次性运行每个现有的 对象。触发器已经为新对象工作了,我只需要 在 lambda 函数之前插入的对象上运行它 已创建。
由于您有一个接受 S3 put 事件的有效 lambda 函数,您需要做的是在 S3 中找到所有未处理的对象(如果您有每个 S3 对象的 DB 条目,如果没有,上面应该很容易,那么您可能会找到S3列表对象函数好用http://docs.aws.amazon.com/AWSjavascriptSDK/latest/AWS/S3.html#listObjectsV2-property)。
然后为获得的每个未处理的 S3 对象创建一个类似于 S3 Put Event Message(如下所示)的 JSON 对象,并使用上述 JSON 对象作为负载调用 Lambda 调用函数。
您可以在 http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Lambda.html#invoke-property 找到 lambda 调用函数文档
为您的 lambda 函数创建虚假 S3 Put 事件消息对象时,您可以忽略大部分实际对象属性取决于您的 lambda 函数。我想您至少需要设置存储桶名称和对象键。
S3 Put 事件消息结构http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
"Records":[
"eventVersion":"2.0",
"eventSource":"aws:s3",
"awsRegion":"us-east-1",
"eventTime":"1970-01-01T00:00:00.000Z",
"eventName":"ObjectCreated:Put",
"userIdentity":
"principalId":"AIDAJDPLRKLG7UEXAMPLE"
,
"requestParameters":
"sourceIPAddress":"127.0.0.1"
,
"responseElements":
"x-amz-request-id":"C3D13FE58DE4C810",
"x-amz-id-2":"FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD"
,
"s3":
"s3SchemaVersion":"1.0",
"configurationId":"testConfigRule",
"bucket":
"name":"mybucket",
"ownerIdentity":
"principalId":"A3NL1KOZZKExample"
,
"arn":"arn:aws:s3:::mybucket"
,
"object":
"key":"HappyFace.jpg",
"size":1024,
"eTag":"d41d8cd98f00b204e9800998ecf8427e",
"versionId":"096fKKXTRTtl3on89fVO.nfljtsv6qko",
"sequencer":"0055AED6DCD90281E5"
]
【讨论】:
【参考方案5】:基本上你需要的是使用一些 api 调用(例如,如果你使用 python)并列出所有新对象或 s3 存储桶中的所有对象,然后处理这些对象
这是一个sn-p:
from boto.s3.connection import S3Connection
conn = S3Connection()
source = conn.get_bucket(src_bucket)
src_list = set([key.name for key in source.get_all_keys(headers=None, prefix=prefix)])
//and then you can go over this src list
for entry in src_list:
do something
【讨论】:
我不确定这是否准确地回答了这个问题。我坚持的一点是在对象上运行 Lambda 函数。它似乎只使用触发器运行,即插入对象。是否可以获取所有对象,然后在每个对象上调用 lambda 函数?顺便说一句,我正在使用 Node。 为了澄清,该函数将图像转换为各种尺寸。在我实现该功能之前,存储桶中已经有照片,我需要在这些对象上运行它。 所以你想触发插入对象的功能,然后在桶中的所有对象上运行它? 目前我设置了一个 S3 触发器,它适用于新对象,因为它是由 put 调用触发的。对于新对象来说一切都很好。但是,已经在其中的对象显然不会触发 put,因为它们 已经 在存储桶中。我想在这些对象上运行该函数/触发它。在我看来,最干净的方法似乎是重新插入具有完全相同名称的对象,或“reput”它们,以便触发,但我相对初级,我不确定这是否不必要。 您可以通过两种方式执行此操作,此处触发一个检查未转换图像的功能,然后将这些图像添加到将转换图片的列表/队列中。或调用 lambda 测试并配置一个测试事件以进行手动调用,这将强制手动转换以上是关于在现有 S3 对象上运行 S3-put 触发的 Lambda 函数?的主要内容,如果未能解决你的问题,请参考以下文章
将对象上传到 S3 存储桶时如何触发 AWS Cloudformation 堆栈的更新?
如何使用Coldfusion fileExist检查Amazon S3上是否存在文件?
Rails ActiveStorage 附加到现有 S3 文件
Amazon S3 在另一个账户中触发另一个 Lambda 函数