python dask to_parquet 占用大量内存
Posted
技术标签:
【中文标题】python dask to_parquet 占用大量内存【英文标题】:python dask to_parquet taking a lot of memory 【发布时间】:2019-12-12 03:34:19 【问题描述】:我正在使用带有 dask 的 python 3 来读取 parquet 文件列表,进行一些处理,然后将其全部放入一个新的联合 parquet 文件中以供以后使用。
该进程使用了太多内存,以至于它似乎在将所有 parquet 文件写入新的 parquet 文件之前尝试将它们读入内存。
我正在使用以下代码
def t(path):
import dask.dataframe as dd
ddf = dd.read_parquet(path)
ddf["file"] = path
return ddf
b = bag.from_sequence(parquet_files)
with ProgressBar():
data = b.map(lambda x: t(x)).\
map(lambda y: dd.to_parquet(y, output_parquet_file, partition_on=["file"], append=True, engine="fastparquet")).\
compute(num_workers=1)
每次使用一个worker时,内存都会爆炸,尤其是在使用更多时。这些文件很大(每个大约 1G),我试图从 csv 文件中读取信息并将它们分成 25M 块,并遇到了同样的问题。
我在这里缺少什么?当迭代过程似乎在这里做正确的事情时,为什么它会尝试将所有内容加载到内存中?我怎样才能使用 dask 操作来做到这一点,而不会炸毁我在那台机器上的 128G 内存?
PS 我尝试使用 pyarrow 引擎,但问题是附加尚未在 dask 中实现。
编辑:尝试了建议的解决方案: 我现在试试这段代码
import dask.dataframe as dd
with ProgressBar():
dfs = [dd.read_parquet(pfile) for pfile in parquet_files]
for i, path in enumerate(parquet_files):
dfs[i]["file"] = path
df = dd.concat(dfs)
df.to_parquet(output_parquet_file)
但内存还是会爆炸(在内存超过 200G 的系统上)
【问题讨论】:
请注意,现在在 dask master 中为 pyarrow 启用了追加 【参考方案1】:在另一个集合的地图中使用 dask 集合方法很奇怪。您可以像这样使用 bag.map
并直接调用 fastaprquet 函数,或者更好(取决于您需要执行的处理),对所有内容使用数据帧 API:
dfs = [dd.read_parquet(pfile, ...) for pfile in parquet_files]
df = dd.concat(dfs)
df.to_parquet(...)
请注意,尽管您尝试附加到单个文件(我认为),但 parquet 格式并没有真正从中受益,您最好让 Dask 为每个分区写入一个文件。
【讨论】:
对不起,不行,看看编辑,内存还在爆炸,没有文件太大内存还... 您可能想尝试使用分布式调度程序(即使在单个节点上)【参考方案2】:dask 支持读取多个 parquet 文件作为分区。 直接调用就好了。
import dask.dataframe as dd
ddf = dd.read_parquet("parquets/*.parquet")
ddf = ddf.map_partitions(lambda df: df*2)
ddf.to_parquet("result.parquet")
【讨论】:
以上是关于python dask to_parquet 占用大量内存的主要内容,如果未能解决你的问题,请参考以下文章
由于 parquet 文件损坏,Impala 无法创建分区表
在 for 循环中将 Python Dask 系列转换为列表或 Dask DataFrame
如何将Python Dask Dataframes合并到列中?