为什么进程池封装在装饰器中不能生效,而多进程可以
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了为什么进程池封装在装饰器中不能生效,而多进程可以相关的知识,希望对你有一定的参考价值。
参考技术A这个跟multiprocessing的原理有关,也涉及到Python中pickling的一些实现机制。
首先要说明multiprocessing模块现在有多种实现原理,类Unix系统默认使用fork,原理在于创建Process并启动的时候进行一次fork,然后子进程执行Process中指定的函数,它可以继承创建进程时内存中的对象,因此可以指定各种对象给子进程如函数、闭包、socket等
而Python2中Windows操作系统的实现spawn,每次重新启动一个全新的Python进程。以及Python3中forkserver的实现(在multiprocessing初始化的时候通过fork创建一个forkserver的进程,forkserver进程不再执行后续的代码,而是等待请求;所有后续创建multiprocessing进程时,都连接forkserver,由forkserver进行fork,产生一个“干净”的进程来作为新进程),这两种实现并不继承当前运行环境中的对象,所以需要使用的对象必须通过pickling的方法传递给子进程。
另外,multiprocessing中的Pool对象,它的原理在于预先创建好许多进程,然后从主进程中接受任务。这些任务因为不是在创建进程(fork)时创建出的,所以必须通过pickling的机制传递给子进程。multiprocessing.Queue和multiprocessing.Pipe也同样是使用pickling模块在进程间传递Python对象。
接下来就要说一下pickling了,pickling模块可以将Python对象序列化成字节流,再反序列化回到Python对象。但是它是有一定限制的,并不是所有的对象都可以进行序列化。函数和类在pickling中是不能“直接”序列化的,它们在pickling中序列化的原理在于将函数和类变为:package.module.func_name这样的字符串,即模块路径 + 函数/类名的形式。这就要求反序列化的时候:
这个模块可以被import
import后的这个模块中有这个全局名称
而且全局名称代表的值就是传递的值。
然而使用装饰器修饰的时候,main这个函数其实被修改了一次,变为了修饰之后的函数,但是未修饰的函数和修饰后的函数都共享了__main__.main这个名称,这导致传递的虽然是修饰前的函数,但实际接收方接受了__main__.main这个名称,然后重新import得到的是修饰后的函数,所以在子进程中执行的也是修饰后的函数。multiprocessing模块有个安全机制,禁止在multiprocessing创建的子进程中再创建multiprocessing子进程,因此这个修饰过的函数会在.Pool的地方报错,这个异常没有传递回主进程,所以看上去就像什么都没有做一样。作者:灵剑来源:知乎
进程池,线程池,协程
进程池、线程池、协程
池子有什么作用
在池子创建的时候就将设置的数量创建出来。之后所有的操作都由池子里的进程/线程完成。
当并发的任务数量远远大于计算机所能承受的范围,限制我进程数或线程数,从保证服务器运行。
concurrent.future模块
- concurrent.future模块封装了线程池与进程池
- ProcessPoolExecutor(max_works)对于进程池如果不写max_works:默认的是处理器个数
- ThreadPoolExecutor(max_works)对于线程池如果不写max_works:默认的是处理器个数*5
常用方法
- result :获得回调函数返回的值
- submit :相当于apply_async方法
- shutdown :相当于close+join方法
- add_done_callback :为当前任务绑定一个回调函数
进程池
from concurrent.futures import ProcessPoolExecutor
import time,random
def task(n):
for i in range(100):
n += i # 计算密集型的,一般用进程,可以充分利用多核优势
return n**2
if __name__ == '__main__':
start = time.time()
p = ProcessPoolExecutor()
for i in range(10):
obj = p.submit(task,i).result() # 等同于apply同步方法
print(obj)
p.shutdown()
print(time.time() - start)
##############################
# 结果:
0.23636794090270996
线程池
from concurrent.futures import ProcessPoolExecutor
import time
def task(n):
time.sleep(3) # IO密集型的,一般用线程
return n**2
if __name__ == '__main__':
start = time.time()
p = ProcessPoolExecutor()
for i in range(10):
obj = p.submit(task,i).result() # 等同于apply同步方法
p.shutdown()
print(time.time() - start)
##############################
# 结果:
30.011648893356323
使用回调函数
进程池
from concurrent.futures import ProcessPoolExecutor
import time,random
def parse(future)
print(future.result())
def task(n):
for i in range(100):
n += i # 计算密集型的,一般用进程,可以充分利用多核优势
return n**2
if __name__ == '__main__':
start = time.time()
p = ProcessPoolExecutor()
for i in range(10):
obj = p.submit(task,i) # 等同于apply_async同步方法
obj.add_done_callback(parse) # 绑定函数,当当前任务结束时调用绑定方法
p.shutdown()
print(time.time() - start)
##############################
# 结果:
0.2219409942626953
线程池
from concurrent.futures import ProcessPoolExecutor
import time
def parse(future):
print(future.result())
def task(n):
time.sleep(3) # IO密集型的,一般用线程
return n**2
if __name__ == '__main__':
start = time.time()
p = ProcessPoolExecutor()
for i in range(10):
obj = p.submit(task,i) # 等同于apply同步方法
obj.add_done_callback(parse)
p.shutdown()
print(time.time() - start)
##############################
# 结果:
9.223718166351318
协程
单线程下实现并发(IO密集型提高效率)
- 并发的本质:切换+保存状态
降低单个线程的IO时间
简单的协程
from gevent import monkey
monkey.patch_all() #实现捕获非gevent的io
import gevent
import time
def eat():
print('eat 1')
time.sleep(2)
print('eat 2')
def play():
print('开始运算')
for i in range(1,10000):
while i:
i -= 1
print('结束运算')
start = time.time()
# play()
# eat()
# 串行执行需要4.417968988418579
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
g1.join()
g2.join()
end = time.time()
print(end-start) # 2.4216597080230713
以上是关于为什么进程池封装在装饰器中不能生效,而多进程可以的主要内容,如果未能解决你的问题,请参考以下文章