`multiprocessing.Pool.map()` 似乎安排错误

Posted

技术标签:

【中文标题】`multiprocessing.Pool.map()` 似乎安排错误【英文标题】:`multiprocessing.Pool.map()` seems to schedule wrongly 【发布时间】:2019-12-10 08:05:50 【问题描述】:

我有一个请求服务器、检索一些数据、处理它并保存一个 csv 文件的函数。此功能应启动 20k 次。每次执行的持续时间都不同:有时会持续超过 20 分钟,有时会持续不到一秒。我决定使用multiprocessing.Pool.map 来并行执行。我的代码如下:

def get_data_and_process_it(filename):
    print('getting', filename)
    ...
    print(filename, 'has been process')

with Pool(8) as p:
    p.map(get_data_and_process_it, long_list_of_filenames)

看看prints 是如何生成的,似乎long_list_of_filenames 被分成8 个部分并分配给每个CPU,因为有时只是在一个20 分钟的执行中被阻塞,而long_list_of_filenames 没有其他元素在这 20 分钟内处理完毕。我所期待的是map 以 FIFO 样式调度 CPU 内核中的每个元素。

我的情况有更好的方法吗?

【问题讨论】:

在这种情况下,您应该将Pool.map()chunksize 参数设置为1。您可以从我的回答 here 中使用 calc_chunksize_info() 计算否则生成的块大小。 map 的工作方式类似于内置的 map 用于迭代。这意味着订单是确保。换句话说,一个缓慢的进程会阻塞更快的进程。如果订单对您来说不重要,我建议改为查看map_async 【参考方案1】:

map 方法仅在 所有 操作完成后返回。

而且从池工作人员打印并不理想。一方面,像stdout 这样的文件使用缓冲,因此在打印消息和实际出现之间可能存在可变时间。此外,由于所有工作人员都继承了相同的stdout,因此他们的输出会相互交织,甚至可能出现乱码。

所以我建议改用imap_unordered。这将返回一个迭代器,该迭代器将在它们可用时立即开始产生结果。唯一的问题是,这会按照它们完成的顺序返回结果,而不是按照它们开始的顺序。

您的工作函数 (get_data_and_process_it) 应该返回某种状态指示器。例如文件名和结果的元组。

def get_data_and_process_it(filename):
    ...
    if (error):
        return (filename, f'has *failed* bacause of reason')
    return (filename, 'has been processed')

你可以这样做:

with Pool(8) as p:
   for fn, res in p.imap_unordered(get_data_and_process_it, long_list_of_filenames):
       print(fn, res)

这提供了有关作业何时完成的准确信息,并且由于只有父进程写入stdout,因此输出不会出现乱码。

另外,我建议在程序开头的某处使用sys.stdout.reconfigure(line_buffering=True)。这样可以确保在每行输出之后刷新stdout 流。

【讨论】:

【参考方案2】:

map 正在阻塞,您可以使用 p.map_async 代替 p.mapmap 将等待所有这些函数调用完成,以便我们连续看到所有结果。 map_async 以随机顺序完成工作,并且在开始新任务之前不会等待正在进行的任务完成。这是最快的方法。(For more) 还有一个SO thread 详细讨论了mapmap_async

多处理池类为我们处理排队逻辑。它非常适合并行运行网络抓取作业(示例)或任何可以独立分解和分发的作业。如果您需要对队列进行更多控制或需要在多个进程之间共享数据,您可能需要查看Queue 类(For more)。

【讨论】:

以上是关于`multiprocessing.Pool.map()` 似乎安排错误的主要内容,如果未能解决你的问题,请参考以下文章

multiprocessing.Pool.map_async() 的结果是不是以与输入相同的顺序返回?

python multiprocessing pool.map() 等到方法完成

如何从multiprocessing.Pool.map的worker_funtion内部为数组赋值?

multiprocessing.Pool.map引发MemoryError

Multiprocessing.Pool.Map不做任何事情

Python处理图片缩略图