让 Pandas DataFrame apply() 使用所有内核?

Posted

技术标签:

【中文标题】让 Pandas DataFrame apply() 使用所有内核?【英文标题】:Make Pandas DataFrame apply() use all cores? 【发布时间】:2018-01-14 15:41:09 【问题描述】:

不幸的是,截至 2017 年 8 月,Pandas DataFame.apply() 仍仅限于使用单核,这意味着当您运行 df.apply(myfunc, axis=1) 时,多核机器将浪费其大部分计算时间。

如何使用所有内核在数据帧上并行运行应用程序?

【问题讨论】:

【参考方案1】:

最简单的方法是使用Dask's map_partitions。你需要这些导入(你需要pip install dask):

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get

语法是

data = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def myfunc(x,y,z, ...): return <whatever>

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)  

(如果你有 16 个核心,我认为 30 个分区是合适的)。为了完整起见,我在我的机器(16 核)上计时了差异:

data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)

ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)  
def vectorized(): return myfunc(data['col1'], data['col2']  )

t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))

28.16970546543598

t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))

2.708152851089835

t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))

0.010668013244867325

从 pandas apply 到 dask apply 在分区上提供 10 倍加速。当然,如果你有一个可以向量化的函数,你应该——在这种情况下,函数 (y*(x**2+1)) 被简单地向量化了,但是有很多东西是不可能向量化的。

【讨论】:

很高兴知道,感谢发帖。你能解释一下为什么选择 30 个分区吗?更改此值时性能会发生变化吗? @AndrewL 我假设每个分区都由一个单独的进程提供服务,并且对于 16 个内核,我假设 16 个或 32 个进程可以同时运行。我试了一下,性能似乎提高到 32 个分区,但进一步增加并没有什么好处。我假设对于四核机器,您需要 8 个分区等。请注意,我确实注意到 16 和 32 之间有一些改进,所以我认为您确实需要 2x$NUM_PROCESSORS 只有The get= keyword has been deprecated. Please use the scheduler= keyword instead with the name of the desired scheduler like 'threads' or 'processes' 对于 dask v0.20.0 及更高版本,使用 ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(scheduler='进程'),或其他调度程序选项之一。当前代码抛出“TypeError:get= 关键字已被删除。请使用 scheduler= 关键字而不是所需调度程序的名称,例如 'threads' 或 'processes'” 确保在执行此操作之前,数据框没有重复索引,因为它会抛出 ValueError: cannot reindex from a duplicate axis。要解决这个问题,您应该通过df = df[~df.index.duplicated()] 删除重复索引或通过df.reset_index(inplace=True) 重置索引。【参考方案2】:

您可以使用swifter 包:

pip install swifter

(请注意,您可能希望在 virtualenv 中使用它以避免与已安装的依赖项发生版本冲突。)

Swifter 作为 pandas 的插件,让您可以重用 apply 函数:

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

它会自动找出并行化函数的最有效方法,无论它是否被矢量化(如上例所示)。

More examples 和 performance comparison 在 GitHub 上可用。请注意,该软件包正在积极开发中,因此 API 可能会发生变化。

另请注意,此will not work automatically 用于字符串列。当使用字符串时,Swifter 将回退到一个“简单”的 Pandas apply,它不会是并行的。在这种情况下,即使强制它使用 dask 也不会提高性能,最好手动拆分数据集并使用 parallelizing using multiprocessing

【讨论】:

我们纯粹好奇,有没有办法限制它在并行应用时使用的核心数量?我有一个共享服务器,所以如果我抓住所有 32 个核心,没有人会高兴。 @MaximHaytovich 我不知道。 Swifter 在后台使用 dask,所以它可能尊重这些设置:***.com/a/40633117/435093 — 否则我建议在 GitHub 上打开一个问题。作者反应灵敏。 @slhck 谢谢!会多挖一点。无论如何,它似乎无法在 Windows 服务器上运行 - 只是挂起不做任何玩具任务 +1 表示 Swifter。它不仅使用最佳可用方法进行并行化,还通过 tqdm 集成进度条。 对于字符串,只需像这样添加allow_dask_on_strings(enable=True)df.swifter.allow_dask_on_strings(enable=True).apply(some_function) 来源:github.com/jmcarpenter2/swifter/issues/45【参考方案3】:

你可以试试pandarallel:一个简单而高效的工具,可以在你的所有 CPU 上并行化你的 pandas 操作(在 Linux 和 macOS 上)

并行化是有代价的(实例化新进程、通过共享内存发送数据等),因此并行化只有在并行化的计算量足够高时才有效。对于非常少量的数据,使用并行化并不总是值得的。 应用的函数不应是 lambda 函数。
from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)

# ALLOWED
def func(x):
    return sin(x**2)

df.parallel_apply(func, axis=1)

见https://github.com/nalepae/pandarallel

【讨论】:

你好,我无法解决一个问题,使用 pandarallel 时出现错误:AttributeError: Can't pickle local object 'prepare_worker..closure..wrapper' 。你能帮我解决这个问题吗? @Alex Sry 我不是那个模块的开发者。你的代码是什么样的?您可以尝试将“内部函数”声明为 global 吗? (只是猜测) @AlexCam 您的函数应该在其他函数之外定义,以便 python 可以腌制它以进行多处理 @G_KOBELIEF Python >3.6 我们可以将 lambda 函数与 pandaparallel 一起使用【参考方案4】:

如果你想留在原生python:

import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(f, df['col'])

将以并行方式将函数 f 应用于数据帧 df 的列 col

【讨论】:

按照这样的方法,我在pandas/core/frame.py 中得到了来自__setitem__ValueError: Length of values does not match length of index。不确定我是否做错了什么,或者分配给df['newcol'] 是否不是线程安全的。 您可以将 pool.map 写入中间 temp_result 列表以允许检查长度是否与 df 匹配,然后执行 df['newcol'] = temp_result? 您的意思是创建新列?你会用什么? 是的,将地图的结果分配给数据框的新列。 map 不会返回发送给函数 f 的每个块的结果列表吗?那么,当您将其分配给“newcol”列时会发生什么?使用 Pandas 和 Python 3 实际上运行起来非常流畅!你试过了吗?它创建一个与 df 长度相同的列表,与发送的顺序相同。它实际上以并行方式执行 c2 = f(c1)。在 python 中没有更简单的多进程方法。就性能而言,Ray 似乎也可以做好事 (towardsdatascience.com/…),但它并不成熟,而且根据我的经验,安装并不总是顺利【参考方案5】:

只想给Dask一个更新答案

import dask.dataframe as dd

def your_func(row):
  #do something
  return row

ddf = dd.from_pandas(df, npartitions=30) # find your own number of partitions
ddf_update = ddf.apply(your_func, axis=1).compute()

在我的 100,000 条记录中,没有 Dask:

CPU时间:用户6分32秒,系统:100毫秒,总计:6分32秒 挂壁时间:6分32秒

与 Dask:

CPU 时间:用户 5.19 秒,系统:784 毫秒,总计:5.98 秒 挂墙时间:1分3秒

【讨论】:

【参考方案6】:

要使用所有(物理或逻辑)内核,您可以尝试使用mapply 替代swifterpandarallel

您可以在初始化时设置核心数量(和分块行为):

import pandas as pd
import mapply

mapply.init(n_workers=-1)

...

df.mapply(myfunc, axis=1)

默认情况下 (n_workers=-1),软件包使用系统上所有可用的物理 CPU。如果您的系统使用超线程(通常会显示两倍的物理 CPU 数量),mapply 将产生一个额外的工作人员来优先处理多处理池而不是系统上的其他进程。

根据您对all your cores 的定义,您也可以改用所有逻辑内核(请注意,像这样受 CPU 限制的进程将争夺物理 CPU,这可能会减慢您的操作速度):

import multiprocessing
n_workers = multiprocessing.cpu_count()

# or more explicit
import psutil
n_workers = psutil.cpu_count(logical=True)

【讨论】:

易于设置!【参考方案7】:

这是一个 sklearn 基础转换器的示例,其中 pandas 应用是并行化的

import multiprocessing as mp
from sklearn.base import TransformerMixin, BaseEstimator

class ParllelTransformer(BaseEstimator, TransformerMixin):
    def __init__(self,
                 n_jobs=1):
        """
        n_jobs - parallel jobs to run
        """
        self.variety = variety
        self.user_abbrevs = user_abbrevs
        self.n_jobs = n_jobs
    def fit(self, X, y=None):
        return self
    def transform(self, X, *_):
        X_copy = X.copy()
        cores = mp.cpu_count()
        partitions = 1

        if self.n_jobs <= -1:
            partitions = cores
        elif self.n_jobs <= 0:
            partitions = 1
        else:
            partitions = min(self.n_jobs, cores)

        if partitions == 1:
            # transform sequentially
            return X_copy.apply(self._transform_one)

        # splitting data into batches
        data_split = np.array_split(X_copy, partitions)

        pool = mp.Pool(cores)

        # Here reduce function - concationation of transformed batches
        data = pd.concat(
            pool.map(self._preprocess_part, data_split)
        )

        pool.close()
        pool.join()
        return data
    def _transform_part(self, df_part):
        return df_part.apply(self._transform_one)
    def _transform_one(self, line):
        # some kind of transformations here
        return line

更多信息见https://towardsdatascience.com/4-easy-steps-to-improve-your-machine-learning-code-performance-88a0b0eeffa8

【讨论】:

什么是:self._preprocess_part?我只找到_transform_part【参考方案8】:

这里另一个使用 Joblib 和一些来自 scikit-learn 的帮助代码。轻量级(如果您已经拥有 scikit-learn),如果您希望更好地控制它正在做什么,那就太好了,因为 joblib 很容易被破解。

from joblib import parallel_backend, Parallel, delayed, effective_n_jobs
from sklearn.utils import gen_even_slices
from sklearn.utils.validation import _num_samples


def parallel_apply(df, func, n_jobs= -1, **kwargs):
    """ Pandas apply in parallel using joblib. 
    Uses sklearn.utils to partition input evenly.
    
    Args:
        df: Pandas DataFrame, Series, or any other object that supports slicing and apply.
        func: Callable to apply
        n_jobs: Desired number of workers. Default value -1 means use all available cores.
        **kwargs: Any additional parameters will be supplied to the apply function
        
    Returns:
        Same as for normal Pandas DataFrame.apply()
        
    """
    
    if effective_n_jobs(n_jobs) == 1:
        return df.apply(func, **kwargs)
    else:
        ret = Parallel(n_jobs=n_jobs)(
            delayed(type(df).apply)(df[s], func, **kwargs)
            for s in gen_even_slices(_num_samples(df), effective_n_jobs(n_jobs)))
        return pd.concat(ret)

用法:result = parallel_apply(my_dataframe, my_func)

【讨论】:

【参考方案9】:

由于问题是“如何使用所有内核在数据帧上并行运行应用程序?”,答案也可以是modin。您可以并行运行所有内核,但实时性更差。

见https://github.com/modin-project/modin。它运行在daskray 的顶部。他们说“Modin 是为 1MB 到 1TB+ 的数据集设计的 DataFrame。”我试过了:pip3 install "modin"[ray]"。 Modin vs pandas 是 - 六核 12 秒 vs. 6 秒。

【讨论】:

【参考方案10】:

而不是

df["new"] = df["old"].map(fun)

from joblib import Parallel, delayed
df["new"] = Parallel(n_jobs=-1, verbose=10)(delayed(fun)(i) for i in df["old"])

对我来说这是一个轻微的改进

import multiprocessing as mp
with mp.Pool(mp.cpu_count()) as pool:
    df["new"] = pool.map(fun, df["old"])

如果作业非常小,您将获得进度指示和自动批处理。

【讨论】:

【参考方案11】:

本机 Python 解决方案(使用 numpy),可以按照原始问题的要求应用于整个 DataFrame(不仅在单个列上)

import numpy as np
import multiprocessing as mp

dfs = np.array_split(df, 8000) # divide the dataframe as desired

def f_app(df):
    return df.apply(myfunc, axis=1)

with mp.Pool(mp.cpu_count()) as pool:
    res = pd.concat(pool.map(f_app, dfs))

【讨论】:

以上是关于让 Pandas DataFrame apply() 使用所有内核?的主要内容,如果未能解决你的问题,请参考以下文章

pandas使用apply函数:在dataframe数据列(column)上施加(apply)函数

pandas DataFrame apply()函数

pandas 的DataFrame.apply()

pandas DataFrame apply()函数

Python pandas.DataFrame.apply函数方法的使用

pandas编写自定义函数使用apply函数应用自定义函数基于Series数据生成新的dataframe