如何有效地将大型数据框拆分为多个拼花文件?

Posted

技术标签:

【中文标题】如何有效地将大型数据框拆分为多个拼花文件?【英文标题】:how to efficiently split a large dataframe into many parquet files? 【发布时间】:2018-11-22 07:26:57 【问题描述】:

考虑以下数据框

import pandas as pd
import numpy as np
import pyarrow.parquet as pq
import pyarrow as pa

idx = pd.date_range('2017-01-01 12:00:00.000', '2017-03-01 12:00:00.000', freq = 'T')

dataframe = pd.DataFrame('numeric_col' : np.random.rand(len(idx)),
                          'string_col' : pd.util.testing.rands_array(8,len(idx)),
                           index = idx)

dataframe
Out[30]: 
                     numeric_col string_col
2017-01-01 12:00:00       0.4069   wWw62tq6
2017-01-01 12:01:00       0.2050   SleB4f6K
2017-01-01 12:02:00       0.5180   cXBvEXdh
2017-01-01 12:03:00       0.3069   r9kYsJQC
2017-01-01 12:04:00       0.3571   F2JjUGgO
2017-01-01 12:05:00       0.3170   8FPC4Pgz
2017-01-01 12:06:00       0.9454   ybeNnZGV
2017-01-01 12:07:00       0.3353   zSLtYPWF
2017-01-01 12:08:00       0.8510   tDZJrdMM
2017-01-01 12:09:00       0.4948   S1Rm2Sqb
2017-01-01 12:10:00       0.0279   TKtmys86
2017-01-01 12:11:00       0.5709   ww0Pe1cf
2017-01-01 12:12:00       0.8274   b07wKPsR
2017-01-01 12:13:00       0.3848   9vKTq3M3
2017-01-01 12:14:00       0.6579   crYxFvlI
2017-01-01 12:15:00       0.6568   yGUnCW6n

我需要将此数据框写入许多 parquet 文件。当然,以下工作:

table = pa.Table.from_pandas(dataframe)
pq.write_table(table, '\\\\mypath\\dataframe.parquet', flavor ='spark')

我的问题是生成的(单个)parquet 文件太大。

我怎样才能有效地(内存方面,速度方面)将写作分割daily parquet 文件中(并保持spark 风格)?这些日常文件将在以后与spark 并行阅读。

谢谢!

【问题讨论】:

【参考方案1】:

根据索引创建一个字符串 columndt 将允许您通过运行写出按日期分区的数据

pq.write_to_dataset(table, root_path='dataset_name', partition_cols=['dt'], flavor ='spark')

答案基于此source(注意,来源错误地将分区参数列为partition_columns

【讨论】:

你的意思是先创建dataframe['dt'] = dataframe.index.date ? 我收到TypeError: __cinit__() got an unexpected keyword argument 'partition_columns' 看起来关键字参数是partition_cols 而不是partition_columns。我更新了答案并确认它在本地工作 @ℕʘʘḆḽḘ 我发现它非常慢。有机会使用 dask 吗? huuum... 我还没有尝试使用我的大数据集。让我做一些测试【参考方案2】:

David 提出的解决方案并不能解决问题,因为它会为每个索引生成一个 parquet 文件。但是这个稍微修改过的版本就可以了

import pandas as pd
import numpy as np
import pyarrow.parquet as pq
import pyarrow as pa
idx = pd.date_range('2017-01-01 12:00:00.000', '2017-03-01 12:00:00.000',
                    freq='T')

df = pd.DataFrame('numeric_col': np.random.rand(len(idx)),
                   'string_col': pd.util.testing.rands_array(8,len(idx)),
                  index = idx)

df["dt"] = df.index
df["dt"] = df["dt"].dt.date
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, root_path='dataset_name', partition_cols=['dt'], 
                    flavor='spark')

【讨论】:

我认为他是在暗示同样的事情。问题是让它与一个非常大的数据框一起工作...... 是的,但在这种情况下,您为每个索引保存了一个文件,现在它是每天的文件。如果您从 1998 年 1 月 1 日开始尝试这个示例(超过 1000 万行),您可以在大约 40 秒内保存它。 很有趣,但这实际上是我对大型数据框所做的事情并最终导致内存问题......我将 dt 设置为 index.date.astype(str) 我认为这是问题所在。一直使用 objectstring 而不是 numeric/datatime,您将使用更多内存。尝试在不更改类型的情况下尝试一下。

以上是关于如何有效地将大型数据框拆分为多个拼花文件?的主要内容,如果未能解决你的问题,请参考以下文章

如何有效地将具有一定周期性的列表拆分为多个列表?

如何有效地将大型 .tsv 文件上传到 pyspark 中具有拆分列的 Hive 表?

根据列将大型 csv 文件拆分为多个文件

在黄瓜 jvm 中,如何正确地将步骤拆分为多个文件?

有效地将数据从 CSV 读取到具有多个分隔符的数据框中

如何在SSIS中将大型Excel文件拆分为多个小文件?