有没有办法在嵌套函数或模块中使用 multiprocessing.pool ?

Posted

技术标签:

【中文标题】有没有办法在嵌套函数或模块中使用 multiprocessing.pool ?【英文标题】:Is there any way to use multiprocessing.pool within a nested function or module? 【发布时间】:2018-08-08 23:39:30 【问题描述】:

感谢您查看此内容。我承认我已经在 python 中涉足了 1 周的并行处理,所以如果我错过了一个明显的解决方案,我深表歉意。我有一段代码,我想运行几个不同的 mp.pool() 实例。主 .py 文件中的那些工作正常,但是当我尝试将它们添加到模块中的函数时,我没有从它们中得到任何输出。该应用程序只是运行过去并继续。我认为这可能与post 有关,但它没有提供任何关于实现我需要的替代方法的想法。在一个简单的例子中工作的代码是这样的:

import multiprocessing as mp
def multiproc_log_result(retval):
    results.append(retval)
    if len(results) % (10 // 10) == 0:
        print('0% done'.format(100 * len(results) / 10))

def meat():
    print 'beef'
    status = True
    return status
results = []
pool = mp.Pool(thread_count)
for x in range(10):
    pool.apply_async(meat, callback=multiproc_log_result)
pool.close()
pool.join()


def veggie():
    print 'carrot'
    status = True
    return status

results = []
pool = mp.Pool(thread_count)
for x in range(10):
    pool.apply_async(veggie, callback=multiproc_log_result)
pool.close()
pool.join()

而不起作用的代码是:

import multiprocessing as mp
def multiproc_log_result(retval):
    results.append(retval)
    if len(results) % (10 // 10) == 0:
        print('0% done'.format(100 * len(results) / 10))

def meat():
    print 'beef'
    status = True
    return status
results = []
pool = mp.Pool(thread_count)
for x in range(10):
    pool.apply_async(meat, callback=multiproc_log_result)
pool.close()
pool.join()

def nested_stupid_fn():
    def multiproc_log_result(retval):
        results.append(retval)
        if len(results) % (10 // 10) == 0:
            print('0% done'.format(100 * len(results) / 10))

    def veggie():
        print 'carrot'
        status = True
        return status

    results = []
    pool = mp.Pool(thread_count)
    for x in range(10):
        pool.apply_async(veggie, callback=multiproc_log_result)
    pool.close()
    pool.join()
nested_stupid_fn()

最终,我希望通过将它存在于单独模块中的另一个函数中来删除一个不起作用的示例。因此,当我导入模块 packngo 并将其用作 packngo.basic_packngo(inputs) 并在其中某处具有嵌套函数的内容时,它们将运行。任何帮助将不胜感激。 :D 我是一个非常简单的人,所以如果你能像对孩子一样解释,也许它会在我脑海中浮现!

【问题讨论】:

我要指出的是,有些代码像 multiproc_log_result 一样被删掉了,目前实际上并没有做任何事情。这只是一种快速/肮脏的简化和测试方法,以隔离我的问题发生的位置。 【参考方案1】:

您链接的另一个问题有解决方案,只是没有说明:您不能使用嵌套函数作为apply*/*map* 上@ 方法系列的func 参数987654324@。它们适用于multiprocessing.dummy.Pool,因为multiprocessing.dummy 由可以直接传递函数引用的线程支持,但是multiprocessing.Pool 必须对函数进行腌制,并且只有具有可导入名称的函数才能被腌制。如果您检查嵌套函数的名称,它类似于modulename.outerfuncname.<locals>.innerfuncname,并且<locals> 组件使其无法导入(这通常是一件好事;利用嵌套的嵌套函数通常在关闭时具有临界状态范围,仅导入就会丢失)。

callback 函数以嵌套方式定义是非常好的,因为它们是在父进程中执行的,它们不会发送给工作人员。在您的情况下,只有回调依赖于闭包范围,因此将 func (veggie) 移出全局范围是非常好的,将您的 packngo 模块定义为:

def veggie():
    print 'carrot'
    status = True
    return status

def nested_stupid_fn():
    def multiproc_log_result(retval):
        results.append(retval)
        if len(results) % (10 // 10) == 0:
            print('0% done'.format(100 * len(results) / 10))

    results = []
    pool = mp.Pool(thread_count)
    for x in range(10):
        pool.apply_async(veggie, callback=multiproc_log_result)
    pool.close()
    pool.join()
nested_stupid_fn()

是的,这意味着veggie 成为相关模块的公共成员。如果你想表明它应该被视为一个实现细节,你可以在它前面加上一个下划线 (_veggie),但它必须是全局的才能与 multiprocessing.Pool 一起使用。

【讨论】:

【参考方案2】:

我认为问题在于multiproc_log_result 的范围内不存在变量results。 因此,您应该做的是直接将异步调用的结果附加到结果中。 您将无法跟踪进度(我猜无法直接为类之外的回调函数共享全局变量)

from multiprocessing.pool import ThreadPool

def nested_stupid_fn():
    def multiproc_log_result(retval):
        results.append(retval)

    def veggie():
        print 'carrot'
        status = True
        return status

    results = []
    pool = ThreadPool(thread_count)
    for x in range(10):
        results.append(pool.apply_async(veggie))

    pool.close()
    pool.join()

    results = [result.get() for result in results]  # get value from async result

    ...then do stuff with results

【讨论】:

感谢您的想法,但我认为这不会导致问题。我可以完全删除状态跟踪的回调,但它实际上仍然没有做任何事情(在这种情况下打印 'carrot' 十次,但在实际示例中使用 shutil 移动文件)。 你试过了吗?我还通过 ThreadPool 更改了 Pool,它确实在我的机器上工作 无论如何,它不会打印你的胡萝卜,因为你的分离线程将无法在控制台中打印 @GabrielSamain:ThreadPool 使它起作用只是因为将函数发送给线程工作者不需要酸洗,而将其发送给进程工作者则需要。嵌套函数没有可导入的名称,只有具有全局可导入名称的函数才能被pickle。

以上是关于有没有办法在嵌套函数或模块中使用 multiprocessing.pool ?的主要内容,如果未能解决你的问题,请参考以下文章

有没有办法将状态信息从反应钩子传递到反应中的嵌套函数?

从函数中退出以减少激活对象比递归或调用嵌套函数更好吗?

有没有办法从linux内核模块调用用户空间函数?

声明一个对象类型的表,其中对象包含两个嵌套表

有没有办法使用 Template Haskell 枚举模块中的所有函数?

有没有办法在它的孩子中修改嵌套类的实现?