使用boto从S3逐行读取文件?

Posted

技术标签:

【中文标题】使用boto从S3逐行读取文件?【英文标题】:Read a file line by line from S3 using boto? 【发布时间】:2015-04-21 12:16:35 【问题描述】:

我在 S3 中有一个 csv 文件,我正在尝试读取标题行以获取大小(这些文件是由我们的用户创建的,因此它们几乎可以是任何大小)。有没有办法使用 boto 做到这一点?我想也许我可以使用 python BufferedReader,但我不知道如何从 S3 键打开流。任何建议都会很棒。谢谢!

【问题讨论】:

key.size 不适合你吗?返回密钥大小(以字节为单位)。如果你想要标题,你也可以只流式传输第一个块,如下所示:***.com/a/7625197/786559。 【参考方案1】:

这是一个实际逐行流式传输数据的解决方案:

from io import TextIOWrapper
from gzip import GzipFile
...

# get StreamingBody from botocore.response
response = s3.get_object(Bucket=bucket, Key=key)
# if gzipped
gzipped = GzipFile(None, 'rb', fileobj=response['Body'])
data = TextIOWrapper(gzipped)

for line in data:
    # process line

【讨论】:

gzip 要求不在最初的问题中,但这正是我的用例所需要的。谢谢! 这看起来比使用外部包更简洁。遗憾的是 1)您的答案没有被检查为实际答案,并且 2)smart_open 解决方案得到了如此多的提升。 如果我们不使用GzipFile来解压它是行不通的,因为StreamingBody没有readable需要TextIOWrapper的属性 这应该是公认的答案... smart_open 似乎要慢得多:github.com/RaRe-Technologies/smart_open/issues/457 在我自己的测试文件中,使用此答案中描述的方法比使用 smart_open 快约 5 倍。这是一个超酷的库,但似乎不符合这个特定用例的标准。【参考方案2】:

您可能会发现https://pypi.python.org/pypi/smart_open 对您的任务很有用。

来自文档:

for line in smart_open.smart_open('s3://mybucket/mykey.txt'):
    print line

【讨论】:

这容易多了!并且默认支持 gzipped 文件! 感谢分享! 费用是多少?您是否最终下载整个文件只是为了阅读几行? 使用外部库通常是不好的做法 - 您希望拥有最少的依赖项,以便轻松升级和前进。 @AmundeepSingh 是的,您只需执行 s3.get_object(),然后将 response["Body"] 传递给 io.TextIOWrapper,然后逐行读取。内存将根据设置的缓冲区保持不变。这是我如何读取压缩的 gz 文件并逐行读取它们gist.github.com/gudata/da5d0553a309836d998a56c73c60575c 这是几行代码,仅使用 python 和 boto 中的内容【参考方案3】:

我知道这是一个非常古老的问题。

但现在,我们可以使用s3_conn.get_object(Bucket=bucket, Key=key)['Body'].iter_lines()

【讨论】:

是的,如果今天提出这个问题,那么 .iter_chunks() 就是答案 +1 iter_lines() 方法是 botocore.response 包的一部分:botocore.amazonaws.com/v1/documentation/api/latest/reference/…【参考方案4】:

stdlib 中的codecs module 提供了一种将字节流编码为文本流的简单方法,并提供了一个生成器来逐行检索此文本。它可以毫不费力地与 S3 一起使用:

import codecs

import boto3


s3 = boto3.resource("s3")
s3_object = s3.Object('my-bucket', 'a/b/c.txt')
line_stream = codecs.getreader("utf-8")

for line in line_stream(s3_object.get()['Body']):
    print(line)

【讨论】:

【参考方案5】:

看来 boto 有一个 read() 函数可以做到这一点。下面是一些适合我的代码:

>>> import boto
>>> from boto.s3.key import Key
>>> conn = boto.connect_s3('ap-southeast-2')
>>> bucket = conn.get_bucket('bucket-name')
>>> k = Key(bucket)
>>> k.key = 'filename.txt'
>>> k.open()
>>> k.read(10)
'This text '

read(n) 的调用从对象返回接下来的 n 个字节。

当然,这不会自动返回“标题行”,但您可以使用足够大的数字调用它以至少返回标题行。

【讨论】:

谢谢,约翰。如果我找不到流式传输文件的方法,这将是我的后备解决方案。我只是猜测标题的最大大小,然后从那里开始。 @John Rotenstein - 你不需要在 read() 之后关闭文件吗?【参考方案6】:

使用 boto3,您可以访问原始流并逐行读取。 请注意,由于某种原因,原始流是私有财产

s3 = boto3.resource('s3', aws_access_key_id='xxx', aws_secret_access_key='xxx')
obj = s3.Object('bucket name', 'file key')

obj.get()['Body']._raw_stream.readline() # line 1
obj.get()['Body']._raw_stream.readline() # line 2
obj.get()['Body']._raw_stream.readline() # line 3...

【讨论】:

正如_raw_stream 以下划线开头的属性所暗示的,这不是您应该访问流内容的方式。可以通过调用obj.get()["body"].read() 完整读取它们或使用obj.get()["body"].iter_lines() 作为生成器迭代 对于某些 python API,这是唯一可行的方法(例如,pickle.load 期望在其参数上同时找到 .read().readline() @Alex 是 obj.get()["Body"].read() 请注意正文中的大写 B【参考方案7】:

使用boto3:

s3 = boto3.resource('s3')
obj = s3.Object(BUCKET, key)
for line in obj.get()['Body']._raw_stream:
    # do something with line

【讨论】:

【参考方案8】:

如果您想读取具有特定存储桶前缀(即在“子文件夹”中)的多个文件(逐行),您可以这样做:

s3 = boto3.resource('s3', aws_access_key_id='<key_id>', aws_secret_access_key='<access_key>')

    bucket = s3.Bucket('<bucket_name>')
    for obj in bucket.objects.filter(Prefix='<your prefix>'):
        for line in obj.get()['Body'].read().splitlines():
            print(line.decode('utf-8'))

这里的行是字节,所以我正在解码它们;但是如果它们已经是一个字符串,你可以跳过它。

【讨论】:

【参考方案9】:

读取文件最动态且成本最低的方法是读取每个字节,直到找到所需的行数。

line_count = 0
line_data_bytes = b''

while line_count < 2 :

    incoming = correlate_file_obj['Body'].read(1)
    if incoming == b'\n':
        line_count = line_count + 1

    line_data_bytes = line_data_bytes + incoming

logger.debug("read bytes:")
logger.debug(line_data_bytes)

line_data = line_data_bytes.split(b'\n')

如果标题大小可以更改,您无需猜测标题大小,您最终不会下载整个文件,并且您不需要 3rd 方工具。当然,您需要确保文件中的行分隔符正确,并且您正在读取正确的字节数以找到它。

【讨论】:

【参考方案10】:

扩展 kooshywoosh 的回答:无法直接在纯二进制文件中的 StreamingBody 上使用 TextIOWrapper(这非常有用),因为您会收到以下错误:

"builtins.AttributeError: 'StreamingBody' object has no attribute 'readable'"

但是,您可以使用 botocore 的 github 页面上this 长期存在的问题中提到的以下 hack,并围绕 StreamingBody 定义一个非常简单的包装类:

from io import RawIOBase
...

class StreamingBodyIO(RawIOBase):
"""Wrap a boto StreamingBody in the IOBase API."""
def __init__(self, body):
    self.body = body

def readable(self):
    return True

def read(self, n=-1):
    n = None if n < 0 else n
    return self.body.read(n)

然后,您可以简单地使用以下代码:

from io import TextIOWrapper
...

# get StreamingBody from botocore.response
response = s3.get_object(Bucket=bucket, Key=key)
data = TextIOWrapper(StreamingBodyIO(response))
for line in data:
    # process line

【讨论】:

另外,请记住“从 io 导入 RawIOBase”

以上是关于使用boto从S3逐行读取文件?的主要内容,如果未能解决你的问题,请参考以下文章

从 S3 存储桶中读取大量 CSV 文件

如何使用 pyarrow 从 S3 读取镶木地板文件列表作为熊猫数据框?

Pyspark 从 S3 存储桶的子目录中读取所有 JSON 文件

在python中使用s3 select解析多个镶木地板文件?

Boto3 从 S3 存储桶下载所有文件

C++中怎么逐行读取数据