使用 pyarrow 从分区拼花数据集中读取特定分区

Posted

技术标签:

【中文标题】使用 pyarrow 从分区拼花数据集中读取特定分区【英文标题】:Reading specific partitions from a partitioned parquet dataset with pyarrow 【发布时间】:2018-06-08 16:21:08 【问题描述】:

我有一个较大的(~20 GB)parquet 格式的分区数据集。我想使用pyarrow 从数据集中读取特定分区。我以为我可以使用pyarrow.parquet.ParquetDataset 完成此操作,但似乎并非如此。这是一个小例子来说明我想要什么。

创建随机数据集:

from collections import OrderedDict
from itertools import product, chain
from uuid import uuid4
import os
from glob import glob

import numpy as np
import pandas as pd
import pyarrow as pa
from pyarrow.parquet import ParquetWriter, ParquetDataset


def get_partitions(basepath, partitions):
    """Generate directory hierarchy for a paritioned dataset

    data
    ├── part1=foo
    │   └── part2=True
    ├── part1=foo
    │   └── part2=False
    ├── part1=bar
    │   └── part2=True
    └── part1=bar
        └── part2=False

    """
    path_tmpl = '/'.join(['='] * len(partitions))  # part=value
    path_tmpl = '/'.format(basepath, path_tmpl)    # part1=val/part2=val

    parts = [product([part], vals) for part, vals in partitions.items()]
    parts = [i for i in product(*parts)]
    return [path_tmpl.format(*tuple(chain.from_iterable(i))) for i in parts]


partitions = OrderedDict(part1=['foo', 'bar'], part2=[True, False])
parts = get_partitions('data', partitions)
for part in parts:
    # 3 columns, 5 rows
    data = [pa.array(np.random.rand(5)) for i in range(3)]
    table = pa.Table.from_arrays(data, ['a', 'b', 'c'])
    os.makedirs(part, exist_ok=True)
    out = ParquetWriter('/.parquet'.format(part, uuid4()),
                        table.schema, flavor='spark')
    out.write_table(table)
    out.close()

我想读取分区 1 的所有值,仅读取分区 2 的 True。使用 pandas.read_parquet,这是不可能的,我必须始终读取整个列。我用pyarrow 尝试了以下操作:

parts2 = OrderedDict(part1=['foo', 'bar'], part2=[True])
parts2 = get_partitions('data', parts2)
files = [glob('/*'.format(dirpath)) for dirpath in parts2]
files = [i for i in chain.from_iterable(files)]
df2 = ParquetDataset(files).read().to_pandas()

这也不行:

>>> df2.columns
Index(['a', 'b', 'c'], dtype='object')

我可以像这样在pyspark 中轻松做到这一点:

def get_spark_session_ctx(appName):
    """Get or create a Spark Session, and the underlying Context."""
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName(appName).getOrCreate()
    sc = spark.sparkContext
    return (spark, sc)


spark, sc = get_spark_session_ctx('test')
spark_df = spark.read.option('basePath', 'data').parquet(*parts2)
df3 = spark_df.toPandas()

如下所示:

>>> df3.columns
Index(['a', 'b', 'c', 'part1', 'part2'], dtype='object')

这可以用pyarrowpandas 完成,还是我需要一些自定义实现?

更新:应 Wes 的要求,现在在 JIRA。

【问题讨论】:

【参考方案1】:

从 pyarrow 0.10.0 版开始,您可以使用filters kwarg 进行查询。在您的情况下,它看起来像这样:

import pyarrow.parquet as pq
dataset = pq.ParquetDataset('path-to-your-dataset', filters=[('part2', '=', 'True'),])
table = dataset.read()

Ref

【讨论】:

我知道,但不幸的是,这是部分支持(仅用于读取)。写入支持推迟到 0.12.0。我相信这在 JIRA 中有所涵盖。 非常感谢本文档。我试图让ParquetDatasetPiece 工作,但这是我能工作的唯一解决方案。 in 过滤器中的运算符不适用于pq.ParquetDataset 的我(v 10.0.0),但它适用于pq.read_table()。以防有人遇到类似问题。【参考方案2】:

问题:如何使用 pyarrow 从分区 parquet 数据集中读取特定分区?

回答:你现在不能。

您能否在https://issues.apache.org/jira 上创建一个请求此功能的 Apache Arrow JIRA?

这是我们应该能够在 pyarrow API 中支持的东西,但它需要有人来实现它。谢谢

【讨论】:

我会这样做的,谢谢。我想我应该为 red 和 write 创建功能请求(除非我错过了)。这些天我有很多空闲时间,如果有人指导我,我也可以参与实施。 @wes pyarrow 在 HDFS 中读取分区 parquet 表的进展如何?看来 pyarrow HDFS API 仍然无法为 pq.ParquetDataset() 方法提供数据集对象。

以上是关于使用 pyarrow 从分区拼花数据集中读取特定分区的主要内容,如果未能解决你的问题,请参考以下文章

如何在 python 中使用 pyarrow 从 S3 读取分区镶木地板文件

在 AzureML 上保存分区拼花

使用谓词过滤 pyarrow.parquet.ParquetDataset 中的行

将 PubSub 流保存到 GCS 中的分区拼花文件

如何使用 pyarrow 从 S3 读取镶木地板文件列表作为熊猫数据框?

从 Impala 分区拼花表创建文本表