使用 ProcessPoolExecutor 进行并行处理
Posted
技术标签:
【中文标题】使用 ProcessPoolExecutor 进行并行处理【英文标题】:Parallel processing with ProcessPoolExecutor 【发布时间】:2018-04-02 12:01:01 【问题描述】:我有大量必须以某种方式处理的元素列表。 我知道它可以通过多处理处理来完成:
pr1 = Process(calculation_function, (args, ))
pr1.start()
pr1.join()
所以我可以创建 10 个进程并将除以 10 的参数传递给 args。然后工作就完成了。
但我不想手动创建它并手动计算它。相反,我想使用ProcessPoolExecutor,我这样做是这样的:
executor = ProcessPoolExecutor(max_workers=10)
executor.map(calculation, (list_to_process,))
计算是我完成这项工作的功能。
def calculation(list_to_process):
for element in list_to_process:
# .... doing the job
list_to_process 是我要处理的列表。
但是在运行这段代码之后,循环迭代只进行了一次。 我以为
executor = ProcessPoolExecutor(max_workers=10)
executor.map(calculation, (list_to_process,))
和这个一样10次:
pr1 = Process(calculation, (list_to_process, ))
pr1.start()
pr1.join()
但这似乎是错误的。
ProcessPoolExecutor如何实现真正的多处理?
【问题讨论】:
【参考方案1】:从calculation
函数中删除for
循环。现在您正在使用ProcessPoolExecutor.map
,map()
调用是您的循环,不同之处在于列表中的每个元素都被发送到不同的进程。例如
def calculation(item):
print('[pid:%s] performing calculation on %s' % (os.getpid(), item))
time.sleep(5)
print('[pid:%s] done!' % os.getpid())
return item ** 2
executor = ProcessPoolExecutor(max_workers=5)
list_to_process = range(10)
result = executor.map(calculation, list_to_process)
您会在终端中看到如下内容:
[pid:23988] performing calculation on 0
[pid:10360] performing calculation on 1
[pid:13348] performing calculation on 2
[pid:24032] performing calculation on 3
[pid:18028] performing calculation on 4
[pid:23988] done!
[pid:23988] performing calculation on 5
[pid:10360] done!
[pid:13348] done!
[pid:10360] performing calculation on 6
[pid:13348] performing calculation on 7
[pid:18028] done!
[pid:24032] done!
[pid:18028] performing calculation on 8
[pid:24032] performing calculation on 9
[pid:23988] done!
[pid:10360] done!
[pid:13348] done!
[pid:18028] done!
[pid:24032] done!
虽然事件的顺序实际上是随机的。由于某种原因,返回值(至少在我的 Python 版本中)实际上是一个 itertools.chain
对象。但这是一个实现细节。您可以将结果作为列表返回,例如:
>>> list(result)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
在您的示例代码中,您改为传递了一个单元素元组 (list_to_process,
),所以这只是将您的完整列表传递给一个进程。
【讨论】:
感谢您的回复!我不完全明白.. list_to_process 的迭代应该在哪里?所以我必须在 for-loop 中使用列表中的一个元素 @John nowhere,executor.map
allready 迭代列表中的每个元素并将其作为参数应用于计算函数
正如我所解释的,迭代是由ProcessPoolExecutor.map()
执行的。这基本上等价于:for item in list_to_process: calculation(item)
,只是calculation
可能在每个项目的不同进程中被调用。
玩转map
内置函数,确保您了解它的工作原理。 ProcessPoolExecutor.map
正在做同样的事情,但每次计算都被外包给不同的进程,然后以正确的顺序收集结果。
非常感谢这个!帮了我很大的忙。另外,给任何处于类似情况的人的说明。这个“map()”有多个迭代器,当最短的迭代器用完时迭代器停止。所以,如果你有一个参数对于所有循环都是不变的,你需要参考这个:***.com/a/10834984/2408212以上是关于使用 ProcessPoolExecutor 进行并行处理的主要内容,如果未能解决你的问题,请参考以下文章
在完成所有任务之前关闭 ProcessPoolExecutor 的异步