使用pandas的Python多处理并非所有进程都一次运行
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用pandas的Python多处理并非所有进程都一次运行相关的知识,希望对你有一定的参考价值。
我正在读取块中的csv并将块传递给4个进程池。
pool = Pool(processes=4)
chunk_index = 1
for df in pd.read_csv(downloaded_file, chunksize=chunksize, compression='gzip', skipinitialspace=True, encoding='utf-8'):
output_file_name = output_path + merchant['output_file_format'].format(
file_index, chunk_index)
pool.map(wrapper_process, [
(df, transformer, output_file_name)])
chunk_index += 1
有了这段代码,我的理解是它应该向我展示4个连续运行的过程。但是在下面的htop截图中,它始终是2运行。一个是htop命令它自己。这意味着当时只有1个python进程在运行。
从内存使用情况来看,它是12 gb,我认为只有当4个块加载到内存中时才可能提供1个块是2gb几乎
我如何一次性使用处理器。
问题是你误解了地图是如何工作的。来自qazxsw poi:
the doc此方法将迭代器切割为多个块,并将其作为单独的任务提交给进程池。可以通过将chunksize设置为正整数来指定这些块的(近似)大小。
作为iterable,你提供一个只包含一个元素的列表:元组map(func, iterable[, chunksize])
。但是你需要提供一个包含许多元素的iterable。为了使这项工作,您需要先准备列表,然后再将其发送到进程(提示:您可以编写(df, ...)
并让python自己查找核心数)
Pool()
但是现在你遇到的问题是你需要在内存中保存完整的csv数据,这可能没问题,但通常不行。要解决此问题,您可以切换到使用队列:您可以
- 建立一个空队列
- 启动进程并告诉他们从队列中获取项目(开始时仍为空)
- 使用主进程提供队列(并且可能检查队列是否过长,因此内存消耗不会进入屋顶)
- 将
pool = Pool() chunk_index = 1 list = [] for df in pd.read_csv(downloaded_file, chunksize=chunksize, compression='gzip', skipinitialspace=True, encoding='utf-8'): output_file_name = output_path + merchant['output_file_format'].format(file_index, chunk_index) list.append((df, transformer, output_file_name)]) chunk_index += 1 pool.map(wrapper_process, list)
元素放入队列,以便进程自行退出
在STOP
有一个很好的例子,它解释了你会接近这一点。
最后一句话:你确定你的操作受CPU约束吗?你在the official doc (look at the last example on the page)(也可能还有wrapper_process
)做了很多处理吗?因为如果你只是在单独的文件中拆分CSV而没有太多处理,你的程序是IO绑定而不是CPU绑定,那么多处理没有任何意义。
以上是关于使用pandas的Python多处理并非所有进程都一次运行的主要内容,如果未能解决你的问题,请参考以下文章
python中的多处理-在多个进程之间共享大对象(例如pandas数据框)