基于列或函数的 Dask 数据帧拆分分区

Posted

技术标签:

【中文标题】基于列或函数的 Dask 数据帧拆分分区【英文标题】:Dask dataframe split partitions based on a column or function 【发布时间】:2018-09-07 00:56:50 【问题描述】:

我最近开始在 Dask 中寻找大数据。 我有一个关于有效地并行应用操作的问题。

假设我有一些这样的销售数据:

customerKey productKey transactionKey GrossSales netSales unitVolume volume transactionDate
----------- -------------- ---- --------- - -------- ---------- ------ --------
    20353 189 219548 0.921058 0.921058 1 1 2017-02-01 00:00:00
  2596618 189 215015 0.709997 0.709997 1 1 2017-02-01 00:00:00
 30339435 189 215184 0.918068 0.918068 1 1 2017-02-01 00:00:00
 32714675 189 216656 0.751007 0.751007 1 1 2017-02-01 00:00:00
 39232537 189 218180 0.752392 0.752392 1 1 2017-02-01 00:00:00
 41722826 189 216806 0.0160143 0.0160143 1 1 2017-02-01 00:00:00
 46525123 189 219875 0.469437 0.469437 1 1 2017-02-01 00:00:00
 51024667 189 215457 0.244886 0.244886 1 1 2017-02-01 00:00:00
 52949803 189 215413 0.837739 0.837739 1 1 2017-02-01 00:00:00
 56526281 189 220261 0.464716 0.464716 1 1 2017-02-01 00:00:00
 56776211 189 220017 0.272027 0.272027 1 1 2017-02-01 00:00:00
 58198475 189 215058 0.805758 0.805758 1 1 2017-02-01 00:00:00
 63523098 189 214821 0.479798 0.479798 1 1 2017-02-01 00:00:00
 65987889 189 217484 0.122769 0.122769 1 1 2017-02-01 00:00:00
 74607556 189 220286 0.564133 0.564133 1 1 2017-02-01 00:00:00
 75533379 189 217880 0.164387 0.164387 1 1 2017-02-01 00:00:00
 85676779 189 215150 0.0180961 0.0180961 1 1 2017-02-01 00:00:00
 88072944 189 219071 0.492753 0.492753 1 1 2017-02-01 00:00:00
 90233554 189 216118 0.439582 0.439582 1 1 2017-02-01 00:00:00
 91949008 189 220178 0.1893 0.1893 1 1 2017-02-01 00:00:00
 91995925 189 215159 0.566552 0.566552 1 1 2017-02-01 00:00:00

我想做几个不同的 groupby,首先在 customerKey 上申请 groupby。 然后是 customerKey 上的另一个 groupby-sum,以及将作为 previos groupby 应用结果的列。

我能想到的最有效的方法是将此数据帧拆分为客户密钥块的分区。 因此,例如,我可以使用分区方案将数据帧分成 4 个块,例如(伪代码)

按 customerKey % 4 分区

然后我可以使用 map_partitions 对每个分区应用这些分组,然后最终返回结果。然而,似乎 dask 迫使我为我想做的每个 groupby 做一个随机播放。

有没有办法根据列的值重新分区?

目前,在只有约 80,000 行的数据帧上,4 个工作人员需要约 45 秒。我正计划将其扩展到数万亿行的数据帧,而这似乎已经非常可怕了。

我错过了 Dask 的基本内容吗?

【问题讨论】:

【参考方案1】:

与 groupby 相比,为所需列设置索引和 map_partitions 效率更高

【讨论】:

虽然这可能对 OP 有所帮助,但最好添加更多细节、示例等。请provide answers that don't require clarification from the asker. @43shahin,我也想了解更多详情。【参考方案2】:

您可以将列设置为索引

df = df.set_index('customerKey')

这将按该列对您的数据进行排序,并跟踪哪些值范围位于哪个分区中。正如您所注意到的,这可能是一项昂贵的操作,您可能希望将其保存在某个地方

在内存中

df = df.persist()

或在磁盘上

df.to_parquet('...')
df = df.read_parquet('...')

【讨论】:

啊哈,因此,如果您将索引设置为 customerKey,则可以保证在每个分区中都有独立的 customerKey 块。太酷了,谢谢。将索引设置为列,然后执行 df = df.map_partitions(f).compute() 其中 f 是一个相当大的函数,这是使用 dask 数据帧时的标准做法。对于我正在尝试做的事情,这似乎是最有效的解决方案,但在文档中并没有真正提及。 正确,更多信息请参见dask.pydata.org/en/latest/dataframe-design.html#partitions 将索引设置为列,然后执行 df = df.map_partitions(f).compute() 其中 f 是一个相当大的函数,这是使用 dask 数据帧时的标准做法。对于我正在尝试做的事情,这似乎是最有效的解决方案,但在文档中并没有真正提及 有点常见。您也可以使用 groupby-apply,但考虑到您在上面表达问题的方式,我怀疑 set_index/map_partitions 解决方案对您来说会更自然。 两种方法我都试过了,groupby-apply 比 map_partitions 长十倍左右

以上是关于基于列或函数的 Dask 数据帧拆分分区的主要内容,如果未能解决你的问题,请参考以下文章

dask分布式数据帧上的慢len函数

如何将多个功能应用于dask数据帧的多个块?

npartitions 在 Dask 数据帧中的作用是啥?

使用 Dask 将大于内存的数据帧缓存到本地磁盘

使用 Dask 数据帧的 Autosklearn 预测/ Autosklearn 对 dask 数据帧的支持

Dask map_partitions() 将 `partition_info` 打印为 None