使用 boto3 lib 和 AWS Lambda 从 S3 存储桶中的压缩文件中获取数据流
Posted
技术标签:
【中文标题】使用 boto3 lib 和 AWS Lambda 从 S3 存储桶中的压缩文件中获取数据流【英文标题】:Getting a data stream from a zipped file sitting in a S3 bucket using boto3 lib and AWS Lambda 【发布时间】:2018-02-13 23:08:13 【问题描述】:我正在尝试为我的 chron 作业创建一个无服务器处理器,在这项作业中,我在我的 S3 存储桶中从我的一个客户那里收到了一个压缩文件,文件大小约为 50MB
,但一旦你解压缩它,它就会变成1.5GB
的大小,并且 AWS Lambda 上的可用空间有一个硬限制,即 500MB
由于我无法从 S3 存储桶下载此文件并将其解压缩到我的 Lambda 上,我成功地解压缩了我的文件并流式传输在 unix 脚本中使用 funzip
从 S3 逐行显示内容。
for x in $files ; do echo -n "$x: " ; timeout 5 aws s3 cp $monkeydir/$x - | funzip
我的桶名:MonkeyBusiness
密钥:/Daily/Business/Banana/current-date
对象:banana.zip
但是现在由于我尝试使用 boto3 实现相同的输出,我如何将压缩内容流式传输到 sys i/o 并解压缩流将内容保存在单独的文件中,每个文件除以 10000 行并将分块文件上传回S3。 需要指导,因为我对 AWS 和 boto3 还很陌生。
如果您需要有关该工作的更多详细信息,请告诉我。
下面给出的建议解决方案在这里不适用,因为 zlib 文档明确指出,该 lib 与 gzip 文件格式兼容,而我的问题是 zip 文件格式。
import zlib
def stream_gzip_decompress(stream):
dec = zlib.decompressobj(32 + zlib.MAX_WBITS) # offset 32 to skip the header
for chunk in stream:
rv = dec.decompress(chunk)
if rv:
yield rv
【问题讨论】:
要逐块解压和流式传输,请查找 io.cStringIO 或 ByteIO 以及 zlib 模块。 @mootmoot 如何从 S3 流式传输它让我心痛。 试试这个并用 s3.put_object() 替换文件写入,说它支持流***.com/questions/27035296/… @mootmoot 不适用于此处,我使用 Lambda 从 S3 流式传输和解压缩压缩文件,建议的示例是在 ec2 上压缩文件,我也需要逐行数据,而不是逐行块大小,导致块可能会断线:( 我认为你弄错了,阻止读取数据并不会打破界限,它只是将数据作为流字节进行处理。而且块解压比较麻烦 【参考方案1】:所以我使用 BytesIO 将压缩文件读入缓冲区对象,然后使用 zipfile 将解压后的流作为未压缩数据打开,并且能够逐行获取数据。
import io
import zipfile
import boto3
import sys
s3 = boto3.resource('s3', 'us-east-1')
def stream_zip_file():
count = 0
obj = s3.Object(
bucket_name='MonkeyBusiness',
key='/Daily/Business/Banana/current-date/banana.zip'
)
buffer = io.BytesIO(obj.get()["Body"].read())
print (buffer)
z = zipfile.ZipFile(buffer)
foo2 = z.open(z.infolist()[0])
print(sys.getsizeof(foo2))
line_counter = 0
for _ in foo2:
line_counter += 1
print (line_counter)
z.close()
if __name__ == '__main__':
stream_zip_file()
【讨论】:
对于 gzip 文件,这里是类似的方法 gist.github.com/veselosky/9427faa38cee75cd8e27 您在哪里阅读基于文本的数据,例如 CSV? zip中哪里有很多文件?如果是这样,您是如何处理它们的?【参考方案2】:这不是确切的答案。不过你可以试试这个。
首先,请修改answer that mentioned about gzip file with limited memory,这种方法允许一个块一个块地流式传输文件。而 boto3 S3 put_object() 和 upload_fileobj 似乎允许流式传输。
您需要将上面提到的代码与以下解压缩混合和调整。
stream = cStringIO.StringIO()
stream.write(s3_data)
stream.seek(0)
blocksize = 1 << 16 #64kb
with gzip.GzipFile(fileobj=stream) as decompressor:
while True:
boto3.client.upload_fileobj(decompressor.read(blocksize), "bucket", "key")
我不能保证上面的代码能正常工作,它只是给你解压缩文件并分块重新上传的想法。您甚至可能需要将解压缩数据通过管道传输到 ByteIo 并通过管道传输到 upload_fileobj。有很多测试。
如果您不需要尽快解压缩文件,我的建议是使用 lambda 将文件放入 SQS 队列。当有“足够”的文件时,触发一个 SPOT 实例(这将非常便宜),它将读取队列并处理文件。
【讨论】:
比起将分块文件上传回 S3,我更担心在不让 lambda 内存不足的情况下解压缩流 如果将块大小设置在内存规格以下应该会很好 @Shek 运气好,这部分需要熟悉python流io的高手以上是关于使用 boto3 lib 和 AWS Lambda 从 S3 存储桶中的压缩文件中获取数据流的主要内容,如果未能解决你的问题,请参考以下文章
在单个AWS Lambda中使用两个python函数boto3
无法在本地和 lambda 上找到 boto3.client 的凭据