python dask DataFrame,支持(可简单并行化)行吗?

Posted

技术标签:

【中文标题】python dask DataFrame,支持(可简单并行化)行吗?【英文标题】:python dask DataFrame, support for (trivially parallelizable) row apply? 【发布时间】:2015-09-30 10:33:05 【问题描述】:

我最近发现了dask 模块,旨在成为一个易于使用的python 并行处理模块。对我来说最大的卖点是它适用于 pandas。

在阅读了它的手册页之后,我找不到一种方法来完成这个琐碎的并行化任务:

ts.apply(func) # for pandas series
df.apply(func, axis = 1) # for pandas DF row apply

目前,为了实现这一目标,AFAIK,

ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame

这是一种丑陋的语法,实际上比完全慢

df.apply(func, axis = 1) # for pandas DF row apply

有什么建议吗?

编辑:感谢@MRocklin 提供地图功能。它似乎比普通熊猫应用要慢。这与熊猫 GIL 发布问题有关还是我做错了?

import dask.dataframe as dd
s = pd.Series([10000]*120)
ds = dd.from_pandas(s, npartitions = 3)

def slow_func(k):
    A = np.random.normal(size = k) # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s

s.apply(slow_func) # 0.43 sec
ds.map(slow_func).compute() # 2.04 sec

【问题讨论】:

我对@9​​87654329@模块不熟悉。对于多重处理,当我必须逐行处理大数据帧时,python 模块multiprocessing 非常适合我。思路也很简单:使用np.array_split将大数据框拆分为8个,使用multiprocessing同时处理;完成后,使用pd.concat 将它们连接回原始长度。有关完整代码示例的相关帖子,请参阅***.com/questions/30904354/… 谢谢,非常好。多处理模块的问题是您需要有一个命名函数(不是 lambda)并将其放在 name=="main" 块之外。这使得研究代码的结构很糟糕。 如果您只想使用更好的多处理,您可以查看@mike-mckerns 的multiprocess。您也可以尝试使用 dask core 而不是 dask.dataframe 并构建字典或使用类似 github.com/ContinuumIO/dask/pull/408 【参考方案1】:

map_partitions

您可以使用 map_partitions 函数将您的函数应用于数据帧的所有分区。

df.map_partitions(func, columns=...)

请注意,func 一次只会给出数据集的一部分,而不是像 pandas apply 那样的整个数据集(如果你想进行并行处理,你可能不希望这样做。)

map / apply

您可以使用 map 在系列中逐行映射函数

df.mycolumn.map(func)

您可以使用 apply 在数据帧中逐行映射函数

df.apply(func, axis=1)

线程与进程

从 0.6.0 版开始,dask.dataframes 与线程并行化。自定义 Python 函数不会从基于线程的并行性中获得太多好处。你可以试试进程

df = dd.read_csv(...)

df.map_partitions(func, columns=...).compute(scheduler='processes')

但避免apply

但是,在 Pandas 和 Dask 中,您确实应该避免使用自定义 Python 函数 apply。这通常是性能不佳的根源。如果您找到一种方法以矢量化方式进行操作,那么您的 Pandas 代码可能会快 100 倍,并且您根本不需要 dask.dataframe。

考虑numba

对于您的特定问题,您可以考虑numba。这会显着提高您的表现。

In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: s = pd.Series([10000]*120)

In [4]: %paste
def slow_func(k):
    A = np.random.normal(size = k) # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s
## -- End pasted text --

In [5]: %time _ = s.apply(slow_func)
CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms
Wall time: 347 ms

In [6]: import numba
In [7]: fast_func = numba.jit(slow_func)

In [8]: %time _ = s.apply(fast_func)  # First time incurs compilation overhead
CPU times: user 179 ms, sys: 0 ns, total: 179 ms
Wall time: 175 ms

In [9]: %time _ = s.apply(fast_func)  # Subsequent times are all gain
CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms
Wall time: 68.7 ms

免责声明,我供职于同时生产 numbadask 并雇佣了许多 pandas 开发人员的公司。

【讨论】:

谢谢!我尝试了 map 方法,它似乎比 pandas 应用的要慢。你能评论一下原帖的编辑吗? 我已经经常使用 numba 了!感谢您的工作。我通常做的是DataFrame的每一行都指定一个模拟的配置(复杂/慢函数的参数)。我已经进行了多处理,请留意更好的方法 @MRocklin 关于熊猫的话题有点偏离;我尝试使用 map 而不是 apply 因为我听说它更快,但我不确定为什么它更快。任何澄清或澄清链接将不胜感激。 @BobHaffner 不知道。建议做一个小实验并发布“为什么会这样”风格的 *** 问题。【参考方案2】:

从 v 开始,dask.dataframe.apply 将责任委托给 map_partitions

@insert_meta_param_description(pad=12)
def apply(self, func, convert_dtype=True, meta=no_default, args=(), **kwds):
    """ Parallel version of pandas.Series.apply
    ...
    """
    if meta is no_default:
        msg = ("`meta` is not specified, inferred from partial data. "
               "Please provide `meta` if the result is unexpected.\n"
               "  Before: .apply(func)\n"
               "  After:  .apply(func, meta='x': 'f8', 'y': 'f8') for dataframe result\n"
               "  or:     .apply(func, meta=('x', 'f8'))            for series result")
        warnings.warn(msg)

        meta = _emulate(M.apply, self._meta_nonempty, func,
                        convert_dtype=convert_dtype,
                        args=args, **kwds)

    return map_partitions(M.apply, self, func,
                          convert_dtype, args, meta=meta, **kwds)

【讨论】:

以上是关于python dask DataFrame,支持(可简单并行化)行吗?的主要内容,如果未能解决你的问题,请参考以下文章

如何将Python Dask Dataframes合并到列中?

将Dask包的Pandas DataFrame转换为单个Dask DataFrame

如何将 Dask.DataFrame 转换为 pd.DataFrame?

Dask 如何旋转 DataFrame

如何为 dask.dataframe 指定元数据

懒惰地从 PostgreSQL / Cassandra 创建 Dask DataFrame