如何使用 pyarrow 从 S3 读取镶木地板文件列表作为熊猫数据框?
Posted
技术标签:
【中文标题】如何使用 pyarrow 从 S3 读取镶木地板文件列表作为熊猫数据框?【英文标题】:How to read a list of parquet files from S3 as a pandas dataframe using pyarrow? 【发布时间】:2017-12-16 01:08:53 【问题描述】:我有一种使用 boto3
(1.4.4)、pyarrow
(0.4.1) 和 pandas
(0.20.3) 的巧妙方法。
首先,我可以像这样在本地读取单个 parquet 文件:
import pyarrow.parquet as pq
path = 'parquet/part-r-00000-1e638be4-e31f-498a-a359-47d017a0059c.gz.parquet'
table = pq.read_table(path)
df = table.to_pandas()
我也可以像这样在本地读取 parquet 文件的目录:
import pyarrow.parquet as pq
dataset = pq.ParquetDataset('parquet/')
table = dataset.read()
df = table.to_pandas()
两者都像魅力一样工作。现在我想用存储在 S3 存储桶中的文件远程实现相同的目标。我希望这样的事情会起作用:
dataset = pq.ParquetDataset('s3n://dsn/to/my/bucket')
但它没有:
OSError: Passed non-file path: s3n://dsn/to/my/bucket
在彻底阅读pyarrow's documentation 之后,这似乎是不可能的at the moment。所以我想出了以下解决方案:
从 S3 读取单个文件并获取 pandas 数据框:
import io
import boto3
import pyarrow.parquet as pq
buffer = io.BytesIO()
s3 = boto3.resource('s3')
s3_object = s3.Object('bucket-name', 'key/to/parquet/file.gz.parquet')
s3_object.download_fileobj(buffer)
table = pq.read_table(buffer)
df = table.to_pandas()
这里是我从 S3 文件夹路径创建 pandas 数据框的 hacky、未优化的解决方案:
import io
import boto3
import pandas as pd
import pyarrow.parquet as pq
bucket_name = 'bucket-name'
def download_s3_parquet_file(s3, bucket, key):
buffer = io.BytesIO()
s3.Object(bucket, key).download_fileobj(buffer)
return buffer
client = boto3.client('s3')
s3 = boto3.resource('s3')
objects_dict = client.list_objects_v2(Bucket=bucket_name, Prefix='my/folder/prefix')
s3_keys = [item['Key'] for item in objects_dict['Contents'] if item['Key'].endswith('.parquet')]
buffers = [download_s3_parquet_file(s3, bucket_name, key) for key in s3_keys]
dfs = [pq.read_table(buffer).to_pandas() for buffer in buffers]
df = pd.concat(dfs, ignore_index=True)
有没有更好的方法来实现这一点?也许某种使用pyarrow的熊猫连接器?我想避免使用pyspark
,但如果没有其他解决方案,我会接受。
【问题讨论】:
你有没有考虑用 dask 阅读它们?我可以在两行中做同样的事情。 只有在过滤到大小 > 0 的文件后才能使您的 hacky 解决方案工作:dfs = [pq.read_table(buffer).to_pandas() for buffer in buffers if len(buffer.getvalue()) > 0]
。谢谢!不幸的是,这里的答案并不能完全解决这个问题。
【参考方案1】:
您可以使用来自 dask 的 s3fs,它为 s3 实现了文件系统接口。然后你可以像这样使用 ParquetDataset 的文件系统参数:
import s3fs
s3 = s3fs.S3FileSystem()
dataset = pq.ParquetDataset('s3n://dsn/to/my/bucket', filesystem=s3)
【讨论】:
【参考方案2】:您应该使用yjk21 建议的s3fs
模块。然而,作为调用 ParquetDataset 的结果,您将获得一个 pyarrow.parquet.ParquetDataset 对象。要获取 Pandas DataFrame,您宁愿将.read_pandas().to_pandas()
应用于它:
import pyarrow.parquet as pq
import s3fs
s3 = s3fs.S3FileSystem()
pandas_dataframe = pq.ParquetDataset('s3://your-bucket/', filesystem=s3).read_pandas().to_pandas()
【讨论】:
得到:ValueError:在中间目录中找到文件。有什么想法吗? @Mithril 您的错误消息不幸被截断,所以我看不到提到的目录。不过可以猜测一下。事实上,当分区 parquet 文件存储到 S3 时,它们通常首先写入“_temporary”目录。如果此目录不为空,则表明 S3 位置包含不完整(损坏)的数据。 当你这样做时会发生什么?它是流式传输还是复制到本地?如果我打开一个 10GB 文件 2 倍,它会下载 2 倍吗?如果我打开只有 5GB 本地存储的 10GB 文件,它是流式传输还是全部下载? 当我指定所有镶木地板文件所在的密钥时,我得到ArrowIOError: Invalid Parquet file size is 0 bytes
。当我明确指定镶木地板文件时,它可以工作。我使用 s3fs == 0.3.5 和 pyarrow == 0.15.0。 @vak 知道为什么我不能像你一样读取 s3 键中的所有 parquet 文件吗?
@VincentClaes 斜杠?【参考方案3】:
不用pyarrow也可以用boto3实现
import boto3
import io
import pandas as pd
# Read the parquet file
buffer = io.BytesIO()
s3 = boto3.resource('s3')
object = s3.Object('bucket_name','key')
object.download_fileobj(buffer)
df = pd.read_parquet(buffer)
print(df.head())
【讨论】:
我收到了AttributeError: 's3.Object' object has no attribute 'download_fileobj'
。
我认为海报的意思是“不使用 s3fs”,无论如何,如果您需要 BytesIO 缓冲区,这是一个很好的答案。【参考方案4】:
将云上的 parquet 数据读入数据帧的最简单方法可能是使用 dask.dataframe 以这种方式:
import dask.dataframe as dd
df = dd.read_parquet('s3://bucket/path/to/data-*.parq')
dask.dataframe
可以read from Google Cloud Storage, Amazon S3, Hadoop file system and more!
【讨论】:
这个解决方案的唯一问题是你不能在集群上分发以备不时之需。 我的问题是读取太多文件时出错。会不会是 dask 无法处理,因此需要求助于其他解决方案? 我的镶木地板文件目录中有 0 字节 _SUCCESS 头文件。尝试在全局中排除它们:dd.read_parquet('s3://bucket/test.parquet/[!_]*')
,但结果为IndexError: list index out of range
。要么是因为没有匹配,要么是一些 0 大小的文件。【参考方案5】:
谢谢!你的问题实际上告诉了我很多。这就是我现在使用pandas
(0.21.1) 的方法,它将调用pyarrow
和boto3
(1.3.1)。
import boto3
import io
import pandas as pd
# Read single parquet file from S3
def pd_read_s3_parquet(key, bucket, s3_client=None, **args):
if s3_client is None:
s3_client = boto3.client('s3')
obj = s3_client.get_object(Bucket=bucket, Key=key)
return pd.read_parquet(io.BytesIO(obj['Body'].read()), **args)
# Read multiple parquets from a folder on S3 generated by spark
def pd_read_s3_multiple_parquets(filepath, bucket, s3=None,
s3_client=None, verbose=False, **args):
if not filepath.endswith('/'):
filepath = filepath + '/' # Add '/' to the end
if s3_client is None:
s3_client = boto3.client('s3')
if s3 is None:
s3 = boto3.resource('s3')
s3_keys = [item.key for item in s3.Bucket(bucket).objects.filter(Prefix=filepath)
if item.key.endswith('.parquet')]
if not s3_keys:
print('No parquet found in', bucket, filepath)
elif verbose:
print('Load parquets:')
for p in s3_keys:
print(p)
dfs = [pd_read_s3_parquet(key, bucket=bucket, s3_client=s3_client, **args)
for key in s3_keys]
return pd.concat(dfs, ignore_index=True)
然后你可以从 S3 by 读取文件夹下的多个拼花
df = pd_read_s3_multiple_parquets('path/to/folder', 'my_bucket')
(我猜这个代码可以简化很多。)
【讨论】:
【参考方案6】:如果您愿意也可以使用AWS Data Wrangler。
import awswrangler as wr
df = wr.s3.read_parquet(path="s3://...")
【讨论】:
我不知道为什么,但是读取数据需要很多时间?仅 1.3 mb 的数据集需要 6 秒? 您的示例需要一个镶木地板文件。你需要设置参数dataset = True
来读取parquet文件列表。【参考方案7】:
只要你有正确的包设置
$ pip install pandas==1.1.0 pyarrow==1.0.0 s3fs==0.4.2
和您的 AWS 共享 config and credentials files configured appropriately
您可以立即使用pandas
:
import pandas as pd
df = pd.read_parquet("s3://bucket/key.parquet")
如果有多个 AWS profiles 你可能还需要设置
$ export AWS_DEFAULT_PROFILE=profile_under_which_the_bucket_is_accessible
这样您就可以访问您的存储桶了。
【讨论】:
谢谢!这绝对应该是公认的答案。 对,新的pandas现在可以直接读取s3路径了。虽然不知何故,它比我尝试时的io.BytesIO
方法慢得多。以上是关于如何使用 pyarrow 从 S3 读取镶木地板文件列表作为熊猫数据框?的主要内容,如果未能解决你的问题,请参考以下文章
Hive/Bigsql Pandas 将浮点数转换为整数,使用 pyarrow 将空值转换为镶木地板文件