如何在 python 中使用 pyarrow 从 S3 读取分区镶木地板文件
Posted
技术标签:
【中文标题】如何在 python 中使用 pyarrow 从 S3 读取分区镶木地板文件【英文标题】:How to read partitioned parquet files from S3 using pyarrow in python 【发布时间】:2017-12-18 08:47:31 【问题描述】:我正在寻找使用 python 从 s3 中读取多个分区目录中的数据的方法。
data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet data_folder/serial_number=2/cur_date=27-12-2012/asdsdfsd0324324.snappy.parquet
pyarrow 的 ParquetDataset 模块具有从分区读取的能力。所以我尝试了以下代码:
>>> import pandas as pd
>>> import pyarrow.parquet as pq
>>> import s3fs
>>> a = "s3://my_bucker/path/to/data_folder/"
>>> dataset = pq.ParquetDataset(a)
它抛出了以下错误:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
self.metadata_path) = _make_manifest(path_or_paths, self.fs)
File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 601, in _make_manifest
.format(path))
OSError: Passed non-file path: s3://my_bucker/path/to/data_folder/
根据我尝试使用 s3fs 作为文件系统的 pyarrow 文档,即:
>>> dataset = pq.ParquetDataset(a,filesystem=s3fs)
这会引发以下错误:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
self.metadata_path) = _make_manifest(path_or_paths, self.fs)
File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 583, in _make_manifest
if is_string(path_or_paths) and fs.isdir(path_or_paths):
AttributeError: module 's3fs' has no attribute 'isdir'
我只能使用 ECS 集群,因此 spark/pyspark 不是一个选项。
有没有一种方法可以让我们在 python 中从 s3 中的此类分区目录轻松读取镶木地板文件?我觉得列出所有目录然后阅读并不是一个好习惯,正如link 中所建议的那样。我需要将读取的数据转换为 pandas 数据帧以进行进一步处理,因此更喜欢与 fastparquet 或 pyarrow 相关的选项。我也对 python 中的其他选项持开放态度。
【问题讨论】:
让我们在issues.apache.org/jira/browse/ARROW-1213 和issues.apache.org/jira/browse/ARROW-1119 中讨论。我们必须添加一些代码以允许 pyarrow 识别 s3fs 文件系统并添加一个 shim / 兼容性类以使 S3FS 的文件系统 API 与 pyarrow 的略有不同。 【参考方案1】:我设法使用最新版本的 fastparquet 和 s3fs 来解决这个问题。以下是相同的代码:
import s3fs
import fastparquet as fp
s3 = s3fs.S3FileSystem()
fs = s3fs.core.S3FileSystem()
#mybucket/data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
s3_path = "mybucket/data_folder/*/*/*.parquet"
all_paths_from_s3 = fs.glob(path=s3_path)
myopen = s3.open
#use s3fs as the filesystem
fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen)
#convert to pandas dataframe
df = fp_obj.to_pandas()
感谢马丁通过我们的conversation 为我指明了正确的方向
NB :这会比使用 pyarrow 慢,基于 benchmark 。一旦通过ARROW-1213在pyarrow中实现s3fs支持,我将更新我的答案
我使用 pyarrow 对单个迭代进行了快速基准测试,并将文件列表作为 glob 发送到 fastparquet。使用 s3fs 与 pyarrow + 我的 hackish 代码相比,fastparquet 更快。但我认为 pyarrow +s3fs 实施后会更快。
代码和基准如下:
>>> def test_pq():
... for current_file in list_parquet_files:
... f = fs.open(current_file)
... df = pq.read_table(f).to_pandas()
... # following code is to extract the serial_number & cur_date values so that we can add them to the dataframe
... #probably not the best way to split :)
... elements_list=current_file.split('/')
... for item in elements_list:
... if item.find(date_partition) != -1:
... current_date = item.split('=')[1]
... elif item.find(dma_partition) != -1:
... current_dma = item.split('=')[1]
... df['serial_number'] = current_dma
... df['cur_date'] = current_date
... list_.append(df)
... frame = pd.concat(list_)
...
>>> timeit.timeit('test_pq()',number =10,globals=globals())
12.078817503992468
>>> def test_fp():
... fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen)
... df = fp_obj.to_pandas()
>>> timeit.timeit('test_fp()',number =10,globals=globals())
2.961556333000317
2019 年更新
在所有 PR 之后,Arrow-2038 & Fast Parquet - PR#182 等问题都已解决。
使用 Pyarrow 读取 parquet 文件
# pip install pyarrow
# pip install s3fs
>>> import s3fs
>>> import pyarrow.parquet as pq
>>> fs = s3fs.S3FileSystem()
>>> bucket = 'your-bucket-name'
>>> path = 'directory_name' #if its a directory omit the traling /
>>> bucket_uri = f's3://bucket/path'
's3://your-bucket-name/directory_name'
>>> dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
>>> table = dataset.read()
>>> df = table.to_pandas()
使用 Fast parquet 读取 parquet 文件
# pip install s3fs
# pip install fastparquet
>>> import s3fs
>>> import fastparquet as fp
>>> bucket = 'your-bucket-name'
>>> path = 'directory_name'
>>> root_dir_path = f'bucket/path'
# the first two wild card represents the 1st,2nd column partitions columns of your data & so forth
>>> s3_path = f"root_dir_path/*/*/*.parquet"
>>> all_paths_from_s3 = fs.glob(path=s3_path)
>>> fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen, root=root_dir_path)
>>> df = fp_obj.to_pandas()
快速基准测试
这可能不是对其进行基准测试的最佳方法。请阅读blog post 以获得完整的基准
#pyarrow
>>> import timeit
>>> def test_pq():
... dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
... table = dataset.read()
... df = table.to_pandas()
...
>>> timeit.timeit('test_pq()',number =10,globals=globals())
1.2677053569998407
#fastparquet
>>> def test_fp():
... fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen, root=root_dir_path)
... df = fp_obj.to_pandas()
>>> timeit.timeit('test_fp()',number =10,globals=globals())
2.931876824000028
进一步阅读 Pyarrow 的 speed
参考:
fastparquet s3fs pyarrow pyarrow 箭头代码基于discussion 和文档 fastparquet 代码基于讨论 PR-182、PR-182 以及文档【讨论】:
感谢您的透彻分析;现在 ARROW-1213 已经解决了,你有什么新的基准可以分享吗?谢谢。 我认为在 ARROW-1213 解决后,这里还有一些 bug 需要解决。请参阅issues.apache.org/jira/browse/ARROW-2038。同时,如果您需要使用 pyarrow,我们可以使用 github.com/apache/arrow/pull/916#issuecomment-337619158 中提到的内容 @TodorMinakov 我也更新了答案和基准timeit.timeit('test_fp',number =10,globals=globals())
是否真的调用了test_fp
函数?应该是timeit.timeit('test_fp()',number=10,globals=globals())
?
嗨@DarrenWeber 很好。谢谢!我可能永远不应该在午夜 12 点之后再编码。我已经更新了答案,现在使用更正的代码。【参考方案2】:
对于 python 3.6+,AWS 有一个名为 aws-data-wrangler 的库,它有助于 Pandas/S3/Parquet 之间的集成
安装做;
pip install awswrangler
要使用 awswrangler 1.x.x
及更高版本从 s3 读取分区镶木地板,请执行;
import awswrangler as wr
df = wr.s3.read_parquet(path="s3://my_bucket/path/to/data_folder/", dataset=True)
通过设置 dataset=True
awswrangler 需要分区拼花文件。它将从您在path
中指定的 s3 键下的分区中读取所有单独的 parquet 文件。
【讨论】:
@Vincent_Claes 谢谢你。你如何指定你只想要加载某个分区?您如何应用该过滤器?图书馆会帮你做吗? @rjurney awswrangler 支持对分区进行过滤。你可以在这里找到一些例子:github.com/awslabs/aws-data-wrangler/blob/master/tutorials/… 谢谢!这对我来说就像一个魅力! keys在哪里设置,写数据到private s3 bucket需要keys。 @2015evanotes 您的意思是 KMS 密钥吗?如果是这样,这个答案可以帮助***.com/a/59713720/1771155【参考方案3】:对于那些只想读取分区 parquet 文件的 部分 的人,pyarrow 接受键列表以及部分目录路径以读取分区的所有部分。此方法对于已将 parquet 数据集按有意义的方式(例如按年份或国家/地区)进行分区的组织特别有用,允许用户指定他们需要文件的哪些部分。从长远来看,这将降低成本,因为 AWS 在读取数据集时按字节收费。
# Read in user specified partitions of a partitioned parquet file
import s3fs
import pyarrow.parquet as pq
s3 = s3fs.S3FileSystem()
keys = ['keyname/blah_blah/part-00000-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet'\
,'keyname/blah_blah/part-00001-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet'\
,'keyname/blah_blah/part-00002-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet'\
,'keyname/blah_blah/part-00003-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet']
bucket = 'bucket_yada_yada_yada'
# Add s3 prefix and bucket name to all keys in list
parq_list=[]
for key in keys:
parq_list.append('s3://'+bucket+'/'+key)
# Create your dataframe
df = pq.ParquetDataset(parq_list, filesystem=s3).read_pandas(columns=['Var1','Var2','Var3']).to_pandas()
【讨论】:
这是上面其他例子中唯一有效的例子 如何在更高级别指定分区?在您的示例键名或 blah_blah【参考方案4】:此问题已于 2017 年在 this pull request 中解决。
对于那些只想使用 pyarrow 从 S3 读取 parquet 的人,这里有一个示例:
import s3fs
import pyarrow.parquet as pq
fs = s3fs.S3FileSystem()
bucket = "your-bucket"
path = "your-path"
# Python 3.6 or later
p_dataset = pq.ParquetDataset(
f"s3://bucket/path",
filesystem=fs
)
df = p_dataset.read().to_pandas()
# Pre-python 3.6
p_dataset = pq.ParquetDataset(
"s3://0/1".format(bucket, path),
filesystem=fs
)
df = p_dataset.read().to_pandas()
【讨论】:
但我认为还有一些问题需要解决。请参阅:issues.apache.org/jira/browse/ARROW-2038 我不认为这会禁止任何人使用我上面写的代码来做提问者所要求的事情。该讨论与使用上述方法从 S3 读取镶木地板有何直接关系? 我没有说你的代码不起作用。我的意思是,根据github.com/apache/arrow/pull/916#issuecomment-360541307,还有一些问题需要解决。据我了解,我想已经错过了一个边缘案例。因此,在解决 ARROW-2038 之前,最好使用 fastparquet 而不是 Arrow。 @efbbrown 你为此修复尝试了什么 s3fs 和 pyarrow 版本以上是关于如何在 python 中使用 pyarrow 从 S3 读取分区镶木地板文件的主要内容,如果未能解决你的问题,请参考以下文章
使用 PyArrow + Parquet + Google Cloud Storage 时如何实现谓词下推?
无法在 OSX / Python 3.9 上安装 pyarrow:这是我还是不兼容的包?
使用 pyarrow 的 Python 错误 - ArrowNotImplementedError:未构建对编解码器“snappy”的支持