如何每次将新上传的文件从 s3 下载到 ec2

Posted

技术标签:

【中文标题】如何每次将新上传的文件从 s3 下载到 ec2【英文标题】:How to download new uploaded files from s3 to ec2 everytime 【发布时间】:2022-01-07 23:26:08 【问题描述】:

我有一个s3 存储桶,它将全天接收新文件。每次将新文件上传到存储桶时,我都想将这些下载到我的 ec2 实例。

我已经读到它可以使用sqssnslambda。哪一个是最简单的?文件上传到存储桶后,我需要尽快下载文件。

编辑

我基本上每隔几秒或几分钟就会在存储桶中获取 png 图像。每次上传新图像时,我都想在已经运行的实例上下载它。我会做一些人工智能处理。由于图像会不断进入存储桶,因此我想不断将其下载到 ec2 中并尽快处理。 到目前为止,这是我在 Lambda 函数中的代码。

import boto3
import json


def lambda_handler(event, context):
    """Read file from s3 on trigger."""
    #print(event)
    s3 = boto3.client("s3")
    client = boto3.client("ec2")
    ssm = boto3.client("ssm")
    instanceid = "******"

    if event:
        file_obj = event["Records"][0]
        #print(file_obj)
        bucketname = str(file_obj["s3"]["bucket"]["name"])
        print(bucketname)
        filename = str(file_obj["s3"]["object"]["key"])
        print(filename)
        
        response = ssm.send_command(
            InstanceIds=[instanceid],
            DocumentName="AWS-RunShellScript",
            Parameters=
                "commands": [f"aws s3 cp filename ."]
            ,  # replace command_to_be_executed with command
        )
        
  
        # fetching command id for the output
        command_id = response["Command"]["CommandId"]

        time.sleep(3)

        # fetching command output
        output = ssm.get_command_invocation(CommandId=command_id, InstanceId=instanceid)
        print(output)
    return

但是我收到以下错误

Test Event Name
test

Response

  "errorMessage": "2021-12-01T14:11:30.781Z 88dbe51b-53d6-4c06-8c16-207698b3a936 Task timed out after 3.00 seconds"


Function Logs
START RequestId: 88dbe51b-53d6-4c06-8c16-207698b3a936 Version: $LATEST
END RequestId: 88dbe51b-53d6-4c06-8c16-207698b3a936
REPORT RequestId: 88dbe51b-53d6-4c06-8c16-207698b3a936  Duration: 3003.58 ms    Billed Duration: 3000 ms    Memory Size: 128 MB Max Memory Used: 87 MB  Init Duration: 314.81 ms    
2021-12-01T14:11:30.781Z 88dbe51b-53d6-4c06-8c16-207698b3a936 Task timed out after 3.00 seconds

Request ID
88dbe51b-53d6-4c06-8c16-207698b3a936

当我删除所有与 ssm 相关的行时,它工作正常。是权限问题还是代码有问题?

EDIT2

我的代码正在运行,但在我的 ec2 实例中没有看到任何输出或更改。我应该在主目录中看到一个空文本文件,但我什么也没看到 代码

import boto3
import json
import time

import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """Read file from s3 on trigger."""
    #print(event)
    s3 = boto3.client("s3")
    client = boto3.client("ec2")
    ssm = boto3.client("ssm")
    instanceid = "******"
    print("HI")
    if event:
        file_obj = event["Records"][0]
        #print(file_obj)
        bucketname = str(file_obj["s3"]["bucket"]["name"])
        print(bucketname)
        filename = str(file_obj["s3"]["object"]["key"])
        print(filename)
        print("sending")
        try:
            response = ssm.send_command(
                InstanceIds=[instanceid],
                DocumentName="AWS-RunShellScript",
                Parameters=
                    "commands": ["touch hi.txt"]
                ,  # replace command_to_be_executed with command
            )
            # fetching command id for the output
            command_id = response["Command"]["CommandId"]

            time.sleep(3)

            # fetching command output
            output = ssm.get_command_invocation(CommandId=command_id, InstanceId=instanceid)
            print(output)
            
        except Exception as e:
            logger.error(e)
            raise e
        
        
  

【问题讨论】:

你能告诉我们更多关于为什么你想每次都下载它吗?文件会以某种方式在实例上处理吗?如果是这样,您的应用程序如何知道新文件已经到达——它是在检查文件,还是需要以某种方式“触发”?您会保留实例上的所有历史文件(这意味着您可以使用aws s3 sync),还是会在处理后删除文件?请编辑您的问题以提供这些额外的详细信息,以便我们提供适当的建议。 @JohnRotenstein 我现在已经添加了问题的详细信息 根据@theherk 的 cmets,您的方法似乎不正确。不要将其视为“将文件复制到 EC2”。相反,将其视为“每当新对象出现在 S3 中时,我都想使用 EC2 实例上的代码来处理它”。该代码既可以下载文件 也可以 处理图像。最好的架构方法是让 S3 向 Amazon SQS 队列发送消息,然后您在 EC2 实例上的程序将获取消息、下载文件、处理文件,然后从 SQS 队列中删除消息。这是可扩展的,因为 SQS 将充当待完成工作的缓冲区。 【参考方案1】:

我不知道为什么这里有 Lambda 的推荐。您需要的很简单:S3 object created event notification -> SQS 和您的 EC2 实例上的一些工作正在监视 long polling queue。

这是一个这样的 python 脚本的例子。您需要理清对象键在事件中的编码方式,但它会在那里。我还没有测试过,但应该很接近。

import boto3


def main() -> None:
    s3 = boto3.client("s3")
    sqs = boto3.client("sqs")
    while True:
        res = sqs.receive_message(
            QueueUrl="yourQueue",
            WaitTimeSeconds=20,
        )
        for msg in res.get("Messages", []):
            s3.download_file("yourBucket", msg["key"], "local/file/path")


if __name__ == "__main__":
    main()

【讨论】:

我已经添加了到目前为止我尝试过的方法。请查看并告诉我哪里出错了 你错了,我相信,因为涉及到一个 lambda。您不需要它或涉及 ssm。只需将您的存储桶设置为将对象创建的事件发布到具有上述长轮询的 SQS 队列即可。然后在您的 ec2 实例上运行从该队列接收的脚本。该消息将具有新的 s3 密钥。它可以只是一个简单的 python 程序,在while true 循环中从队列中读取。【参考方案2】:

您可以使用S3 Event Notifications,它会对进入s3 存储桶的新文件作出反应。 s3 事件支持的目的地是SNSSQSAWS lambda

您可以直接使用 lambda 作为目的地,如 @Marcin 所述

您可以使用 SQS has queue with a lambda behind 从队列中拉取。它允许你有一些能力,如死信队列。然后,您可以使用不同的方法从队列中提取消息:

AWS CLI AWS 开发工具包

您可以在 SNS 后面使用不同的东西(您可以在一行中拥有许多这样的目的地,这象征着 扇出模式

用于管理文件的 SQS 队列 要通知的电子邮件 一个 lambda 函数 ...

您可以在这篇文章中找到更多解释:https://aws.plainenglish.io/system-design-s3-events-to-lambda-vs-s3-events-to-sqs-sns-to-lambda-2d41477d1cc9

【讨论】:

【参考方案3】:

有几种方法。一种是设置s3 notifications to invoke a lambda function。然后 lambda 函数将使用 SSM Run Command 在您的实例上执行 AWS CLI S3 命令以从 S3 下载文件。

【讨论】:

当我运行测试代码时,我在 lambda 函数中出现以下错误 ``` "errorMessage": "2021-12-01T14:01:21.513Z c028410e-3a98-4041-b46d- b4756d3a04d4 任务在 3.00 秒后超时" ``` 我已经添加了我尝试过的输出和代码。我也用过你的方法。

以上是关于如何每次将新上传的文件从 s3 下载到 ec2的主要内容,如果未能解决你的问题,请参考以下文章

在关闭之前将 docker 日志从 ec2 实例上传到 S3

将文件上传到 S3 时如何传递 ACL 属性以进行颤振放大?

无法从弹性 beanstalk/ec2 服务器上传媒体文件

将文件从S3下载到EC2时出错

如何使用 lambda 函数从 AWS s3 获取文本文件的内容?

从 url 下载文件并将其上传到 AWS S3 而不保存 - node.js