Python 3 中的 Concurrent.futures 与多处理

Posted

技术标签:

【中文标题】Python 3 中的 Concurrent.futures 与多处理【英文标题】:Concurrent.futures vs Multiprocessing in Python 3 【发布时间】:2014-01-13 14:22:52 【问题描述】:

Python 3.2 引入了Concurrent Futures,它似乎是旧线程和multiprocessing 模块的某种高级组合。

与旧的多处理模块相比,将其用于 CPU 密集型任务有哪些优点和缺点?

This article 表明它们更容易使用 - 是这样吗?

【问题讨论】:

【参考方案1】:

我喜欢concurrent.futures,主要是因为多个函数参数的迭代器:multiprocessing 在获取函数的多个参数时有点hacky(没有istarmap()-等价于starmap()):

import multiprocessing as mp

def power_plus_one(x, y):
    return (x**y) + 1

def wrapper(t):
    return power_plus_one(*t)

with mp.Pool() as pool:
    r = list(pool.imap(wrapper, [(0, 1), (2, 2)]))

print(r)

我发现imap()/imap_unordered()tqdm 等进度条或更大计算的时间估计非常有用。在concurrents.futures,这个超级好用:

def power_plus_one(x, y):
    return (x**y) + 1

o = dict() # dict for output

with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = executor.submit(power_plus_one, x, y): (x, y) for x, y in [(0, 1), (2, 2)]
    for future in concurrent.futures.as_completed(futures):
        i = futures[future]
        o[i] = future.result()
print(o)

我也喜欢方便的结果映射作为字典。 :)

使用 tqdm,您可以轻松:

for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
    ...

【讨论】:

【参考方案2】:

concurrent.futures 给你更多的控制权,例如:

# Created by BaiJiFeiLong@gmail.com at 2021/10/19 10:37

import concurrent.futures
import multiprocessing.pool
import random
import threading
import time


def hello(name):
    time.sleep(random.random())
    return f"Hello name threading.current_thread() "


print("ThreadPool:")
pool = multiprocessing.pool.ThreadPool(4)
for args, result in pool.imap_unordered(lambda x: (x, hello(x)), range(10)):
    print(args, "=>", result)

print("\nThreadPoolExecutor:")
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
futures = executor.submit(hello, x): x for x in range(10)
for future in concurrent.futures.as_completed(futures):
    print(futures[future], "=>", future.result()

示例输出:

ThreadPool:
1 => Hello 1 <DummyProcess(Thread-2, started daemon 29700)>
0 => Hello 0 <DummyProcess(Thread-1, started daemon 29688)>
2 => Hello 2 <DummyProcess(Thread-3, started daemon 19680)>
6 => Hello 6 <DummyProcess(Thread-3, started daemon 19680)>
3 => Hello 3 <DummyProcess(Thread-4, started daemon 33028)>
4 => Hello 4 <DummyProcess(Thread-2, started daemon 29700)>
5 => Hello 5 <DummyProcess(Thread-1, started daemon 29688)>
9 => Hello 9 <DummyProcess(Thread-2, started daemon 29700)>
8 => Hello 8 <DummyProcess(Thread-4, started daemon 33028)>
7 => Hello 7 <DummyProcess(Thread-3, started daemon 19680)>

ThreadPoolExecutor:
0 => Hello 0 <Thread(ThreadPoolExecutor-0_0, started daemon 30764)>
1 => Hello 1 <Thread(ThreadPoolExecutor-0_1, started daemon 36220)>
2 => Hello 2 <Thread(ThreadPoolExecutor-0_2, started daemon 13120)>
4 => Hello 4 <Thread(ThreadPoolExecutor-0_0, started daemon 30764)>
3 => Hello 3 <Thread(ThreadPoolExecutor-0_3, started daemon 30260)>
8 => Hello 8 <Thread(ThreadPoolExecutor-0_3, started daemon 30260)>
5 => Hello 5 <Thread(ThreadPoolExecutor-0_1, started daemon 36220)>
6 => Hello 6 <Thread(ThreadPoolExecutor-0_2, started daemon 13120)>
7 => Hello 7 <Thread(ThreadPoolExecutor-0_0, started daemon 30764)>
9 => Hello 9 <Thread(ThreadPoolExecutor-0_3, started daemon 30260)> 

【讨论】:

【参考方案3】:

可能在大多数情况下,当您需要并行处理时,您会发现concurrent.futures 模块中的ProcessPoolExecutor 类或multiprocessing 模块中的Pool 类将提供相同的功能,并且它会沸腾归结为个人喜好问题。但每个都提供了一些使某些处理更方便的设施。我想我只想指出几个:

在提交一批任务时,您有时希望在任务结果可用时立即获得它们(即返回值)。这两个工具都提供了通知,即已提交任务的结果可通过回调机制获得:

使用multiprocessing.Pool

import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def process_result(return_value):
    print(return_value)

def main():
    pool = mp.Pool()
    for i in range(10):
        pool.apply_async(worker_process, args=(i,), callback=process_result)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

也可以使用带有concurrent.futures 的回调来完成同样的操作,虽然有点尴尬:

import concurrent.futures

def worker_process(i):
    return i * i # square the argument

def process_result(future):
    print(future.result())

def main():
    executor = concurrent.futures.ProcessPoolExecutor()
    futures = [executor.submit(worker_process, i) for i in range(10)]
    for future in futures:
        future.add_done_callback(process_result)
    executor.shutdown()

if __name__ == '__main__':
    main()

在这里,每个任务都单独提交,并为其返回一个 Future 实例。然后必须将回调添加到Future。最后,调用回调时,传递的参数是已完成任务的Future 实例,必须调用方法result 才能获得实际返回值。但是使用concurrent.futures 模块,实际上根本不需要使用回调。您可以使用as_completed 方法:

import concurrent.futures

def worker_process(i):
    return i * i # square the argument

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(worker_process, i) for i in range(10)]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())

if __name__ == '__main__':
    main()

通过使用字典来保存Future 实例,很容易将返回值与传递给worker_process 的原始参数联系起来:

import concurrent.futures

def worker_process(i):
    return i * i # square the argument

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = executor.submit(worker_process, i): i for i in range(10)
        for future in concurrent.futures.as_completed(futures):
            i = futures[future] # retrieve the value that was squared
            print(i, future.result())

if __name__ == '__main__':
    main()

multiprocessing.Pool 有方法imapimap_unordered,后者允许任务结果以任意顺序返回,但不一定以完成顺序返回。这些方法被认为是maplazier 版本。使用map 方法,如果传递的iterable 参数没有__len__ 属性,它将首先转换为list,其长度将用于计算有效的chunksize如果 None 作为 chunksize 参数提供,则为值。因此,您无法通过将生成器或生成器表达式用作 iterable 来实现任何存储优化。但是对于imapimap_unordered 方法,iterable 可以是生成器或生成器表达式;它将根据需要进行迭代以产生新的提交任务。但这需要默认的 chunksize 参数为 1,因为通常无法知道 iterable 的长度。但是,如果您对 iterable(或 exact 大小如下例所示):

import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def compute_chunksize(pool_size, iterable_size):
    if iterable_size == 0:
        return 0
    chunksize, extra = divmod(iterable_size, pool_size * 4)
    if extra:
        chunksize += 1
    return chunksize

def main():
    cpu_count = mp.cpu_count()
    N = 100
    chunksize = compute_chunksize(cpu_count, N)
    with mp.Pool() as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunksize):
            print(result)

if __name__ == '__main__':
    main()

但是对于imap_unordered,除非工作进程返回原始调用参数以及返回值,否则无法轻松地将结果与提交的作业联系起来。另一方面,使用imap_unorderedimap 指定chunksize 的能力,其结果按可预测的顺序排列,应该使这些方法比调用@987654356 更有效重复@方法,本质上相当于使用1的chunksize。但是如果你确实需要按完成顺序处理结果,那么确保你应该使用带有回调函数的方法apply_async。但是,根据实验,如果您使用 chunksize 值 1 和 imap_unordered,它确实会出现,结果将按完成顺序返回。

concurrent.futures 包中的ProcessPoolExecutor 类的map 方法在某一方面与multiprocessing 包中的Pool.imap 方法相似。此方法不会将其传递的作为生成器表达式的 iterable 参数转换为列表,以便计算有效的 chunksize 值,这就是 chunksize 参数的原因默认为 1 以及为什么,如果您传递较大的 iterables,您应该考虑指定适当的 chunksize 值。但是,与Pool.imap 不同,根据我的经验看来,在所有传递给mapiterables 都被迭代之前,您无法开始迭代结果。 p>

multiprocessing.Pool 类有一个方法apply 将任务提交到池并阻塞,直到结果准备好。返回值只是传递给apply 函数的工作函数的返回值。例如:

import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def main():
    with mp.Pool() as pool:
        print(pool.apply(worker_process, args=(6,)))
        print(pool.apply(worker_process, args=(4,)))

if __name__ == '__main__':
    main()

concurrent.futures.ProcessPoolExecutor 类没有这样的等价物。您必须针对返回的Future 实例发出submit,然后调用result。不得不这样做并不困难,但是Pool.apply 方法对于适合阻塞任务提交的用例来说更方便。这种情况是当您有处理需要线程时,因为在线程中完成的大部分工作都是大量的 I/O,除了一个可能非常受 CPU 限制的函数。创建线程的主程序首先创建一个multiprocessing.Pool 实例并将其作为参数传递给所有线程。当线程需要调用 CPU 密集型函数时,它现在使用 Pool.apply 方法运行该函数,从而在另一个进程中运行代码并释放当前进程以允许其他线程运行。

concurrent.futures 模块有两个类,ProcessPoolExecutorThreadPoolExecutor,具有相同的接口。这是一个不错的功能。但是multiprocessing 模块也有一个未记录的ThreadPool 类,其接口与Pool 相同:

>>> from multiprocessing.pool import Pool
>>> from multiprocessing.pool import ThreadPool
>>> dir(Pool)
['Process', '__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_check_running', '_get_sentinels', '_get_tasks', '_get_worker_sentinels', '_guarded_task_generation', '_handle_results', '_handle_tasks', '_handle_workers', '_help_stuff_finish', '_join_exited_workers', '_maintain_pool', '_map_async', '_repopulate_pool', '_repopulate_pool_static', '_setup_queues', '_terminate_pool', '_wait_for_updates', '_wrap_exception', 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate']
>>> dir(ThreadPool)
['Process', '__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_check_running', '_get_sentinels', '_get_tasks', '_get_worker_sentinels', '_guarded_task_generation', '_handle_results', '_handle_tasks', '_handle_workers', '_help_stuff_finish', '_join_exited_workers', '_maintain_pool', '_map_async', '_repopulate_pool', '_repopulate_pool_static', '_setup_queues', '_terminate_pool', '_wait_for_updates', '_wrap_exception', 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate']
>>>

您可以使用ProcessPoolExecutor.submit(返回Future 实例)或Pool.apply_async(返回AsyncResult 实例)提交任务,并指定检索结果的超时值:

from concurrent.futures import ProcessPoolExecutor, TimeoutError
from time import sleep


def worker_1():
    while True:
        print('hanging')
        sleep(1)


def main():
    with ProcessPoolExecutor(1) as pool:
        future = pool.submit(worker_1)
        try:
            future.result(3) # kill task after 3 seconds?
        except TimeoutError:
            print('timeout')

if __name__ == '__main__':
    main()
    print("return from main()")

打印:

hanging
hanging
hanging
timeout
hanging
hanging
hanging
hanging
hanging
hanging
hanging
etc.

调用future.result(3)的主进程会在3秒后得到TimeoutError异常,因为提交的任务在这段时间内没有完成。但任务仍在继续运行,占用了进程,with ProcessPoolExecutor(1) as pool: 块永远不会退出,因此程序不会终止。

from multiprocessing import Pool, TimeoutError
from time import sleep


def worker_1():
    while True:
        print('hanging')
        sleep(1)

def main():
    with Pool(1) as pool:
        result = pool.apply_async(worker_1, args=())
        try:
            result.get(3) # kill task after 3 seconds?
        except TimeoutError:
            print('timeout')


if __name__ == '__main__':
    main()
    print("return from main()")

打印:

hanging
hanging
hanging
timeout
return from main()

然而,这一次,即使超时任务仍在继续运行并占用了进程,with 块并没有被阻止退出,因此程序正常终止。原因是Pool 实例的上下文管理器将在块退出时执行对terminate 的调用,这会导致池中的所有进程立即终止。这与ProcessPoolExecutor 实例的上下文处理程序形成对比,后者执行对shutdown(wait=True) 的调用以等待池中所有进程在其管理的块退出时终止。如果您正在使用上下文处理程序来处理池终止并且存在超时的可能性,那么优势似乎要归功于 multiprocessing.Pool

但由于multiprocessing.Pool 的上下文处理程序仅调用terminate 而不是close 后跟join,因此您必须确保在退出with 块之前已提交的所有作业都已完成,因为例如,通过阻塞同步调用(例如map)提交作业,或在AsyncResult 对象上调用get,该对象由apply_async 调用返回,或将调用结果迭代到imap,或通过调用@987654411 @ 后跟 join 在池实例上。

虽然使用ProcessPoolExecutor 时在超时任务完成之前无法退出,但您可以取消启动尚未运行的已提交任务。在下面的演示中,我们有一个大小为 1 的池,因此作业只能连续运行。我们一个接一个地提交 3 个作业,其中前两个作业需要 3 秒才能运行,因为调用了 time.sleep(3)。我们立即尝试取消前两个工作。第一次取消尝试失败,因为第一个作业已经在运行。但是因为池中只有一个进程,所以第二个作业必须等待 3 秒,第一个作业完成后才能开始运行,因此取消成功。最后,作业 3 将在作业 1 完成后几乎立即开始和结束,这大约是我们开始作业提交后的 3 秒:

from concurrent.futures import ProcessPoolExecutor
import time

def worker1(i):
    time.sleep(3)
    print('Done', i)

def worker2():
    print('Hello')

def main():
    with ProcessPoolExecutor(max_workers=1) as executor:
        t = time.time()
        future1 = executor.submit(worker1, 1)
        future2 = executor.submit(worker1, 2)
        future3 = executor.submit(worker2)
        # this will fail since this task is already running:
        print(future1.cancel())
        # this will succeed since this task hasn't started (it's waiting for future1 to complete):
        print(future2.cancel())
        future3.result() # wait for completion
        print(time.time() - t)

if __name__ == '__main__':
    main()

打印:

False
True
Done 1
Hello
3.1249606609344482

【讨论】:

这是一个绝妙的答案。【参考方案4】:

根据我的经验,与 concurrent.futures 相比,我在多处理模块方面遇到了很多问题。(但这是在 Windows 操作系统上)

我可以看到的两个主要区别是:

    多处理模块经常挂起 Concurrent.futures 有一种相对简单的执行方式。这意味着获取结果、跟踪子进程等非常简单。

示例:(获取结果)

with concurrent.futures.ProcessPoolExecutor() as executor:
    f1 = executor.submit(some_function, parameter_to_be_passed) 
    print(f1.result())

因此,如果您从some_function() 返回任何值,您可以使用f1.result() 直接捕获/存储它。在“多处理”模块中,同样的事情需要额外的步骤。

如果您在 Linux 系统上运行,则可能不会发生挂起,但“多处理”模块中的执行复杂性仍然更高。

话虽如此,同样重要的是要注意我的任务是高度 CPU 密集型任务。

就个人而言,我会推荐 concurrent.futures。

【讨论】:

经常挂起?这是一个相当不具体的说法。会不会是你的代码? multiprocessing.pool 也不需要“额外”步骤:async_result = pool.submit(some_function, args=(parameter1, parameter2, ...)); print(async_result.get())【参考方案5】:

除了其他答案的详细差异列表之外,当一名工人以某种方式崩溃时,我个人遇到了一个未修复的(截至 2020 年 10 月 27 日)indefinite hang that can happen with multiprocess.Pool。 (在我的例子中,一个 cython 扩展的例外,尽管其他人说这可能发生在工作人员获得 SIGTERM 等时。)根据the documentation for ProcessPoolExecutor,自 python 3.3 以来它一直很健壮。

【讨论】:

【参考方案6】:

我不会称concurrent.futures 更“高级”——它是一个更简单的接口,无论您使用多线程还是多进程作为底层并行化噱头,它的工作原理都非常相似。

因此,与几乎所有“更简单的界面”实例一样,涉及到的权衡取舍大致相同:它的学习曲线较浅,很大程度上是因为可用的太少了学到了;但是,由于它提供的选项较少,最终可能会以更丰富的界面不会让您感到沮丧的方式。

就 CPU 密集型任务而言,它的定义太低了,说不出意义。对于 CPython 下的 CPU 密集型任务,您需要多个进程而不是多个线程才能有机会获得加速。但是你获得多少(如果有的话)加速取决于你的硬件、操作系统的细节,尤其是你的特定任务需要多少进程间通信。在幕后,所有的进程间并行化噱头都依赖于相同的操作系统原语——您用来获得这些原语的高级 API 并不是影响底线速度的主要因素。

编辑:示例

这是您引用的文章中显示的最终代码,但我添加了使其工作所需的导入语句:

from concurrent.futures import ProcessPoolExecutor
def pool_factorizer_map(nums, nprocs):
    # Let the executor divide the work among processes by using 'map'.
    with ProcessPoolExecutor(max_workers=nprocs) as executor:
        return num:factors for num, factors in
                                zip(nums,
                                    executor.map(factorize_naive, nums))

这里使用multiprocessing 是完全相同的:

import multiprocessing as mp
def mp_factorizer_map(nums, nprocs):
    with mp.Pool(nprocs) as pool:
        return num:factors for num, factors in
                                zip(nums,
                                    pool.map(factorize_naive, nums))

请注意,在 Python 3.3 中添加了使用 multiprocessing.Pool 对象作为上下文管理器的功能。

至于哪个更容易使用,它们本质上是相同的。

一个不同之处在于,Pool 支持多种不同的做事方式,您可能直到在学习曲线上爬了一段距离后才意识到它是多么容易可以。 p>

同样,所有这些不同的方式既是优势也是劣势。它们是一种优势,因为在某些情况下可能需要灵活性。它们是一个弱点,因为“最好只有一种明显的方法”。从长远来看,一个专门(如果可能)坚持使用 concurrent.futures 的项目可能会更容易维护,因为它的最小 API 的使用方式缺乏无缘无故的新颖性。

【讨论】:

“你需要多个进程而不是多个线程才有机会获得加速” 太苛刻了。如果速度很重要;该代码可能已经使用了 C 库,因此它可以发布 GIL,例如 regex、lxml、numpy。 @JFSebastian,感谢您的补充——也许我应该说“在 pure CPython 下”,但恐怕没有捷径可以解释这里的真相讨论 GIL。 值得一提的是,线程在长 IO 操作时可能特别有用且足够。 @TimPeters 在某些方面ProcessPoolExecutor 实际上比Pool 有更多的选项,因为ProcessPoolExecutor.submit 返回允许取消的Future 实例 (cancel),检查 which 引发异常 (exception),并动态添加要在完成时调用的回调 (add_done_callback)。这些功能都不适用于Pool.apply_async 返回的AsyncResult 实例。在其他方面,Pool 有更多选项,因为 initializer / initargsmaxtasksperchildcontextPool.__init__ 中,以及更多由 Pool 实例公开的方法。 @max,当然,但请注意,问题不是关于Pool,而是关于模块。 Poolmultiprocessing 中的一小部分,并且在文档中的位置太低了,人们需要一段时间才能意识到它甚至存在于 multiprocessing 中。这个特定的答案集中在Pool,因为这就是OP链接到的所有文章,而cf“更容易使用”对于文章所讨论的内容来说根本不是真的。除此之外,cfas_completed() 也可以很方便。

以上是关于Python 3 中的 Concurrent.futures 与多处理的主要内容,如果未能解决你的问题,请参考以下文章

python中的3目运算(3元表达式)

选择文本时 Python 3.3.0 中的 IDLE 崩溃

使用 python 3.8+(默认协议=5)时,pickle.load 在 python 3.7 中的(协议=4)对象上失败

Python2/3中的urllib库

python中的动态模块导入(代码从3.2到3.3)

text Sublime Text 3中的Python 3