如何使用 pyarrow 从 S3 读取镶木地板文件列表作为熊猫数据框?

Posted

技术标签:

【中文标题】如何使用 pyarrow 从 S3 读取镶木地板文件列表作为熊猫数据框?【英文标题】:How to read a list of parquet files from S3 as a pandas dataframe using pyarrow? 【发布时间】:2017-12-16 01:08:53 【问题描述】:

我有一种使用 boto3 (1.4.4)、pyarrow (0.4.1) 和 pandas (0.20.3) 的巧妙方法。

首先,我可以像这样在本地读取单个 parquet 文件:

import pyarrow.parquet as pq

path = 'parquet/part-r-00000-1e638be4-e31f-498a-a359-47d017a0059c.gz.parquet'
table = pq.read_table(path)
df = table.to_pandas()

我也可以像这样在本地读取 parquet 文件的目录:

import pyarrow.parquet as pq

dataset = pq.ParquetDataset('parquet/')
table = dataset.read()
df = table.to_pandas()

两者都像魅力一样工作。现在我想用存储在 S3 存储桶中的文件远程实现相同的目标。我希望这样的事情会起作用:

dataset = pq.ParquetDataset('s3n://dsn/to/my/bucket')

但它没有:

OSError: Passed non-file path: s3n://dsn/to/my/bucket

在彻底阅读pyarrow's documentation 之后,这似乎是不可能的at the moment。所以我想出了以下解决方案:

从 S3 读取单个文件并获取 pandas 数据框:

import io
import boto3
import pyarrow.parquet as pq

buffer = io.BytesIO()
s3 = boto3.resource('s3')
s3_object = s3.Object('bucket-name', 'key/to/parquet/file.gz.parquet')
s3_object.download_fileobj(buffer)
table = pq.read_table(buffer)
df = table.to_pandas()

这里是我从 S3 文件夹路径创建 pandas 数据框的 hacky、未优化的解决方案:

import io
import boto3
import pandas as pd
import pyarrow.parquet as pq

bucket_name = 'bucket-name'
def download_s3_parquet_file(s3, bucket, key):
    buffer = io.BytesIO()
    s3.Object(bucket, key).download_fileobj(buffer)
    return buffer

client = boto3.client('s3')
s3 = boto3.resource('s3')
objects_dict = client.list_objects_v2(Bucket=bucket_name, Prefix='my/folder/prefix')
s3_keys = [item['Key'] for item in objects_dict['Contents'] if item['Key'].endswith('.parquet')]
buffers = [download_s3_parquet_file(s3, bucket_name, key) for key in s3_keys]
dfs = [pq.read_table(buffer).to_pandas() for buffer in buffers]
df = pd.concat(dfs, ignore_index=True)

有没有更好的方法来实现这一点?也许某种使用pyarrow的熊猫连接器?我想避免使用pyspark,但如果没有其他解决方案,我会接受。

【问题讨论】:

你有没有考虑用 dask 阅读它们?我可以在两行中做同样的事情。 只有在过滤到大小 > 0 的文件后才能使您的 hacky 解决方案工作:dfs = [pq.read_table(buffer).to_pandas() for buffer in buffers if len(buffer.getvalue()) > 0]。谢谢!不幸的是,这里的答案并不能完全解决这个问题。 【参考方案1】:

您可以使用来自 dask 的 s3fs,它为 s3 实现了文件系统接口。然后你可以像这样使用 ParquetDataset 的文件系统参数:

import s3fs
s3 = s3fs.S3FileSystem()
dataset = pq.ParquetDataset('s3n://dsn/to/my/bucket', filesystem=s3)

【讨论】:

【参考方案2】:

您应该使用yjk21 建议的s3fs 模块。然而,作为调用 ParquetDataset 的结果,您将获得一个 pyarrow.parquet.ParquetDataset 对象。要获取 Pandas DataFrame,您宁愿将.read_pandas().to_pandas() 应用于它:

import pyarrow.parquet as pq
import s3fs
s3 = s3fs.S3FileSystem()

pandas_dataframe = pq.ParquetDataset('s3://your-bucket/', filesystem=s3).read_pandas().to_pandas()

【讨论】:

得到:ValueError:在中间目录中找到文件。有什么想法吗? @Mithril 您的错误消息不幸被截断,所以我看不到提到的目录。不过可以猜测一下。事实上,当分区 parquet 文件存储到 S3 时,它们通常首先写入“_temporary”目录。如果此目录不为空,则表明 S3 位置包含不完整(损坏)的数据。 当你这样做时会发生什么?它是流式传输还是复制到本地?如果我打开一个 10GB 文件 2 倍,它会下载 2 倍吗?如果我打开只有 5GB 本地存储的 10GB 文件,它是流式传输还是全部下载? 当我指定所有镶木地板文件所在的密钥时,我得到ArrowIOError: Invalid Parquet file size is 0 bytes。当我明确指定镶木地板文件时,它可以工作。我使用 s3fs == 0.3.5 和 pyarrow == 0.15.0。 @vak 知道为什么我不能像你一样读取 s3 键中的所有 parquet 文件吗? @VincentClaes 斜杠?【参考方案3】:

不用pyarrow也可以用boto3实现

import boto3
import io
import pandas as pd

# Read the parquet file
buffer = io.BytesIO()
s3 = boto3.resource('s3')
object = s3.Object('bucket_name','key')
object.download_fileobj(buffer)
df = pd.read_parquet(buffer)

print(df.head())

【讨论】:

我收到了AttributeError: 's3.Object' object has no attribute 'download_fileobj' 我认为海报的意思是“不使用 s3fs”,无论如何,如果您需要 BytesIO 缓冲区,这是一个很好的答案。【参考方案4】:

将云上的 parquet 数据读入数据帧的最简单方法可能是使用 dask.dataframe 以这种方式:

import dask.dataframe as dd
df = dd.read_parquet('s3://bucket/path/to/data-*.parq')

dask.dataframe可以read from Google Cloud Storage, Amazon S3, Hadoop file system and more!

【讨论】:

这个解决方案的唯一问题是你不能在集群上分发以备不时之需。 我的问题是读取太多文件时出错。会不会是 dask 无法处理,因此需要求助于其他解决方案? 我的镶木地板文件目录中有 0 字节 _SUCCESS 头文件。尝试在全局中排除它们:dd.read_parquet('s3://bucket/test.parquet/[!_]*'),但结果为IndexError: list index out of range。要么是因为没有匹配,要么是一些 0 大小的文件。【参考方案5】:

谢谢!你的问题实际上告诉了我很多。这就是我现在使用pandas (0.21.1) 的方法,它将调用pyarrowboto3 (1.3.1)。

import boto3
import io
import pandas as pd

# Read single parquet file from S3
def pd_read_s3_parquet(key, bucket, s3_client=None, **args):
    if s3_client is None:
        s3_client = boto3.client('s3')
    obj = s3_client.get_object(Bucket=bucket, Key=key)
    return pd.read_parquet(io.BytesIO(obj['Body'].read()), **args)

# Read multiple parquets from a folder on S3 generated by spark
def pd_read_s3_multiple_parquets(filepath, bucket, s3=None, 
                                 s3_client=None, verbose=False, **args):
    if not filepath.endswith('/'):
        filepath = filepath + '/'  # Add '/' to the end
    if s3_client is None:
        s3_client = boto3.client('s3')
    if s3 is None:
        s3 = boto3.resource('s3')
    s3_keys = [item.key for item in s3.Bucket(bucket).objects.filter(Prefix=filepath)
               if item.key.endswith('.parquet')]
    if not s3_keys:
        print('No parquet found in', bucket, filepath)
    elif verbose:
        print('Load parquets:')
        for p in s3_keys: 
            print(p)
    dfs = [pd_read_s3_parquet(key, bucket=bucket, s3_client=s3_client, **args) 
           for key in s3_keys]
    return pd.concat(dfs, ignore_index=True)

然后你可以从 S3 by 读取文件夹下的多个拼花

df = pd_read_s3_multiple_parquets('path/to/folder', 'my_bucket')

(我猜这个代码可以简化很多。)

【讨论】:

【参考方案6】:

如果您愿意也可以使用AWS Data Wrangler。

import awswrangler as wr

df = wr.s3.read_parquet(path="s3://...")

【讨论】:

我不知道为什么,但是读取数据需要很多时间?仅 1.3 mb 的数据集需要 6 秒? 您的示例需要一个镶木地板文件。你需要设置参数dataset = True来读取parquet文件列表。【参考方案7】:

只要你有正确的包设置

$ pip install pandas==1.1.0 pyarrow==1.0.0 s3fs==0.4.2

和您的 AWS 共享 config and credentials files configured appropriately

您可以立即使用pandas

import pandas as pd

df = pd.read_parquet("s3://bucket/key.parquet")

如果有多个 AWS profiles 你可能还需要设置

$ export AWS_DEFAULT_PROFILE=profile_under_which_the_bucket_is_accessible

这样您就可以访问您的存储桶了。

【讨论】:

谢谢!这绝对应该是公认的答案。 对,新的pandas现在可以直接读取s3路径了。虽然不知何故,它比我尝试时的 io.BytesIO 方法慢得多。

以上是关于如何使用 pyarrow 从 S3 读取镶木地板文件列表作为熊猫数据框?的主要内容,如果未能解决你的问题,请参考以下文章

Hive/Bigsql Pandas 将浮点数转换为整数,使用 pyarrow 将空值转换为镶木地板文件

使用 pyarrow 如何附加到镶木地板文件?

如何使用 Pyarrow 更改镶木地板文件中列的名称?

如何在 python 的 S3 中从 Pandas 数据帧写入镶木地板文件

如何将单个镶木地板文件从 s3 读入 dask 数据帧?

presto 是不是需要配置单元元存储才能从 S3 读取镶木地板文件?