使用pandas的Python多处理并非所有进程都一次运行

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用pandas的Python多处理并非所有进程都一次运行相关的知识,希望对你有一定的参考价值。

enter image description here

我正在读取块中的csv并将块传递给4个进程池。

pool = Pool(processes=4)
            chunk_index = 1
            for df in pd.read_csv(downloaded_file, chunksize=chunksize, compression='gzip', skipinitialspace=True, encoding='utf-8'):
                output_file_name = output_path + merchant['output_file_format'].format(
                    file_index, chunk_index)
                pool.map(wrapper_process, [
                         (df, transformer, output_file_name)])
                chunk_index += 1

有了这段代码,我的理解是它应该向我展示4个连续运行的过程。但是在下面的htop截图中,它始终是2运行。一个是htop命令它自己。这意味着当时只有1个python进程在运行。

enter image description here从内存使用情况来看,它是12 gb,我认为只有当4个块加载到内存中时才可能提供1个块是2gb几乎

我如何一次性使用处理器。

答案

问题是你误解了地图是如何工作的。来自qazxsw poi:

the doc此方法将迭代器切割为多个块,并将其作为单独的任务提交给进程池。可以通过将chunksize设置为正整数来指定这些块的(近似)大小。

作为iterable,你提供一个只包含一个元素的列表:元组map(func, iterable[, chunksize])。但是你需要提供一个包含许多元素的iterable。为了使这项工作,您需要先准备列表,然后再将其发送到进程(提示:您可以编写(df, ...)并让python自己查找核心数)

Pool()

但是现在你遇到的问题是你需要在内存中保存完整的csv数据,这可能没问题,但通常不行。要解决此问题,您可以切换到使用队列:您可以

  • 建立一个空队列
  • 启动进程并告诉他们从队列中获取项目(开始时仍为空)
  • 使用主进程提供队列(并且可能检查队列是否过长,因此内存消耗不会进入屋顶)
  • pool = Pool() chunk_index = 1 list = [] for df in pd.read_csv(downloaded_file, chunksize=chunksize, compression='gzip', skipinitialspace=True, encoding='utf-8'): output_file_name = output_path + merchant['output_file_format'].format(file_index, chunk_index) list.append((df, transformer, output_file_name)]) chunk_index += 1 pool.map(wrapper_process, list) 元素放入队列,以便进程自行退出

STOP有一个很好的例子,它解释了你会接近这一点。

最后一句话:你确定你的操作受CPU约束吗?你在the official doc (look at the last example on the page)(也可能还有wrapper_process)做了很多处理吗?因为如果你只是在单独的文件中拆分CSV而没有太多处理,你的程序是IO绑定而不是CPU绑定,那么多处理没有任何意义。

以上是关于使用pandas的Python多处理并非所有进程都一次运行的主要内容,如果未能解决你的问题,请参考以下文章

python中的多处理-在多个进程之间共享大对象(例如pandas数据框)

Pandas 多进程处理数据,速度的确快了很多

Python多进程处理数据

Pandas DataFrame.to_sql() 错误 - 在字符串格式化期间并非所有参数都转换

Python多进程处理数据

python 并发编程