有效分区 dask 数据帧的策略

Posted

技术标签:

【中文标题】有效分区 dask 数据帧的策略【英文标题】:Strategy for partitioning dask dataframes efficiently 【发布时间】:2017-11-23 06:25:26 【问题描述】:

Dask 的文档谈到了重新分区以减少开销 here。

然而,它们似乎表明您需要事先了解数据框的外观(即预期数据的 1/100)。

有没有一种在不做假设的情况下明智地重新分区的好方法?目前我只是用npartitions = ncores * magic_number 重新分区,并在需要时将force 设置为True 以扩展分区。这种尺寸适合所有方法,但由于我的数据集大小不同,因此绝对不是最佳选择。

数据是时间序列数据,但不幸的是不是定期间隔,我过去曾使用按时间频率重新分区,但由于数据的不规则性,这将不是最佳的(有时几分钟没有,然后几千秒)

【问题讨论】:

这可能是个糟糕的主意——但是在df[df.name == 'Alice'] 上调用len() 会不会太贵?我觉得不应该这样 - 每个工作人员总结他们的数据帧长度,调度程序将其减少到一个总和。然后,一旦你有了这个数字,你就可以创建一个原始高度与当前高度的比率,并通过相应的重新分区来更新分区计数。 我认为调用 len() 会执行整个任务图,这将非常昂贵。 我目前正在尝试的是在过滤后计算()数据帧到熊猫。然后立即使用 .from_pandas 上的 chunksize 参数将其填充回一个 dask 数据帧,该参数设置为我认为合适的值。这仅适用于可以放入内存的数据帧,但确实可以节省使用 len() 进行的昂贵的重新计算 不会在子集查询上使用count() 之类的东西来获取长度,而无需将其作为 Pandas DataFrame 加载到内存中,然后将其重新发送回 Dask?看起来那个操作本身很激烈? 没有自动合理的重新分区方法,尽管可能应该有。我的目标可能是让每个数据帧的大小约为 100MB。您可以致电df.memory_usage().sum().compute() 来帮助确定合适的分区数量。 【参考方案1】:

从Dask 2.0.0 开始,您可以致电.repartition(partition_size="100MB")

此方法执行分区大小的对象考虑 (.memory_usage(deep=True)) 细分。它将加入较小的分区,或拆分已经变得太大的分区。

Dask's Documentation 还概述了用法。

【讨论】:

【参考方案2】:

在与 mrocklin 讨论后,一个不错的分区策略是在df.memory_usage().sum().compute() 的指导下以 100MB 的分区大小为目标。对于适合 RAM 的数据集,这可能涉及的额外工作可以通过使用放置在相关点的df.persist() 来减轻。

【讨论】:

【参考方案3】:

只是为了补充萨曼莎休斯的回答:

memory_usage() 默认忽略对象 dtype 列的内存消耗。对于我最近使用的数据集,这导致低估了大约 10 倍的内存使用量。

除非您确定没有对象 dtype 列,否则我建议指定 deep=True,即重新分区使用:

df.repartition(npartitions= 1+df.memory_usage(deep=True).sum().compute() // n )

n 是您的目标分区大小,以字节为单位。加 1 确保分区数始终大于 1(// 执行楼层划分)。

【讨论】:

你如何选择目标尺寸?估计我应该选择什么尺寸的好方法是什么?

以上是关于有效分区 dask 数据帧的策略的主要内容,如果未能解决你的问题,请参考以下文章

基于列或函数的 Dask 数据帧拆分分区

如何将多个功能应用于dask数据帧的多个块?

使用Dask并行过滤数据帧的块

芹菜任务设置与视频帧的内存缓存作为python中的循环缓冲区策略

数据写入kafka的分区策略

Flink分区策略