为什么进程池封装在装饰器中不能生效,而多进程可以

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了为什么进程池封装在装饰器中不能生效,而多进程可以相关的知识,希望对你有一定的参考价值。

参考技术A



这个跟multiprocessing的原理有关,也涉及到Python中pickling的一些实现机制。

首先要说明multiprocessing模块现在有多种实现原理,类Unix系统默认使用fork,原理在于创建Process并启动的时候进行一次fork,然后子进程执行Process中指定的函数,它可以继承创建进程时内存中的对象,因此可以指定各种对象给子进程如函数、闭包、socket等

而Python2中Windows操作系统的实现spawn,每次重新启动一个全新的Python进程。以及Python3中forkserver的实现(在multiprocessing初始化的时候通过fork创建一个forkserver的进程,forkserver进程不再执行后续的代码,而是等待请求;所有后续创建multiprocessing进程时,都连接forkserver,由forkserver进行fork,产生一个“干净”的进程来作为新进程),这两种实现并不继承当前运行环境中的对象,所以需要使用的对象必须通过pickling的方法传递给子进程。

另外,multiprocessing中的Pool对象,它的原理在于预先创建好许多进程,然后从主进程中接受任务。这些任务因为不是在创建进程(fork)时创建出的,所以必须通过pickling的机制传递给子进程。multiprocessing.Queue和multiprocessing.Pipe也同样是使用pickling模块在进程间传递Python对象。

接下来就要说一下pickling了,pickling模块可以将Python对象序列化成字节流,再反序列化回到Python对象。但是它是有一定限制的,并不是所有的对象都可以进行序列化。函数和类在pickling中是不能“直接”序列化的,它们在pickling中序列化的原理在于将函数和类变为:package.module.func_name这样的字符串,即模块路径 + 函数/类名的形式。这就要求反序列化的时候:

    这个模块可以被import

    import后的这个模块中有这个全局名称

    而且全局名称代表的值就是传递的值。

    然而使用装饰器修饰的时候,main这个函数其实被修改了一次,变为了修饰之后的函数,但是未修饰的函数和修饰后的函数都共享了__main__.main这个名称,这导致传递的虽然是修饰前的函数,但实际接收方接受了__main__.main这个名称,然后重新import得到的是修饰后的函数,所以在子进程中执行的也是修饰后的函数。multiprocessing模块有个安全机制,禁止在multiprocessing创建的子进程中再创建multiprocessing子进程,因此这个修饰过的函数会在.Pool的地方报错,这个异常没有传递回主进程,所以看上去就像什么都没有做一样。作者:灵剑来源:知乎

进程池,线程池,协程

进程池、线程池、协程

池子有什么作用

在池子创建的时候就将设置的数量创建出来。之后所有的操作都由池子里的进程/线程完成。

当并发的任务数量远远大于计算机所能承受的范围,限制我进程数或线程数,从保证服务器运行。

concurrent.future模块

  • concurrent.future模块封装了线程池与进程池
  • ProcessPoolExecutor(max_works)对于进程池如果不写max_works:默认的是处理器个数
  • ThreadPoolExecutor(max_works)对于线程池如果不写max_works:默认的是处理器个数*5

常用方法

  • result :获得回调函数返回的值
  • submit :相当于apply_async方法
  • shutdown :相当于close+join方法
  • add_done_callback :为当前任务绑定一个回调函数

进程池

from  concurrent.futures import ProcessPoolExecutor
import time,random
def task(n):
    for i in range(100):
        n += i   # 计算密集型的,一般用进程,可以充分利用多核优势
    return n**2
if __name__ == '__main__':
    start = time.time()
    p = ProcessPoolExecutor()
    for i in range(10):
        obj = p.submit(task,i).result()  # 等同于apply同步方法
        print(obj)
    p.shutdown()
    print(time.time() - start)
##############################
# 结果:
0.23636794090270996

线程池

from  concurrent.futures import ProcessPoolExecutor
import time
def task(n):
    time.sleep(3)   # IO密集型的,一般用线程
    return n**2
if __name__ == '__main__':
    start = time.time()
    p = ProcessPoolExecutor()
    for i in range(10):
        obj = p.submit(task,i).result()  # 等同于apply同步方法
    p.shutdown()
    print(time.time() - start)
##############################
# 结果:
30.011648893356323

使用回调函数

进程池
from  concurrent.futures import ProcessPoolExecutor
import time,random
def parse(future)
    print(future.result())
    
def task(n):
    for i in range(100):
        n += i   # 计算密集型的,一般用进程,可以充分利用多核优势
    return n**2

if __name__ == '__main__':
    start = time.time()
    p = ProcessPoolExecutor()
    for i in range(10):
        obj = p.submit(task,i)  # 等同于apply_async同步方法
        obj.add_done_callback(parse)  # 绑定函数,当当前任务结束时调用绑定方法
    p.shutdown()
    print(time.time() - start)
##############################
# 结果:
0.2219409942626953
线程池
from  concurrent.futures import ProcessPoolExecutor
import time
def parse(future):
    print(future.result())
def task(n):
    time.sleep(3)   # IO密集型的,一般用线程
    return n**2
if __name__ == '__main__':
    start = time.time()
    p = ProcessPoolExecutor()
    for i in range(10):
        obj = p.submit(task,i)  # 等同于apply同步方法
        obj.add_done_callback(parse)
    p.shutdown()
    print(time.time() - start)
##############################
# 结果:
9.223718166351318

协程

  • 单线程下实现并发(IO密集型提高效率)

  • 并发的本质:切换+保存状态
  • 降低单个线程的IO时间

简单的协程
from gevent import monkey
monkey.patch_all() #实现捕获非gevent的io
import gevent

import time
def eat():
    print('eat 1')
    time.sleep(2)
    print('eat 2')
def play():
    print('开始运算')
    for i in range(1,10000):
        while i:
            i -= 1
    print('结束运算')

start = time.time()
# play()
# eat()
# 串行执行需要4.417968988418579
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
g1.join()
g2.join()
end = time.time()
print(end-start)  # 2.4216597080230713

以上是关于为什么进程池封装在装饰器中不能生效,而多进程可以的主要内容,如果未能解决你的问题,请参考以下文章

多线程和多进程分别是啥意思?

多进程 multiprocessing 之 Pool

JAVA 学习总结 多线程

什么?Python的多进程居然比单进程慢?

什么?Python的多进程居然比单进程慢?

多进程,多线程