如何使用 boto 将文件从 Amazon S3 流式传输到 Rackspace Cloudfiles?

Posted

技术标签:

【中文标题】如何使用 boto 将文件从 Amazon S3 流式传输到 Rackspace Cloudfiles?【英文标题】:How can I use boto to stream a file out of Amazon S3 to Rackspace Cloudfiles? 【发布时间】:2011-11-29 07:23:18 【问题描述】:

我正在将文件从 S3 复制到 Cloudfiles,并且我想避免将文件写入磁盘。 Python-Cloudfiles 库有一个 object.stream() 调用,看起来是我需要的,但我在 boto 中找不到等效调用。我希望我能够做类似的事情:

shutil.copyfileobj(s3Object.stream(),rsObject.stream())

boto(或者我想任何其他 s3 库)可以做到这一点吗?

【问题讨论】:

smart_open Python 库可以做到这一点(用于阅读和写作)。 【参考方案1】:

此线程中的其他答案与 boto 有关,但 S3.Object 在 boto3 中不再可迭代。因此,以下内容不起作用,它会产生 TypeError: 's3.Object' object is not iterable 错误消息:

s3 = boto3.session.Session(profile_name=my_profile).resource('s3')
s3_obj = s3.Object(bucket_name=my_bucket, key=my_key)

with io.FileIO('sample.txt', 'w') as file:
    for i in s3_obj:
        file.write(i)

在 boto3 中,对象的内容可在 S3.Object.get()['Body'] 获得,这是自版本 1.9.68 以来的可迭代但以前不是。因此,以下内容适用于最新版本的 boto3,但不适用于早期版本:

body = s3_obj.get()['Body']
with io.FileIO('sample.txt', 'w') as file:
    for i in body:
        file.write(i)

因此,对于较旧的 bo​​to3 版本,另一种选择是使用 read 方法,但这会将整个 S3 对象加载到内存中,这在处理大文件时并不总是可能的:

body = s3_obj.get()['Body']
with io.FileIO('sample.txt', 'w') as file:
    for i in body.read():
        file.write(i)

但是read 方法允许传入amt 参数,指定我们要从底层流中读取的字节数。可以重复调用此方法,直到读取整个流:

body = s3_obj.get()['Body']
with io.FileIO('sample.txt', 'w') as file:
    while file.write(body.read(amt=512)):
        pass

深入botocore.response.StreamingBody 代码一发现底层流也可用,因此我们可以如下迭代:

body = s3_obj.get()['Body']
with io.FileIO('sample.txt', 'w') as file:
    for b in body._raw_stream:
        file.write(b)

在谷歌搜索时,我还看到了一些可以使用的链接,但我没有尝试过:

WrappedStreamingBody Another related thread An issue in boto3 github to request StreamingBody is a proper stream - 已关闭!!!

【讨论】:

非常有用的答案。谢谢@smallo。感谢您公开了我认为大多数人都在寻找的私有 __raw_stream。 如果我绕过这个身体StreamingBody,这是否意味着HTTP连接没有终止?还是流媒体体被缓冲了? 不确定在编写此答案时是否可用,但botocore.response.StreamingBody 现在为此目的公开了iter_chunksiter_lines【参考方案2】:

boto 中的 Key 对象,代表 S3 中的对象,可以像迭代器一样使用,因此您应该能够执行以下操作:

>>> import boto
>>> c = boto.connect_s3()
>>> bucket = c.lookup('garnaat_pub')
>>> key = bucket.lookup('Scan1.jpg')
>>> for bytes in key:
...   write bytes to output stream

或者,就像您的示例一样,您可以这样做:

>>> shutil.copyfileobj(key, rsObject.stream())

【讨论】:

S3.Object 不再可迭代。 S3.object 仍然是可迭代的,但是使用 S3object['body'].iter_lines() 就像这样【参考方案3】:

我认为至少有一些看到这个问题的人会像我一样,并且会想要一种方法来逐行(或逐行逗号,或任何其他分隔符)从 boto 流式传输文件。这是一个简单的方法:

def getS3ResultsAsIterator(self, aws_access_info, key, prefix):        
    s3_conn = S3Connection(**aws_access)
    bucket_obj = s3_conn.get_bucket(key)
    # go through the list of files in the key
    for f in bucket_obj.list(prefix=prefix):
        unfinished_line = ''
        for byte in f:
            byte = unfinished_line + byte
            #split on whatever, or use a regex with re.split()
            lines = byte.split('\n')
            unfinished_line = lines.pop()
            for line in lines:
                yield line

@garnaat 上面的回答仍然很棒而且 100% 正确。希望我的仍然可以帮助某人。

【讨论】:

拆分其他两种类型的行结尾为:lines = re.split(r'[\n\r]+', byte) - 有助于从 Excel 导出 CSV 文件 再注意:我必须在for byte in f:循环完成后添加yield unfinished_line,否则最后一行将无法处理 这不是 Boto3 API 的一部分有充分的理由吗?如果没有,是否应该提交一个拉取请求来解决这个问题?我会因为把它撞起来而超级失望! @Eli 是的,我会敲一个基于生成器的东西,它将通过给定的分隔符将流在线分块?超级渴望粉碎它! 让我们看看这个拉取请求是如何在 botocore 处理的:github.com/boto/botocore/pull/1034【参考方案4】:

Botocore 的 StreamingBody 有一个 iter_lines() 方法:

https://botocore.amazonaws.com/v1/documentation/api/latest/reference/response.html#botocore.response.StreamingBody.iter_lines

所以:

import boto3
s3r = boto3.resource('s3')
iterator = s3r.Object(bucket, key).get()['Body'].iter_lines()

for line in iterator:
    print(line)

【讨论】:

这不会继续流它只是得到一个块 @cosbor11 您可以根据需要指定块大小:.iter_lines(chunk_size=1024)【参考方案5】:

这是我包装流媒体体的解决方案:

import io
class S3ObjectInterator(io.RawIOBase):
    def __init__(self, bucket, key):
        """Initialize with S3 bucket and key names"""
        self.s3c = boto3.client('s3')
        self.obj_stream = self.s3c.get_object(Bucket=bucket, Key=key)['Body']

    def read(self, n=-1):
        """Read from the stream"""
        return self.obj_stream.read() if n == -1 else self.obj_stream.read(n)

示例用法:

obj_stream = S3ObjectInterator(bucket, key)
for line in obj_stream:
    print line

【讨论】:

以上是关于如何使用 boto 将文件从 Amazon S3 流式传输到 Rackspace Cloudfiles?的主要内容,如果未能解决你的问题,请参考以下文章

Amazon S3 boto:如何重命名存储桶中的文件?

如何将抓取的数据从 Scrapy 以 csv 或 json 格式上传到 Amazon S3?

使用 Amazon s3 boto 库,如何获取已保存密钥的 URL?

如何使用 boto3 将 S3 对象保存到文件中

如何使 Pyspark 脚本在 Amazon EMR 上运行以识别 boto3 模块?它说找不到模块

如何将文件上传到 S3 并使用 boto3 将其公开?