在终端和 Django 或 Flask 的代码模块中使用 python 多处理池

Posted

技术标签:

【中文标题】在终端和 Django 或 Flask 的代码模块中使用 python 多处理池【英文标题】:Using python multiprocessing Pool in the terminal and in code modules for Django or Flask 【发布时间】:2013-09-27 16:16:15 【问题描述】:

当在 python 中使用 multiprocessing.Pool 和以下代码时,会出现一些奇怪的行为。

from multiprocessing import Pool
p = Pool(3)
def f(x): return x
threads = [p.apply_async(f, [i]) for i in range(20)]
for t in threads:
    try: print(t.get(timeout=1))
    except Exception: pass

我收到以下错误 3 次(池中的每个线程一个),它打印“3”到“19”:

AttributeError: 'module' object has no attribute 'f'

前三个 apply_async 调用永远不会返回。

同时,如果我尝试:

from multiprocessing import Pool
p = Pool(3)
def f(x): print(x)
p.map(f, range(20))

我得到AttributeError 3次,shell打印“6”到“19”,然后挂起并且无法被[Ctrl] + [C]杀死

多处理文档有以下说法:

此包中的功能要求 ma​​in 模块是 孩子们可以导入。

这是什么意思?

为了澄清,我正在终端中运行代码来测试功能,但最终我希望能够将其放入 Web 服务器的模块中。如何在 python 终端和代码模块中正确使用 multiprocessing.Pool?

【问题讨论】:

也许,您需要添加 if __name__ == '__main__' 这样您的代码就不会在每次导入时运行? 【参考方案1】:

警告:在 Django 和 Flask 等 Web 服务器的上下文中使用多处理是错误的工具。相反,您应该使用Celery 之类的任务框架或Elastic Beanstalk Worker Environments 之类的基础架构解决方案。使用多处理来生成线程或进程是不好的,因为它不会让您监督或管理这些线程/进程,因此您必须构建自己的故障检测逻辑、重试逻辑等。此时,您最好使用一个现成的工具,实际上是为处理异步任务而设计的,因为它可以为您提供开箱即用的这些任务。


了解文档

this 包中的功能要求子模块可以导入主模块。

这意味着池必须在定义要在其上运行的函数之后进行初始化。如果您正在编写一个独立的脚本,那么在 if __name__ == "__main__": 块中使用池是可行的,但这在更大的代码库或服务器代码(例如 Django 或 Flask 项目)中是不可能的。因此,如果您尝试在其中之一中使用池,请确保遵循以下指南:

    尽可能初始化函数内部的池。如果您必须在全局范围内初始化它们,请在模块底部进行。 不要在全局范围内调用 Pool 的方法。

或者,如果您只需要更好的 I/O 并行性(如数据库访问或网络调用),您可以省去所有这些麻烦,并使用线程池而不是进程池。这涉及完全无证:

from multiprocessing.pool import ThreadPool

它的接口与 Pool 的接口完全相同,但是由于它使用线程而不是进程,因此它没有使用进程池所做的任何警告,唯一的缺点是您无法获得真正的代码并行性执行,只是阻塞 I/O 的并行性。


必须在定义要在其上运行的函数之后初始化池

python 文档中难以理解的文本意味着在定义池时,池中的线程会导入周围的模块。在 python 终端的情况下,这意味着你到目前为止运行的所有且唯一的代码。

因此,您要在池中使用的任何函数都必须在初始化池之前定义。模块中的代码和终端中的代码都是如此。问题中代码的以下修改将正常工作:

from multiprocessing import Pool
def f(x): return x  # FIRST
p = Pool(3) # SECOND
threads = [p.apply_async(f, [i]) for i in range(20)]
for t in threads:
    try: print(t.get(timeout=1))
    except Exception: pass

或者

from multiprocessing import Pool
def f(x): print(x)  # FIRST
p = Pool(3) # SECOND
p.map(f, range(20))

我说的很好,我的意思是在 Unix 上很好。 Windows 有它自己的问题,我不在这里讨论。


在模块中使用池

但是等等,还有更多(在您想在其他地方导入的模块中使用池)!

如果你在函数中定义一个池,你就没有问题。 但如果您在模块中使用 Pool 对象作为全局变量,则它必须定义在页面的底部,而不是顶部。尽管这与大多数优秀的代码风格背道而驰,但它对于功能来说是必要的。使用在页面顶部声明的池的方法是仅将其与从其他模块导入的函数一起使用,如下所示:

from multiprocessing import Pool
from other_module import f
p = Pool(3)
p.map(f, range(20))

从另一个模块导入一个预先配置好的池是非常可怕的,因为导入必须在你想在它上面运行的任何东西之后,就像这样:

### module.py ###
from multiprocessing import Pool
POOL = Pool(5)

### module2.py ###
def f(x):
    # Some function
from module import POOL
POOL.map(f, range(10))

其次,如果您在要导入的模块的全局范围内对池运行任何内容,系统将挂起。即这个工作:

### module.py ###
from multiprocessing import Pool
def f(x): return x
p = Pool(1)
print(p.map(f, range(5)))

### module2.py ###
import module

然而,这确实工作,只要没有导入模块2:

### module.py ###
from multiprocessing import Pool

def f(x): return x
p = Pool(1)
def run_pool(): print(p.map(f, range(5)))

### module2.py ###
import module
module.run_pool()

现在,这背后的原因更加奇怪,并且可能与问题中的代码每次只吐出一次属性错误以及之后似乎正确执行代码的原因有关。似乎池线程(至少具有一定的可靠性)在执行后重新加载模块中的代码。

【讨论】:

【参考方案2】:

在创建线程池时必须已经定义了要在线程池上执行的函数。

这应该可行:

from multiprocessing import Pool
def f(x): print(x)
if __name__ == '__main__':
    p = Pool(3)
    p.map(f, range(20))

原因是(至少在具有fork 的系统上)当您创建池时,工作人员是通过分叉当前进程来创建的。因此,如果目标函数此时尚未定义,worker 将无法调用它。

在 windows 上有点不同,因为 windows 没有fork。在这里启动新的工作进程并导入主模块。这就是为什么在 Windows 上使用if __name__ == '__main__' 保护执行代码很重要的原因。否则,每个新的工作人员都会重新执行代码,从而无限地产生新进程,从而使程序(或系统)崩溃。

【讨论】:

这个答案部分正确;在单个代码文件中运行 python 很好,但对于在更大的代码库或无法使用 if __name__ == '__main__' 的服务器代码中使用代码没有帮助。【参考方案3】:

此错误还有另一个可能的来源。运行示例代码时出现此错误。

来源是尽管正确安装了 multiprosessing,但我的系统上没有安装 C++ 编译器,pip 在尝试更新 multiprocessing 时通知了我。所以检查编译器是否已安装可能是值得的。

【讨论】:

以上是关于在终端和 Django 或 Flask 的代码模块中使用 python 多处理池的主要内容,如果未能解决你的问题,请参考以下文章

Flask

flask-简介

Flask基础简介

使用DRF和flask写注册模块差异总结

真正搞明白Python中Django和Flask框架的区别

django vs flask