使用 boto3 lib 和 AWS Lambda 从 S3 存储桶中的压缩文件中获取数据流

Posted

技术标签:

【中文标题】使用 boto3 lib 和 AWS Lambda 从 S3 存储桶中的压缩文件中获取数据流【英文标题】:Getting a data stream from a zipped file sitting in a S3 bucket using boto3 lib and AWS Lambda 【发布时间】:2018-02-13 23:08:13 【问题描述】:

我正在尝试为我的 chron 作业创建一个无服务器处理器,在这项作业中,我在我的 S3 存储桶中从我的一个客户那里收到了一个压缩文件,文件大小约为 50MB,但一旦你解压缩它,它就会变成1.5GB 的大小,并且 AWS Lambda 上的可用空间有一个硬限制,即 500MB 由于我无法从 S3 存储桶下载此文件并将其解压缩到我的 Lambda 上,我成功地解压缩了我的文件并流式传输在 unix 脚本中使用 funzip 从 S3 逐行显示内容。

for x in $files ; do echo -n "$x: " ; timeout 5 aws s3 cp $monkeydir/$x - | funzip

我的桶名:MonkeyBusiness 密钥:/Daily/Business/Banana/current-date 对象:banana.zip

但是现在由于我尝试使用 boto3 实现相同的输出,我如何将压缩内容流式传输到 sys i/o 并解压缩流将内容保存在单独的文件中,每个文件除以 10000 行并将分块文件上传回S3。 需要指导,因为我对 AWS 和 boto3 还很陌生。

如果您需要有关该工作的更多详细信息,请告诉我。

下面给出的建议解决方案在这里不适用,因为 zlib 文档明确指出,该 lib 与 gzip 文件格式兼容,而我的问题是 zip 文件格式。

import zlib

def stream_gzip_decompress(stream):
    dec = zlib.decompressobj(32 + zlib.MAX_WBITS)  # offset 32 to skip the header
    for chunk in stream:
        rv = dec.decompress(chunk)
        if rv:
            yield rv 

【问题讨论】:

要逐块解压和流式传输,请查找 io.cStringIO 或 ByteIO 以及 zlib 模块。 @mootmoot 如何从 S3 流式传输它让我心痛。 试试这个并用 s3.put_object() 替换文件写入,说它支持流***.com/questions/27035296/… @mootmoot 不适用于此处,我使用 Lambda 从 S3 流式传输和解压缩压缩文件,建议的示例是在 ec2 上压缩文件,我也需要逐行数据,而不是逐行块大小,导致块可能会断线:( 我认为你弄错了,阻止读取数据并不会打破界限,它只是将数据作为流字节进行处理。而且块解压比较麻烦 【参考方案1】:

所以我使用 BytesIO 将压缩文件读入缓冲区对象,然后使用 zipfile 将解压后的流作为未压缩数据打开,并且能够逐行获取数据。

import io
import zipfile
import boto3
import sys

s3 = boto3.resource('s3', 'us-east-1')


def stream_zip_file():
    count = 0
    obj = s3.Object(
        bucket_name='MonkeyBusiness',
        key='/Daily/Business/Banana/current-date/banana.zip'
    )
    buffer = io.BytesIO(obj.get()["Body"].read())
    print (buffer)
    z = zipfile.ZipFile(buffer)
    foo2 = z.open(z.infolist()[0])
    print(sys.getsizeof(foo2))
    line_counter = 0
    for _ in foo2:
        line_counter += 1
    print (line_counter)
    z.close()


if __name__ == '__main__':
    stream_zip_file()

【讨论】:

对于 gzip 文件,这里是类似的方法 gist.github.com/veselosky/9427faa38cee75cd8e27 您在哪里阅读基于文本的数据,例如 CSV? zip中哪里有很多文件?如果是这样,您是如何处理它们的?【参考方案2】:

这不是确切的答案。不过你可以试试这个。

首先,请修改answer that mentioned about gzip file with limited memory,这种方法允许一个块一个块地流式传输文件。而 boto3 S3 put_object() 和 upload_fileobj 似乎允许流式传输。

您需要将上面提到的代码与以下解压缩混合和调整。

stream = cStringIO.StringIO()
stream.write(s3_data)
stream.seek(0)
blocksize = 1 << 16  #64kb
with gzip.GzipFile(fileobj=stream) as decompressor:
    while True:
        boto3.client.upload_fileobj(decompressor.read(blocksize), "bucket", "key")

我不能保证上面的代码能正常工作,它只是给你解压缩文件并分块重新上传的想法。您甚至可能需要将解压缩数据通过管道传输到 ByteIo 并通过管道传输到 upload_fileobj。有很多测试。

如果您不需要尽快解压缩文件,我的建议是使用 lambda 将文件放入 SQS 队列。当有“足够”的文件时,触发一个 SPOT 实例(这将非常便宜),它将读取队列并处理文件。

【讨论】:

比起将分块文件上传回 S3,我更担心在不让 lambda 内存不足的情况下解压缩流 如果将块大小设置在内存规格以下应该会很好 @Shek 运气好,这部分需要熟悉python流io的高手

以上是关于使用 boto3 lib 和 AWS Lambda 从 S3 存储桶中的压缩文件中获取数据流的主要内容,如果未能解决你的问题,请参考以下文章

在单个AWS Lambda中使用两个python函数boto3

无法在本地和 lambda 上找到 boto3.client 的凭据

使用boto3下载Lambda函数的部署包

AWS ECS Docker 容器 Boto3 IAM 权限

使用AWS lambda函数调用lex chat bot

AWS 成本浏览器 boto3