使用谓词过滤 pyarrow.parquet.ParquetDataset 中的行

Posted

技术标签:

【中文标题】使用谓词过滤 pyarrow.parquet.ParquetDataset 中的行【英文标题】:Using predicates to filter rows from pyarrow.parquet.ParquetDataset 【发布时间】:2019-10-24 15:39:51 【问题描述】:

我有一个存储在 s3 上的 parquet 数据集,我想从数据集中查询特定的行。我可以使用 petastorm 做到这一点,但现在我只想使用 pyarrow 做到这一点。

这是我的尝试:

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    'analytics.xxx', 
    filesystem=fs, 
    validate_schema=False, 
    filters=[('event_name', '=', 'SomeEvent')]
)

df = dataset.read_pandas().to_pandas()

但这会返回一个 pandas DataFrame,就好像过滤器不起作用一样,即我有不同值的行 event_name。有什么我遗漏的东西或我误解的东西吗?我可以在获取 pandas DataFrame 后进行过滤,但我会使用比需要更多的内存空间。

【问题讨论】:

【参考方案1】:

注意:我已在this post 中将其扩展为 Python 和 Parquet 的综合指南

Parquet 格式分区

为了使用过滤器,您需要使用分区以 Parquet 格式存储数据。加载一些 Parquet 列和分区中的一些可以导致 Parquet 与 CSV 相比 I/O 性能的巨大改进。 Parquet 可以根据一个或多个字段的值对文件进行分区,并为嵌套值的唯一组合创建目录树,或者为一个分区列创建一组目录。 PySpark Parquet documentation 解释了 Parquet 的工作原理。

关于性别和国家的分区看起来像this:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...

如果您需要进一步对数据进行分区,还有行组分区,但大多数工具仅支持指定行组大小,您必须自己进行 key-->row group 查找,这很难看(很高兴在另一个问题)。

用 Pandas 写分区

您需要使用 Parquet 对数据进行分区,然后可以使用过滤器加载它。对于大型数据集,您可以使用 PyArrow、pandas 或 Dask 或 PySpark 将数据写入分区。

例如,在 pandas 中写入分区:

df.to_parquet(
    path='analytics.xxx', 
    engine='pyarrow',
    compression='snappy',
    columns=['col1', 'col5'],
    partition_cols=['event_name', 'event_category']
)

这会将文件布置为:

analytics.xxx/event_name=SomeEvent/event_category=SomeCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=SomeEvent/event_category=OtherCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=OtherEvent/event_category=SomeCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=OtherEvent/event_category=OtherCategory/part-0001.c000.snappy.parquet

在 PyArrow 中加载 Parquet 分区

要使用分区列按一个属性抓取事件,您可以在列表中放置一个元组过滤器:

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    's3://analytics.xxx', 
    filesystem=fs, 
    validate_schema=False, 
    filters=[('event_name', '=', 'SomeEvent')]
)
df = dataset.to_table(
    columns=['col1', 'col5']
).to_pandas()

用逻辑与过滤

要使用 AND 获取具有两个或多个属性的事件,您只需创建一个过滤器元组列表:

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    's3://analytics.xxx', 
    filesystem=fs, 
    validate_schema=False, 
    filters=[
        ('event_name',     '=', 'SomeEvent'),
        ('event_category', '=', 'SomeCategory')
    ]
)
df = dataset.to_table(
    columns=['col1', 'col5']
).to_pandas()

使用逻辑 OR 过滤

要使用 OR 获取两个事件,您需要将过滤器元组嵌套在它们自己的列表中:

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    's3://analytics.xxx', 
    filesystem=fs, 
    validate_schema=False, 
    filters=[
        [('event_name', '=', 'SomeEvent')],
        [('event_name', '=', 'OtherEvent')]
    ]
)
df = dataset.to_table(
    columns=['col1', 'col5']
).to_pandas()

使用 AWS Data Wrangler 加载 Parquet 分区

正如另一个答案所提到的,将数据过滤加载到某些分区中的某些列(无论数据位于何处(本地或云中))的最简单方法是使用awswrangler 模块。如果您使用的是 S3,请查看 awswrangler.s3.read_parquet()awswrangler.s3.to_parquet() 的文档。过滤的工作原理与上面的示例相同。

import awswrangler as wr

df = wr.s3.read_parquet(
    path="analytics.xxx",
    columns=["event_name"], 
    filters=[('event_name', '=', 'SomeEvent')]
)

使用pyarrow.parquet.read_table() 加载 Parquet 分区

如果你使用 PyArrow,你也可以使用pyarrow.parquet.read_table()

import pyarrow.parquet as pq

fp = pq.read_table(
    source='analytics.xxx',
    use_threads=True,
    columns=['some_event', 'some_category'],
    filters=[('event_name', '=', 'SomeEvent')]
)
df = fp.to_pandas()

使用 PySpark 加载 Parquet 分区

最后,在 PySpark 中你可以使用pyspark.sql.DataFrameReader.read_parquet()

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]") \
                    .appName('Stack Overflow Example Parquet Column Load') \
                    .getOrCreate()

# I automagically employ Parquet structure to load the selected columns and partitions
df = spark.read.parquet('s3://analytics.xxx') \
          .select('event_name', 'event_category') \
          .filter(F.col('event_name') == 'SomeEvent')

希望这可以帮助您使用 Parquet :)

【讨论】:

媒体帖子的链接已损坏 @JasonS 固定:blog.datasyndrome.com/…【参考方案2】:

对于从 Google 访问这里的任何人,您现在可以在读取 Parquet 文件时在 PyArrow 中过滤行。不管你是通过 pandas 还是 pyarrow.parquet 阅读的。

来自documentation:

过滤器 (List[Tuple] or List[List[Tuple]] or None (default)) – 与过滤谓词不匹配的行将从扫描的数据中删除。如果文件不包含匹配的行,嵌入在嵌套目录结构中的分区键将被利用来避免加载文件。如果 use_legacy_dataset 为 True,则过滤器只能引用分区键,并且仅支持 hive 样式的目录结构。将 use_legacy_dataset 设置为 False 时,还支持文件内级别过滤和不同的分区方案。

谓词以析取范式 (DNF) 表示,例如 [[('x', '=', 0), ...], ...]。 DNF 允许单列谓词的任意布尔逻辑组合。最里面的元组每个都描述一个列谓词。内部谓词列表被解释为一个连词 (AND),形成一个更具选择性和多列的谓词。最后,最外层的列表将这些过滤器组合为析取 (OR)。

谓词也可以作为 List[Tuple] 传递。这种形式被解释为单个连词。要在谓词中表达 OR,必须使用(首选)List[List[Tuple]] 表示法。

【讨论】:

【参考方案3】:

对于 python 3.6+,AWS 有一个名为 aws-data-wrangler 的库,它有助于 Pandas/S3/Parquet 之间的集成,它允许您过滤分区的 S3 键。

安装做;

pip install awswrangler

要减少读取的数据,您可以根据存储在 s3 上的 parquet 文件中的分区列过滤行。 要从分区列event_name 中过滤值为"SomeEvent" 的行,请执行;

对于awswrangler

import awswrangler as wr

df = wr.pandas.read_parquet(
         path="s3://my-bucket/my/path/to/parquet-file.parquet",
         columns=["event_name"], 
         filters=[('event_name', '=', 'SomeEvent')]
)

对于awswrangler > 1.0.0 做;

import awswrangler as wr

df = wr.s3.read_parquet(
         path="s3://my-bucket/my/path/to/parquet-file.parquet",
         columns=["event_name"], 
         filters=[('event_name', '=', 'SomeEvent')]
)

【讨论】:

我只是试图利用这个例子,我得到:AttributeError: 'NoneType' object has no attribute 'filter_accepts_partition' 好像有bug; github.com/awslabs/aws-data-wrangler/issues/267 这就是我害怕的。实际上,那是您链接到的我的错误报告。 :) 你现在可以试试旧版本,看看它带来了什么? 看起来这不是一个真正的错误。对我的错误报告的回应是:“不幸的是,AWS Data Wrangler 不支持物理列上的过滤器,仅支持分区上的过滤器。(在上面的提交中更新了文档)。这不仅仅是传递 use_legacy_dataset=False 的问题,似乎新数据集方法不支持接收 boto3 会话。”也许编辑此答案以强调过滤仅适用于分区而不是物理列是一个好主意?【参考方案4】:

目前,filters 功能仅在文件级别实现,尚未在行级别实现。

因此,如果您将数据集作为嵌套层次结构中的多个分区 parquet 文件的集合(此处描述的分区数据集类型:https://arrow.apache.org/docs/python/parquet.html#partitioned-datasets-multiple-files),则可以使用 filters 参数仅读取文件。 但是,您还不能使用它来仅读取单个文件的行组的子集(请参阅https://issues.apache.org/jira/browse/ARROW-1796)。

但是,如果您收到指定此类无效过滤器的错误消息,那就太好了。我为此开了一个问题:https://issues.apache.org/jira/browse/ARROW-5572

【讨论】:

好的,知道了!我应该更多地考虑如何构造数据,以便进行更有效的查询。是的,有错误消息确实很好,感谢您的报告。 您好,您提出的问题似乎已经解决了……但是我测试了代码,仍然没有报错……issues.apache.org/jira/browse/ARROW-5572

以上是关于使用谓词过滤 pyarrow.parquet.ParquetDataset 中的行的主要内容,如果未能解决你的问题,请参考以下文章

使用 Dask 进行 Parquet 谓词下推过滤

使用谓词过滤 NSDictionary

SwiftUI 中的动态过滤器(谓词)

Xcode:使用复杂谓词过滤核心数据集

使用谓词过滤后,我在 tableview 中的 cellForRowAtIndexPath 出现错误

在数组控制器上使用谓词过滤相关对象