您可以使用流而不是本地文件上传到 S3 吗?

Posted

技术标签:

【中文标题】您可以使用流而不是本地文件上传到 S3 吗?【英文标题】:Can you upload to S3 using a stream rather than a local file? 【发布时间】:2015-09-10 22:41:09 【问题描述】:

我需要创建一个 CSV 并将其上传到 S3 存储桶。由于我是动态创建文件的,所以如果我可以在创建文件时将其直接写入 S3 存储桶,而不是在本地写入整个文件,然后在最后上传文件,那会更好。

有没有办法做到这一点?我的项目是用 Python 编写的,而且我对这门语言还很陌生。到目前为止,这是我尝试过的:

import csv
import csv
import io
import boto
from boto.s3.key import Key


conn = boto.connect_s3()
bucket = conn.get_bucket('dev-vs')
k = Key(bucket)
k.key = 'foo/foobar'

fieldnames = ['first_name', 'last_name']
writer = csv.DictWriter(io.StringIO(), fieldnames=fieldnames)
k.set_contents_from_stream(writer.writeheader())

我收到了这个错误:BotoClientError: s3 does not support chunked transfer

更新:我找到了一种直接写入 S3 的方法,但我找不到清除缓冲区而不实际删除我已经写过的行的方法。所以,例如:

conn = boto.connect_s3()
bucket = conn.get_bucket('dev-vs')
k = Key(bucket)
k.key = 'foo/foobar'

testDict = [
    "fieldA": "8",
    "fieldB": None,
    "fieldC": "888888888888",
    
    "fieldA": "9",
    "fieldB": None,
    "fieldC": "99999999999"]

f = io.StringIO()
fieldnames = ['fieldA', 'fieldB', 'fieldC']
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
k.set_contents_from_string(f.getvalue())

for row in testDict:
    writer.writerow(row)
    k.set_contents_from_string(f.getvalue())

f.close()

向文件写入 3 行,但是我无法释放内存来写入大文件。如果我添加:

f.seek(0)
f.truncate(0)

到循环,然后只写入文件的最后一行。有什么方法可以释放资源而不删除文件中的行?

【问题讨论】:

即使您可以按照自己的意愿写入 S3,由于一致性挑战,我不推荐它。为什么你认为不写本地会更好?如果出现异常或问题,您是否需要部分 S3 对象?我想不会。 我希望直接写来提高效率。本质上,如果我在本地编写文件并上传它,我将添加上传作为附加步骤,并清理本地文件。我不介意有一个不完整的文件 - 如果我也在本地编写它,我可能会有一个不完整的文件。系统将是幂等的,要么删除一个处于错误状态的文件,要么继续它。 【参考方案1】:

我确实找到了我的问题的解决方案,我会在这里发布以防其他人感兴趣。我决定将其作为分段上传的一部分。您无法流式传输到 S3。还有一个包可以将您的流文件更改为我使用的分段上传:Smart Open。

import smart_open
import io
import csv

testDict = [
    "fieldA": "8",
    "fieldB": None,
    "fieldC": "888888888888",
    
    "fieldA": "9",
    "fieldB": None,
    "fieldC": "99999999999"]

fieldnames = ['fieldA', 'fieldB', 'fieldC']
f = io.StringIO()
with smart_open.smart_open('s3://dev-test/bar/foo.csv', 'wb') as fout:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    fout.write(f.getvalue())

    for row in testDict:
        f.seek(0)
        f.truncate(0)
        writer.writerow(row)
        fout.write(f.getvalue())

f.close()

【讨论】:

对于 Python 2,请务必使用 StringIO.StringIO() 而不是 io.StringIO(),否则会收到编码错误 @inquiring minds,这是一个很好的答案。我的用例几乎和你的一样,只是不同的是而不是 csv,我想生成一个 XML。因为我喜欢使用像 Mako/genshi 这样的模板选项来生成 xml,你能建议我一个如何处理它的方法吗? (生成和写入同时进行,而不是先本地写入)【参考方案2】:

当文件内容作为 Django 请求中的 InMemoryUploadedFile 对象通过时,我们试图将文件内容上传到 s3。我们最终做了以下事情,因为我们不想在本地保存文件。希望对您有所帮助:

@action(detail=False, methods=['post'])
def upload_document(self, request):
     document = request.data.get('image').file
     s3.upload_fileobj(document, BUCKET_NAME, 
                                 DESIRED_NAME_OF_FILE_IN_S3, 
                                 ExtraArgs="ServerSideEncryption": "aws:kms")

【讨论】:

虽然这种方法有效,但它并不意味着流式传输 - 因为 InMemoryUploadedFile 将整个文件保存在 RAM 中。内存中文件的大小相对较小 - 它们不是即时生成的。【参考方案3】:

根据docs这是可能的

s3.Object('mybucket', 'hello.txt').put(Body=open('/tmp/hello.txt', 'rb'))

所以我们可以正常使用StringIO

更新:来自@inquiring minds 的smart_open lib 答案是更好的解决方案

【讨论】:

我不明白如何使用它。 /tmp/hello.txt 不是我们要避免的本地文件吗? @EthanP StringIO — 以文件形式读取和写入字符串。使用StringIO 对象而不是文件 不,根据this ticket,不支持。在 S3 中使用流的想法是避免在需要上传几千兆字节的大文件时使用静态文件。我也在尝试解决这个问题 - 我需要从 mongodb 读取大量数据并放入 S3,我不想使用文件。 @baldr 嗯。这个技巧过去对我有用。顺便说一句,在您的消息中提到的票证中,我看到了另一个 useful 方法。不幸的是,我现在不与亚马逊合作,也无法对其进行测试 我试图挖掘boto 来源,我发现它需要为每个发送的文件计算 MD5 校验和。这意味着流至少应该是“可搜索的”。当我从 mongodb 读取数据时,我有不可搜索的流,我无法轻松地倒回数据流。这里推荐的smart_open 允许使用流,但它只使用内部缓冲区,然后也使用boto 的“分段上传”。从技术上讲,可以使用类似文件的流,但要准备好它可能需要大量内存。流的思想 - 是使用低内存来上传(可能)无穷无尽的数据流。【参考方案4】:

这是一个使用boto3的完整示例

import boto3
import io

session = boto3.Session(
    aws_access_key_id="...",
    aws_secret_access_key="..."
)

s3 = session.resource("s3")

buff = io.BytesIO()

buff.write("test1\n".encode())
buff.write("test2\n".encode())

s3.Object(bucket, keypath).put(Body=buff.getvalue())

【讨论】:

【参考方案5】:

GitHub smart_open 问题 (#82) 中提到了一个有趣的代码解决方案,我一直想尝试。在这里复制粘贴以供后代使用...看起来需要boto3

csv_data = io.BytesIO()
writer = csv.writer(csv_data)
writer.writerows(my_data)

gz_stream = io.BytesIO()
with gzip.GzipFile(fileobj=gz_stream, mode="w") as gz:
    gz.write(csv_data.getvalue())
gz_stream.seek(0)

s3 = boto3.client('s3')
s3.upload_fileobj(gz_stream, bucket_name, key)

这个特定示例是流式传输到压缩的 S3 密钥/文件,但它似乎是通用方法——使用 boto3 S3 客户端的 upload_fileobj() 方法与目标流而不是文件结合——应该可以工作.

【讨论】:

你能解释一下这里的 my_data 是什么吗?是列表还是字典?? 根据这个 *** 的答案,writer.writerows() 采用一个可迭代的可迭代对象——列表列表、数组数组等——作为输入:***.com/a/33092057/165494【参考方案6】:

有一个得到很好支持的库可以做到这一点:

pip install s3fs

s3fs 用起来真的很简单:

import s3fs

s3fs.S3FileSystem(anon=False)

with s3.open('mybucket/new-file', 'wb') as f:
    f.write(2*2**20 * b'a')
    f.write(2*2**20 * b'a')

顺便说一句,boto3 中还内置了一些东西(由 AWS API 支持),称为 MultiPartUpload。

这不是作为 python 流的因素,这对某些人来说可能是一个优势。相反,您可以开始上传并一次发送一个部分。

【讨论】:

【参考方案7】:

要将字符串写入 S3 对象,请使用:

s3.Object('my_bucket', 'my_file.txt').put('Hello there')

所以将流转换为字符串,就可以了。

【讨论】:

这仅适用于对象的大小对于内存来说不太大的情况。

以上是关于您可以使用流而不是本地文件上传到 S3 吗?的主要内容,如果未能解决你的问题,请参考以下文章

将文件上传并压缩到s3

如何从 GAE 上传文件到 S3(一个恐怖故事)

github上传了配置信息

使用 go 将流文件上传到 AWS S3

如何禁用分段上传,以便同步匹配 s3cmd 中本地和远程文件的 md5sum

golang aws-sdk-go 之 s3 服务