带有pyarrow内存的dask read_parquet爆炸
Posted
技术标签:
【中文标题】带有pyarrow内存的dask read_parquet爆炸【英文标题】:dask read_parquet with pyarrow memory blow up 【发布时间】:2018-11-25 04:14:28 【问题描述】:我正在使用 dask 来写和读 parquet。我正在使用 fastparquet 引擎写作并使用 pyarrow 引擎阅读。 我的工人有 1 GB 的内存。使用 fastparquet 内存使用情况很好,但是当我切换到 pyarrow 时,它就会爆炸并导致工作人员重新启动。 我在下面有一个可重现的示例,它在 1gb 内存限制的工作人员上使用 pyarrow 失败。 实际上,我的数据集比这大得多。使用 pyarrow 的唯一原因是,与 fastparquet 相比,它在扫描时提高了我的速度(大约 7x-8x)
黎明:0.17.1
pyarrow : 0.9.0.post1
快速镶木地板:0.1.3
import dask.dataframe as dd
import numpy as np
import pandas as pd
size = 9900000
tmpdir = '/tmp/test/outputParquet1'
d = 'a': np.random.normal(0, 0.3, size=size).cumsum() + 50,
'b': np.random.choice(['A', 'B', 'C'], size=size),
'c': np.random.choice(['D', 'E', 'F'], size=size),
'd': np.random.normal(0, 0.4, size=size).cumsum() + 50,
'e': np.random.normal(0, 0.5, size=size).cumsum() + 50,
'f': np.random.normal(0, 0.6, size=size).cumsum() + 50,
'g': np.random.normal(0, 0.7, size=size).cumsum() + 50
df = dd.from_pandas(pd.DataFrame(d), 200)
df.to_parquet(tmpdir, compression='snappy', write_index=True,
engine='fastparquet')
#engine = 'pyarrow' #fails due to worker restart
engine = 'fastparquet' #works fine
df_partitioned = dd.read_parquet(tmpdir + "/*.parquet", engine=engine)
print(df_partitioned.count().compute())
df_partitioned.query("b=='A'").count().compute()
编辑:我的原始设置运行 spark 作业,使用 fastparquet 将数据并行写入分区。因此元数据文件是在最里面的分区而不是父目录中创建的。因此使用 glob 路径而不是父目录(fastparquet 读取父目录时要快得多,而使用 glob 路径扫描时 pyarrow 获胜)
【问题讨论】:
如果你用pyarrow写,你会得到同样的体验吗?请注意,当您使用 fastparquet 编写代码时,您会得到一个元数据文件,因此您可以从tmpdir
读取而无需使用 glob 部分,而且它应该会更快。
元数据文件实际上对我来说是个问题。整个过程是这样工作的:使用 spark 作业并行写入分区。所以元数据文件是在最里面的分区中创建的,因此不能使用父目录来读取所有数据。这就是为什么使用 glob 路径(fastparquet 在父目录加载时更快,但在 glob 路径加载时 pyarrow 胜出)。有没有办法在不使用元数据文件的情况下读取 fastparquet?
我用不同的分区测试了不同的设置,结果如下 1.Fastparquet 使用父目录路径更快(尽管需要元数据文件)。内存使用也很好。读取全局路径时速度较慢。 2.Pyarrow 使用全局路径读取速度更快。由于架构不匹配,父目录读取很困难(列顺序不同,在编写 df 时很容易修复)。确实会导致高内存使用
【参考方案1】:
我建议在read_parquet
调用中选择您需要的列
df = dd.read_parquet('/path/to/*.parquet', engine='pyarrow', columns=['b'])
这将允许您有效地只读取您需要的几列,而不是一次读取所有列。
【讨论】:
我确实需要我存储的所有列。但这是正常的行为吗?我的生产环境的工作节点配置为 5gb 限制。 Fastparquet 最大使用 3gb,而 pyarrow 只是通过它 您可以尝试限制worker的内存或线程数。我不知道箭头的内存占用情况如何确定。数据框有 1.6GB 大,所以实际上您在示例中使用了太多分区。【参考方案2】:我的非内存限制系统上的一些计时结果
使用您的示例数据
In [17]: df_partitioned = dd.read_parquet(tmpdir, engine='fastparquet')
In [18]: %timeit df_partitioned.count().compute()
2.47 s ± 114 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [19]: df_partitioned = dd.read_parquet(tmpdir, engine='pyarrow')
In [20]: %timeit df_partitioned.count().compute()
1.93 s ± 96.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
在写入之前将列 b
和 c
转换为分类
In [30]: df_partitioned = dd.read_parquet(tmpdir, engine='fastparquet')
In [31]: %timeit df_partitioned.count().compute()
1.25 s ± 83.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [32]: df_partitioned = dd.read_parquet(tmpdir, engine='pyarrow')
In [33]: %timeit df_partitioned.count().compute()
1.82 s ± 63 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
使用 fastparquet 直接,单线程
In [36]: %timeit fastparquet.ParquetFile(tmpdir).to_pandas().count()
1.82 s ± 19 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
使用 20 个分区而不是 200 个(fastparquet、类别)
In [42]: %timeit df_partitioned.count().compute()
863 ms ± 78.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
【讨论】:
我的产品数据非常庞大,大小约为 50000000 行,17 列。我不能使用父目录来加载数据(如上所述),因此必须使用 glob 路径读取。【参考方案3】:您还可以在加载数据时进行过滤,例如按特定列
df = dd.read_parquet('/path/to/*.parquet', engine='fastparquet', filters=[(COLUMN, 'operation', 'SOME_VALUE')])
.
想象一下==
、>
、<
等操作。
【讨论】:
以上是关于带有pyarrow内存的dask read_parquet爆炸的主要内容,如果未能解决你的问题,请参考以下文章