`multiprocessing.Pool.map()` 似乎安排错误
Posted
技术标签:
【中文标题】`multiprocessing.Pool.map()` 似乎安排错误【英文标题】:`multiprocessing.Pool.map()` seems to schedule wrongly 【发布时间】:2019-12-10 08:05:50 【问题描述】:我有一个请求服务器、检索一些数据、处理它并保存一个 csv 文件的函数。此功能应启动 20k 次。每次执行的持续时间都不同:有时会持续超过 20 分钟,有时会持续不到一秒。我决定使用multiprocessing.Pool.map
来并行执行。我的代码如下:
def get_data_and_process_it(filename):
print('getting', filename)
...
print(filename, 'has been process')
with Pool(8) as p:
p.map(get_data_and_process_it, long_list_of_filenames)
看看prints
是如何生成的,似乎long_list_of_filenames
被分成8 个部分并分配给每个CPU
,因为有时只是在一个20 分钟的执行中被阻塞,而long_list_of_filenames
没有其他元素在这 20 分钟内处理完毕。我所期待的是map
以 FIFO 样式调度 CPU 内核中的每个元素。
我的情况有更好的方法吗?
【问题讨论】:
在这种情况下,您应该将Pool.map()
的chunksize
参数设置为1
。您可以从我的回答 here 中使用 calc_chunksize_info()
计算否则生成的块大小。
map
的工作方式类似于内置的 map
用于迭代。这意味着订单是确保。换句话说,一个缓慢的进程会阻塞更快的进程。如果订单对您来说不重要,我建议改为查看map_async
。
【参考方案1】:
map
方法仅在 所有 操作完成后返回。
而且从池工作人员打印并不理想。一方面,像stdout
这样的文件使用缓冲,因此在打印消息和实际出现之间可能存在可变时间。此外,由于所有工作人员都继承了相同的stdout
,因此他们的输出会相互交织,甚至可能出现乱码。
所以我建议改用imap_unordered
。这将返回一个迭代器,该迭代器将在它们可用时立即开始产生结果。唯一的问题是,这会按照它们完成的顺序返回结果,而不是按照它们开始的顺序。
您的工作函数 (get_data_and_process_it
) 应该返回某种状态指示器。例如文件名和结果的元组。
def get_data_and_process_it(filename):
...
if (error):
return (filename, f'has *failed* bacause of reason')
return (filename, 'has been processed')
你可以这样做:
with Pool(8) as p:
for fn, res in p.imap_unordered(get_data_and_process_it, long_list_of_filenames):
print(fn, res)
这提供了有关作业何时完成的准确信息,并且由于只有父进程写入stdout
,因此输出不会出现乱码。
另外,我建议在程序开头的某处使用sys.stdout.reconfigure(line_buffering=True)
。这样可以确保在每行输出之后刷新stdout
流。
【讨论】:
【参考方案2】:map
正在阻塞,您可以使用 p.map_async
代替 p.map
。 map
将等待所有这些函数调用完成,以便我们连续看到所有结果。 map_async
以随机顺序完成工作,并且在开始新任务之前不会等待正在进行的任务完成。这是最快的方法。(For more) 还有一个SO thread 详细讨论了map
和map_async
。
多处理池类为我们处理排队逻辑。它非常适合并行运行网络抓取作业(示例)或任何可以独立分解和分发的作业。如果您需要对队列进行更多控制或需要在多个进程之间共享数据,您可能需要查看Queue
类(For more)。
【讨论】:
以上是关于`multiprocessing.Pool.map()` 似乎安排错误的主要内容,如果未能解决你的问题,请参考以下文章
multiprocessing.Pool.map_async() 的结果是不是以与输入相同的顺序返回?
python multiprocessing pool.map() 等到方法完成
如何从multiprocessing.Pool.map的worker_funtion内部为数组赋值?
multiprocessing.Pool.map引发MemoryError