pandas 操作过程中的进度指示器

Posted

技术标签:

【中文标题】pandas 操作过程中的进度指示器【英文标题】:Progress indicator during pandas operations 【发布时间】:2013-09-07 08:04:45 【问题描述】:

我经常对超过 1500 万行的数据帧执行 pandas 操作,我希望能够访问特定操作的进度指示器。

pandas split-apply-combine 操作是否存在基于文本的进度指示器?

例如,在类似的情况下:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

feature_rollup 是一个有些复杂的函数,它采用许多 DF 列并通过各种方法创建新的用户列。对于大型数据帧,这些操作可能需要一段时间,所以我想知道是否可以在 iPython 笔记本中提供基于文本的输出,以更新我的进度。

到目前为止,我已经尝试了 Python 的规范循环进度指示器,但它们并没有以任何有意义的方式与 pandas 交互。

我希望我在 pandas 库/文档中忽略了一些东西,可以让人们了解拆分-应用-组合的进度。一个简单的实现可能会查看 apply 函数正在运行的数据帧子集的总数,并将进度报告为这些子集的已完成部分。

这可能需要添加到库中吗?

【问题讨论】:

你对代码做了 %prun (profile) 吗?有时你可以在申请之前对整个框架进行操作以消除瓶颈 @Jeff:你敢打赌,我之前这样做是为了挤出最后一点性能。问题实际上归结为我正在处理的伪 map-reduce 边界,因为行数以千万计,所以我不希望超速提高只是想要一些关于进度的反馈。 考虑 cythonising:pandas.pydata.org/pandas-docs/dev/… @AndyHayden - 正如我对您的回答所评论的那样,您的实施非常好,并且为整体工作增加了少量时间。我还在功能汇总中对三个操作进行了 cythonized,这些操作重新获得了现在专用于报告进度的所有时间。所以最后我敢打赌,如果我在整个函数上使用 cython,我将有进度条并减少总处理时间。 【参考方案1】:

您可以使用装饰器轻松做到这一点

from functools import wraps 

def logging_decorator(func):

    @wraps
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        print "The function I modify has been called 0 times(s).".format(
              wrapper.count)
        func(*args, **kwargs)
    wrapper.count = 0
    return wrapper

modified_function = logging_decorator(feature_rollup)

然后只需使用 modified_function(并在您希望打印时更改)

【讨论】:

明显的警告是这会减慢你的功能!你甚至可以让它随着***.com/questions/5426546/… 的进度更新,例如计数/长度百分比。 是的-您将有订单(组数),因此取决于您的瓶颈是什么,这可能会有所不同 也许直观的做法是将其包装在 logged_apply(g, func) 函数中,您可以在其中访问订单,并且可以从头开始登录。 我在我的回答中做了上述,还有厚颜无耻的百分比更新。实际上我无法让你的工作......我认为是包装位。如果您将其用于申请,那么它并不那么重要。【参考方案2】:

调整 Jeff 的答案(并将其作为可重复使用的功能)。

def logged_apply(g, func, *args, **kwargs):
    step_percentage = 100. / len(g)
    import sys
    sys.stdout.write('apply progress:   0%')
    sys.stdout.flush()

    def logging_decorator(func):
        def wrapper(*args, **kwargs):
            progress = wrapper.count * step_percentage
            sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
            sys.stdout.flush()
            wrapper.count += 1
            return func(*args, **kwargs)
        wrapper.count = 0
        return wrapper

    logged_func = logging_decorator(func)
    res = g.apply(logged_func, *args, **kwargs)
    sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
    sys.stdout.flush()
    return res

注意:应用进度百分比updates inline。如果你的函数标准输出那么这将不起作用。

In [11]: g = df_users.groupby(['userID', 'requestDate'])

In [12]: f = feature_rollup

In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]: 
...

像往常一样,您可以将其作为方法添加到您的 groupby 对象中:

from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply

In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]: 
...

正如 cmets 中提到的,这不是核心 pandas 有兴趣实现的功能。但是 python 允许您为许多 pandas 对象/方法创建这些(这样做将是相当多的工作......尽管您应该能够概括这种方法)。

【讨论】:

我说“工作量很大”,但您可能可以将整个函数重写为(更通用的)装饰器。 感谢您扩展 Jeff 的帖子。我已经实现了两者,并且每个的减速都非常小(总共增加了 1.1 分钟到需要 27 分钟才能完成的操作)。这样我可以查看进度,并且考虑到这些操作的临时性质,我认为这是可以接受的减速。 非常好,很高兴它有帮助。我实际上对速度变慢感到惊讶(当我尝试一个示例时),我预计它会更糟。 为了进一步提高发布方法的效率,我对数据导入很懒惰(pandas 太擅长处理混乱的 csv 了!!)和我的一些条目(~1%)完全破坏了插入(想想插入单个字段的整个记录​​)。消除这些会导致功能汇总的大幅加速,因为在拆分-应用-组合操作期间该做什么没有歧义。 我只需要 8 分钟...但我在功能汇总中添加了一些内容(更多功能 -> 更好的 AUC!)。这 8 分钟是每个块(现在总共两个块),每个块在 1200 万行附近。所以是的......使用 HDFStore 对 2400 万行执行大量操作需要 16 分钟(并且在功能汇总中有 nltk 内容)。相当不错。让我们希望互联网不会以最初对混乱插入的无知或矛盾心理来评判我 =)【参考方案3】:

由于大众需求,我在tqdm (pip install "tqdm>=4.9.0") 中添加了pandas 支持。与其他答案不同,这不会明显减慢熊猫的速度——这是DataFrameGroupBy.progress_apply 的示例:

import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm  # for notebooks

# Create new `pandas` methods which use `tqdm` progress
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()

df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)

如果您对它的工作原理(以及如何针对您自己的回调进行修改)感兴趣,请参阅examples on GitHub、full documentation on PyPI,或导入模块并运行help(tqdm)。其他支持的函数包括mapapplymapaggregatetransform

编辑


直接回答原问题,替换:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

与:

from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)

注意:tqdm : 对于 4.8 以下的 tqdm 版本,您必须这样做,而不是 tqdm.pandas()

from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())

【讨论】:

tqdm 实际上最初是为纯可迭代对象创建的:from tqdm import tqdm; for i in tqdm( range(int(1e8)) ): pass pandas 支持是我最近做的一个 hack :) 顺便说一句,如果你使用 Jupyter notebooks,你也可以使用 tqdm_notebooks 来获得更漂亮的 bar。与 pandas 一起,您目前需要像 from tqdm import tqdm_notebook; tqdm_notebook().pandas(*args, **kwargs) see here 一样实例化它 从 4.8.1 版开始 - 使用 tqdm.pandas() 代替。 github.com/tqdm/tqdm/commit/… 谢谢,@mork 是正确的。我们正在(缓慢地)朝着tqdm v5 努力,这使得事情更加模块化。 这太棒了。谢谢【参考方案4】:

我已更改 Jeff's answer,以包含总数,以便您可以跟踪进度和变量以仅打印每 X 次迭代(如果“print_at”相当高,这实际上会大大提高性能)

def count_wrapper(func,total, print_at):

    def wrapper(*args):
        wrapper.count += 1
        if wrapper.count % wrapper.print_at == 0:
            clear_output()
            sys.stdout.write( "%d / %d"%(calc_time.count,calc_time.total) )
            sys.stdout.flush()
        return func(*args)
    wrapper.count = 0
    wrapper.total = total
    wrapper.print_at = print_at

    return wrapper

clear_output() 函数来自

from IPython.core.display import clear_output

如果不在 IPython 上,Andy Hayden 的答案就是没有它

【讨论】:

【参考方案5】:

如果您需要支持如何在 Jupyter/ipython 笔记本中使用它,就像我一样,这里有一个有用的指南和来源relevant article:

from tqdm._tqdm_notebook import tqdm_notebook
import pandas as pd
tqdm_notebook.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.groupby(0).progress_apply(lambda x: x**2)

注意_tqdm_notebook 的导入语句中的下划线。正如引用的文章所述,开发处于后期测试阶段。

更新于 2021 年 11 月 12 日

我目前正在使用pandas==1.3.4tqdm==4.62.3,我不确定哪个版本的tqdm 作者实现了此更改,但不推荐使用上述导入语句。而是使用:

 from tqdm.notebook import tqdm_notebook

自 2022 年 2 月 1 日起更新 现在可以简化 .py 和 .ipynb 文件的导入语句:

from tqdm.auto import tqdm
tqdm.pandas()

这对于两种类型的开发环境都应该可以正常工作,并且应该适用于 pandas 数据帧或其他 tqdm 值得迭代的对象。

【讨论】:

【参考方案6】:

适用于希望将 tqdm 应用于其自定义并行 pandas-apply 代码的任何人。

(多年来我尝试了一些用于并行化的库,但我从未找到 100% 并行化的解决方案,主要是针对 apply 函数,而且我总是不得不回来获取我的“手动”代码。)

df_multi_core - 这是你调用的那个。它接受:

    你的 df 对象 您要调用的函数名 可以对其执行函数的列子集(有助于减少时间/内存) 并行运行的作业数(-1 或省略所有内核) df 函数接受的任何其他 kwargs(如“轴”)

_df_split - 这是一个内部辅助函数,必须全局定位到正在运行的模块(Pool.map 是“位置相关的”),否则我会在内部定位它..

这是来自我的gist 的代码(我将在那里添加更多的 pandas 函数测试):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

Bellow 是并行化 apply 的测试代码,其中 tqdm "progress_apply"。

from time import time
from tqdm import tqdm
tqdm.pandas()

if __name__ == '__main__': 
    sep = '-' * 50

    # tqdm progress_apply test      
    def apply_f(row):
        return row['c1'] + 0.1
    N = 1000000
    np.random.seed(0)
    df = pd.DataFrame('c1': np.arange(N), 'c2': np.arange(N))

    print('testing pandas apply on \n'.format(df.shape, sep))
    t1 = time()
    res = df.progress_apply(apply_f, axis=1)
    t2 = time()
    print('result random sample\n'.format(res.sample(n=3, random_state=0)))
    print('time for native implementation \n'.format(round(t2 - t1, 2), sep))

    t3 = time()
    # res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    t4 = time()
    print('result random sample\n'.format(res.sample(n=3, random_state=0)))
    print('time for multi core implementation \n'.format(round(t4 - t3, 2), sep))

在输出中,您可以看到 1 个未并行化运行的进度条,以及并行化运行时的每核进度条。 有一点小问题,有时其他核心会同时出现,但即便如此,我认为它还是有用的,因为您可以获得每个核心的进度统计信息(例如,it/sec 和总记录)

感谢 @abcdaa 提供这么棒的图书馆!

【讨论】:

感谢@mork - 随时添加到github.com/tqdm/tqdm/wiki/How-to-make-a-great-Progress-Bar 或在github.com/tqdm/tqdm/wiki 创建一个新页面 谢谢,但不得不更改这些部分:try: splits = np.array_split(df[subset], njobs) except ValueError: splits = np.array_split(df, njobs) 因为 KeyError 异常而不是 ValueError,更改为 Exception 以处理所有情况。 谢谢@mork - 这个答案应该更高。【参考方案7】:

对于mergeconcatjoin 等操作,可以使用 Dask 显示进度条。

您可以将 Pandas DataFrames 转换为 Dask DataFrames。然后就可以显示 Dask 进度条了。

下面的代码是一个简单的例子:

创建和转换 Pandas 数据帧

import pandas as pd
import numpy as np
from tqdm import tqdm
import dask.dataframe as dd

n = 450000
maxa = 700

df1 = pd.DataFrame('lkey': np.random.randint(0, maxa, n),'lvalue': np.random.randint(0,int(1e8),n))
df2 = pd.DataFrame('rkey': np.random.randint(0, maxa, n),'rvalue': np.random.randint(0, int(1e8),n))

sd1 = dd.from_pandas(df1, npartitions=3)
sd2 = dd.from_pandas(df2, npartitions=3)

与进度条合并

from tqdm.dask import TqdmCallback
from dask.diagnostics import ProgressBar
ProgressBar().register()

with TqdmCallback(desc="compute"):
    sd1.merge(sd2, left_on='lkey', right_on='rkey').compute()

对于相同的操作,Dask 比 Pandas 更快并且需要更少的资源:

熊猫74.7 ms 达斯克20.2 ms

更多详情:

Progress Bar for Merge Or Concat Operation With tqdm in Pandas Test Notebook

注意 1:我已经测试过这个解决方案:https://***.com/a/56257514/3921758,但它对我不起作用。不测量合并操作。

注意 2:我已经检查了 tqdm 的“开放请求”,用于 Pandas,例如:

https://github.com/tqdm/tqdm/issues/1144 https://github.com/noamraph/tqdm/issues/28

【讨论】:

【参考方案8】:

这里的每个答案都使用了pandas.DataFrame.groupby。如果您想在没有 groupby 的情况下在 pandas.Series.apply 上显示进度条,可以在 jupyter-notebook 中执行以下操作:

from tqdm.notebook import tqdm
tqdm.pandas()


df['<applied-col-name>'] = df['<col-name>'].progress_apply(<your-manipulation-function>)

【讨论】:

我必须为任何想尝试此解决方案的人添加此内容:您将需要(tqdm 版本:tqdm&gt;=4.61.2)否则,它将无法正常工作。另外,安装新版本的 tqdm 后一定要重启jupyternotebook 的内核。 (例如,我使用了pip install tqdm==4.62.3【参考方案9】:

对于 concat 操作:

df = pd.concat(
    [
        get_data(f)
        for f in tqdm(files, total=len(files))
    ]
)

tqdm 只返回一个可迭代对象。

【讨论】:

以上是关于pandas 操作过程中的进度指示器的主要内容,如果未能解决你的问题,请参考以下文章

shell脚本中的进度指示器

Javafx - 从 java 类更新 UI 中的进度指示器

纱线进度指示器代表啥?

使用隐藏在全景控件中的进度指示器会导致全景项目只有第一个

在 Spark Azure Databricks 中创建自定义进度条指示器

赵雅智_android多线程下载带进度条