Pool.apply_async():嵌套函数未执行

Posted

技术标签:

【中文标题】Pool.apply_async():嵌套函数未执行【英文标题】:Pool.apply_async(): nested function is not executed 【发布时间】:2019-10-25 07:02:06 【问题描述】:

我正在熟悉 Python 的 multiprocessing 模块。以下代码按预期工作:

#outputs 0 1 2 3
from multiprocessing import Pool
def run_one(x):
    print x
    return

pool = Pool(processes=12)
for i in range(4):
    pool.apply_async(run_one, (i,))
pool.close()
pool.join() 

但是,现在,如果我在上面的代码周围包装一个函数,print 语句不会被执行(或者至少重定向输出):

#outputs nothing
def run():
    def run_one(x):
        print x
        return    

    pool = Pool(processes=12)
    for i in range(4):    
        pool.apply_async(run_one, (i,))
    pool.close()
    pool.join()

如果我将run_one 定义移到run 之外,当我调用run() 时,输出又是预期的:

#outputs 0 1 2 3
def run_one(x):
    print x
    return

def run():    
    pool = Pool(processes=12)
    for i in range(4):       
        pool.apply_async(run_one, (i,))
    pool.close()
    pool.join() 

我在这里缺少什么?为什么第二个 sn-p 不打印任何东西?如果我只是调用run_one(i) 函数而不是使用apply_async,则所有三个代码的输出都相同。

【问题讨论】:

【参考方案1】:

Pool 需要腌制(序列化)它发送到其工作进程的所有内容。 Pickling 实际上只保存函数的名称,而 unpickling 需要按名称重新导入函数。 为此,需要在顶层定义函数,嵌套函数不能被子函数导入,并且已经尝试腌制它们会引发异常:

from multiprocessing.connection import _ForkingPickler

def run():
    def foo(x):
        pass
    _ForkingPickler.dumps(foo)  # multiprocessing custom pickler;
                                # same effect with pickle.dumps(foo)

run()
# Out:
Traceback (most recent call last):
...
AttributeError: Can't pickle local object 'run.<locals>.foo'

您看不到异常的原因是,Pool 已经开始在父项中的酸洗任务期间捕获异常,并且仅当您在您立即得到的 AsyncResult 对象上调用 .get() 时才重新引发异常当你打电话给pool.apply_async()

这就是为什么(对于 Python 2)你最好总是这样使用它,即使你的目标函数没有返回任何东西(仍然返回隐式 None):

    results = [pool.apply_async(foo, (i,)) for i in range(4)]
    # `pool.apply_async()` immediately returns AsyncResult (ApplyResult) object
    for res in results:
        res.get()

Pool.map()Pool.starmap() 这样的非异步池方法在底层使用相同的(异步)低级函数,就像它们的异步兄弟一样,但它们额外为你调用 .get(),所以你总是会看到这些方法例外。

Python 3 有一个用于异步池方法的 error_callback 参数,您可以使用它来处理异常。

【讨论】:

以上是关于Pool.apply_async():嵌套函数未执行的主要内容,如果未能解决你的问题,请参考以下文章

Python 3.6.8 - multiprocessing.Pool.apply_async() 不工作

如何从 pool.apply_async 调用中累积结果?

将管道/连接作为上下文参数传递给多处理 Pool.apply_async()

pool.apply_async 和全局变量

无法使用 python 的多处理 Pool.apply_async() 腌制 <type 'instancemethod'>

python multiprocessing.pool.apply_async 占用内存多 解决方法