在不加载到内存的情况下将 HDF5 转换为 Parquet

Posted

技术标签:

【中文标题】在不加载到内存的情况下将 HDF5 转换为 Parquet【英文标题】:Converting HDF5 to Parquet without loading into memory 【发布时间】:2018-02-19 19:32:04 【问题描述】:

我有一个以 HDF5 格式存储的大型数据集(约 600 GB)。由于这太大而无法放入内存,我想将其转换为 Parquet 格式并使用 pySpark 执行一些基本的数据预处理(标准化、查找相关矩阵等)。但是,我不确定如何在不将其加载到内存的情况下将整个数据集转换为 Parquet。

我查看了这个要点:https://gist.github.com/jiffyclub/905bf5e8bf17ec59ab8f#file-hdf_to_parquet-py,但似乎整个数据集正在被读入内存。

我想到的一件事是分块读取 HDF5 文件并将其增量保存到 Parquet 文件中:

test_store = pd.HDFStore('/path/to/myHDFfile.h5')
nrows = test_store.get_storer('df').nrows
chunksize = N
for i in range(nrows//chunksize + 1):
    # convert_to_Parquet() ...

但我找不到任何可以让我逐步构建 Parquet 文件的文档。任何进一步阅读的链接将不胜感激。

【问题讨论】:

【参考方案1】:

您可以为此使用pyarrow!

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


def convert_hdf5_to_parquet(h5_file, parquet_file, chunksize=100000):

    stream = pd.read_hdf(h5_file, chunksize=chunksize)

    for i, chunk in enumerate(stream):
        print("Chunk ".format(i))

        if i == 0:
            # Infer schema and open parquet file on first chunk
            parquet_schema = pa.Table.from_pandas(df=chunk).schema
            parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression='snappy')

        table = pa.Table.from_pandas(chunk, schema=parquet_schema)
        parquet_writer.write_table(table)

    parquet_writer.close()

【讨论】:

请注意,Parquet 数据集设计为包含许多文件。它们不需要包含单个大文件,因此分块方法是一种很好的方法。它可能是 1000 个文件,这很好 试试这个,我得到“KeyError: 'index_level_0'” 尝试在from_pandas 方法中添加“preserve_index=False”,但无济于事。 啊,我还需要在第一次调用 Table.from_pandas 时设置 preserve_index=False,以便正确设置架构!【参考方案2】:

感谢您的回答,我尝试从 CLI 调用以下 py 脚本,但它既没有显示任何错误,也看不到转换后的 parquet 文件。

而且h5文件也不为空。enter image description here

将熊猫导入为 pd 将 pyarrow 导入为 pa 将 pyarrow.parquet 导入为 pq

h5_file = "C:\Users...\tall.h5" parquet_file = "C:\Users...\my.parquet"

def convert_hdf5_to_parquet(h5_file, parquet_file, chunksize=100000):

stream = pd.read_hdf(h5_file, chunksize=chunksize)

for i, chunk in enumerate(stream):
    print("Chunk ".format(i))
    print(chunk.head())

    if i == 0:
        # Infer schema and open parquet file on first chunk
        parquet_schema = pa.Table.from_pandas(df=chunk).schema
        parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression='snappy')

    table = pa.Table.from_pandas(chunk, schema=parquet_schema)
    parquet_writer.write_table(table)
parquet_writer.close()

【讨论】:

pandas read_hdf 方法需要一个包含单个表的 hdf5 文件。对于在自定义层次结构中包含多个表的 hdf5 文件,您需要编写自定义代码来提取每个表。两个可能对此有用的 Python 包是 h5py 和 PyTables。此外,这应该是一个新问题,而不是对现有问题的回答。 感谢您的回复!但是我的 hdf5 文件包含 n 个表,我不想在我的代码中明确提及所有表。每个文件的“N”个更改。当我尝试使用 h5py 时,我相信 - 它的任务是指定名称。请另外提出建议。让我也开始一个新的线程。

以上是关于在不加载到内存的情况下将 HDF5 转换为 Parquet的主要内容,如果未能解决你的问题,请参考以下文章

在不加载/执行脚本/图像的情况下将 HTML 字符串转换为 HTML DOM?

如何在不先加载到 RAM 的情况下将文件加载到 blob 中?

spark sql 在不使用 where 子句的情况下将所有数据加载到内存中

如何在不使用画布的情况下将整个 div 数据转换为图像并将其保存到目录中?

在不编码的情况下将 NSData 转换为 NSString

在不访问库的情况下将整数转换为字符串