如何使用 pyarrow 和 parquet 保存具有自定义类型的 pandas DataFrame

Posted

技术标签:

【中文标题】如何使用 pyarrow 和 parquet 保存具有自定义类型的 pandas DataFrame【英文标题】:How to save a pandas DataFrame with custom types using pyarrow and parquet 【发布时间】:2020-07-30 22:26:52 【问题描述】:

我想将 pandas DataFrame 保存到镶木地板,但其中有一些不受支持的类型(例如 bson ObjectIds)。

在我们使用的示例中:

import pandas as pd
import pyarrow as pa

这里有一个简单的例子来说明这种情况:

df = pd.DataFrame(
    [
        'name': 'alice', 'oid': ObjectId('5e9992543bfddb58073803e7'),
        'name': 'bob',   'oid': ObjectId('5e9992543bfddb58073803e8'),
    ]
)

df.to_parquet('some_path')

我们得到:

ArrowInvalid: ('Could not convert 5e9992543bfddb58073803e7 with type ObjectId: did not recognize Python value type when inferring an Arrow data type', 'Conversion failed for column oid with type object')

我尝试关注此参考:https://arrow.apache.org/docs/python/extending_types.html

因此我编写了以下类型扩展:

class ObjectIdType(pa.ExtensionType):

    def __init__(self):
        pa.ExtensionType.__init__(self, pa.binary(12), "my_package.objectid")

    def __arrow_ext_serialize__(self):
        # since we don't have a parametrized type, we don't need extra
        # metadata to be deserialized
        return b''

    @classmethod
    def __arrow_ext_deserialize__(self, storage_type, serialized):
        # return an instance of this subclass given the serialized
        # metadata.
        return ObjectId()

并且能够为我的oid 列获得一个有效的 pyarray:

values = df['oid']
storage_array = pa.array(values.map(lambda oid: oid.binary), type=pa.binary(12))
pa.ExtensionArray.from_storage(objectid_type, storage_array)

现在我陷入困境,在互联网上找不到任何好的解决方案,是如何将我的 df 保存到镶木地板,让它解释哪个列需要哪个 Extension。将来我可能会更改列,并且我有几种不同的类型需要这种处理。

如何简单地从数据帧创建 parquet 文件并在透明转换类型的同时恢复它们?

我尝试创建一个pyarrow.Table 对象,并在预处理后将列附加到它,但它不起作用,因为table.append_column 采用二进制列而不是pyarrow.Arrays,加上整个isinstance 看起来像糟糕的解决方案。

table = pa.Table.from_pandas(pd.DataFrame())
for col, values in test_df.iteritems():

    if isinstance(values.iloc[0], ObjectId):
        arr = pa.array(
            values.map(lambda oid: oid.binary), type=pa.binary(12)
        )

    elif isinstance(values.iloc[0], ...):
        ...

    else:
        arr = pa.array(values)

    table.append_column(arr, col)  # FAILS (wrong type)

理想解决方案的伪代码:

parquetize(df, path, my_custom_types_conversions)
# ...
new_df = unparquetize(path, my_custom_types_conversions)

assert df.equals(new_df)  # types have been correctly restored

我迷失在 pyarrow 的文档中,我是否应该使用 ExtensionTypeserialization 或其他东西来编写这些函数。任何指针将不胜感激。

旁注,我根本不需要parquet,主要问题是能够保存和恢复具有自定义类型quicklyspace efficiently 的数据帧。我尝试了一个基于 jsonifying 和 gziping 数据帧的解决方案,但是太慢了。

【问题讨论】:

我可以贡献的是,至少您的扩展类缺少从 pandas 转换的必要功能,to_pandas_dtype,请参阅本节详细信息arrow.apache.org/docs/python/…。不过,我仍然没有找到可行的解决方案(我正在尝试使用 UUID)。 为什么不腌制数据框并存储它?你可以解开它以获得准确的副本。 【参考方案1】:

我认为这可能是因为'ObjectId'不是python中定义的关键字,因此它在类型转换中抛出了这个异常。

我尝试了您提供的示例,并尝试在数据框创建期间将 oid 值转换为字符串类型,并且成功了。

检查以下步骤:

df = pd.DataFrame(
    [
        'name': 'alice', 'oid': "ObjectId('5e9992543bfddb58073803e7')",
        'name': 'bob',   'oid': "ObjectId('5e9992543bfddb58073803e8')",
    ]
)

df.to_parquet('parquet_file.parquet')
df1 = pd.read_parquet('parquet_file.parquet',engine='pyarrow')
df1

输出:

    name    oid
0   alice   ObjectId('5e9992543bfddb58073803e7')
1   bob ObjectId('5e9992543bfddb58073803e8')

【讨论】:

【参考方案2】:

您可以编写一个方法来读取列名和类型并输出一个新的 DF,其中列转换为兼容类型,使用 switch-case 模式来选择将列转换为什么类型(或者是否保持原样)。

【讨论】:

以上是关于如何使用 pyarrow 和 parquet 保存具有自定义类型的 pandas DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

使用 PyArrow + Parquet + Google Cloud Storage 时如何实现谓词下推?

Pyarrow.lib.Schema 与 pyarrow.parquet.Schema

Pandas - 如何将 Parquet 数据帧保存到本地磁盘?

使用谓词过滤 pyarrow.parquet.ParquetDataset 中的行

带有pyarrow内存的dask read_parquet爆炸

pandas to_parquet 在大型数据集上失败