Day 32 process&threading_4
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Day 32 process&threading_4相关的知识,希望对你有一定的参考价值。
线程和进程 4
一、multiprocessing模块
multiprocessing包是Python中的多进程管理包。
与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。
1, python的Process类的进程调用
1 ############## 进程调用 2 # 调用进程模块和时间模块,并测试多进程功能 3 4 from multiprocessing import Process 5 import time 6 7 def pp(name): 8 """ 9 函数:完成进程调用测试工作 10 :return: 11 """ 12 print("you are coming!",name,time.ctime()) 13 time.sleep(2) 14 15 if __name__ == ‘__main__‘: 16 p_list = [] 17 18 # 开三个分进程,程序本身算主进程 19 for i in range(3): 20 p = Process(target=pp,args=("%s" %i,)) # 调用方式基本和thread相似 21 p_list.append(p) 22 p.start() 23 24 for j in p_list: # 分别阻断进程和主进程间关系 25 j.join() 26 27 print("end!",time.ctime())
2,继承类的调用方式
1 ######## 继承Process类调用 2 # 3 4 from multiprocessing import Process 5 import time 6 7 class MyProcess(Process): 8 """ 9 继承父类Process的所用属性和功能 10 """ 11 def __init__(self): 12 Process.__init__(self) # 继承父类的所有__init__属性 13 14 # 实例执行start时,自动触发run的执行,可查看系统源码追溯到 15 def run(self): 16 print("Welcome to beijing!",self,time.ctime()) 17 time.sleep(2) 18 19 if __name__ == ‘__main__‘: 20 p_list = [] 21 for i in range(3): 22 xx = MyProcess() 23 xx.start() 24 p_list.append(xx) 25 26 for j in p_list: 27 j.join() 28 29 print("END!",time.ctime())
二、进程间的通讯
2、1 进程队列queue
1 from multiprocessing import Process,Queue 2 3 def xx(q,n): 4 q.put(n*n+6) 5 # 测试子线程队列的位置 6 print("son process of:",id(q)) 7 8 if __name__ == ‘__main__‘: 9 q = Queue() 10 # 测试主线程队列的位置 11 print("main process of: ",id(q)) 12 13 for i in range(3): 14 p = Process(target=xx,args=(q,i,)) 15 p.start() 16 17 print(q.get()) 18 print(q.get()) 19 print(q.get()) 20 21 ‘‘‘ 22 # 事实是为了证明:Queue虽然实现了线程间的交流,但是实际是在不同线程开辟了不一样的内存空间。然而linux优化,结果就如下了:mac是一样的 23 main process of: 4319925192 24 son process of: 4319925192 25 6 26 son process of: 4319925192 27 7 28 son process of: 4319925192 29 10 30 ‘‘‘
2、2 进程管道 Pipe
1 ############## Pipe 进程管道 2 """ 3 Pipe()返回的两个连接对象代表管道的两端。 4 每个连接对象都有send()和recv()方法(等等)。 5 请注意,如果两个进程(或线程)尝试同时读取或写入管道的同一端,管道中的数据可能会损坏。 6 """ 7 8 from multiprocessing import Process,Pipe 9 10 def x_l(l_conn): 11 """ 12 开辟一个进程负责lc的通话 13 :param l_conn: 14 :return: 15 """ 16 l_conn.send("Welcome to beijing!") 17 response = l_conn.recv() 18 print("x:",response) 19 l_conn.close() 20 21 if __name__ == ‘__main__‘: 22 x_conn,l_conn = Pipe() 23 p = Process(target=x_l,args=(l_conn,)) # 开子进程,负责lc 24 p.start() 25 res = x_conn.recv() 26 print("l:",res) 27 x_conn.send("Im coming!") 28 p.join() 29 30 """ 31 l: Welcome to beijing! 32 x: Im coming! 33 """
2,3 Manager 数据共享
1 ############## Manager 数据共享:一个数据去更改另一个进程里的数据 2 3 from multiprocessing import Manager,Process 4 5 def xl(Mlist,i): 6 Mlist.append(i) 7 8 if __name__ == ‘__main__‘: 9 manager = Manager() # 实例一个Manager 10 Mlist = manager.list([1,"a"]) # 数据共享类型为列表,也可以用字典等 11 l = [] 12 13 # 开子进程,并往主线程共享列表添加变量 14 for i in range(5): 15 p = Process(target=xl,args=(Mlist,i,)) 16 p.start() 17 l.append(p) 18 19 for j in l: 20 j.join() 21 22 print(Mlist
2,4 Pool 进程池
1 ############## Pool 2 """ 3 进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程, 4 如果进程池序列中没有可供使用的进进程,那么程序就会等待, 5 直到进程池中有可用进程为止。 6 """ 7 from multiprocessing import Pool 8 import time 9 10 def xl(n): 11 print(n) 12 time.sleep(2) 13 print("END!") 14 15 if __name__ == ‘__main__‘: 16 pool_obj = Pool() # 进程池,默认进程数是cpu核数,其中os.cpu_count()查看 17 for i in range(20): 18 pool_obj.apply_async(func=xl,args=(i,)) 19 pool_obj.close() # 执行后,不会有新等进程进入进程池 20 pool_obj.join() # join()在close()之后,套路,牢记!!! 21 22 print("ALL IS OVER!")
1 p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async() 2 p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。 3 4 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成5 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
三, 协程
1 """ 2 协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。 3 对比操作系统控制线程的切换,用户在单线程内控制协程的切换,优点如下: 4 1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级 5 2. 单线程内就可以实现并发的效果,最大限度地利用cpu 6 """
1 import time 2 3 """ 4 传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。 5 如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。 6 """ 7 # 注意到consumer函数是一个generator(生成器): 8 # 任何包含yield关键字的函数都会自动成为生成器(generator)对象 9 10 def consumer(): 11 r = ‘‘ 12 while True: 13 # 3、consumer通过yield拿到消息,处理,又通过yield把结果传回; 14 # yield指令具有return关键字的作用。然后函数的堆栈会自动冻结(freeze)在这一行。 15 # 当函数调用者的下一次利用next()或generator.send()或for-in来再次调用该函数时, 16 # 就会从yield代码的下一行开始,继续执行,再返回下一次迭代结果。通过这种方式,迭代器可以实现无限序列和惰性求值。 17 n = yield r 18 if not n: 19 return 20 print(‘[CONSUMER] ←← Consuming %s...‘ % n) 21 time.sleep(1) 22 r = ‘200 OK‘ 23 def produce(c): 24 # 1、首先调用c.next()启动生成器 25 next(c) 26 n = 0 27 while n < 5: 28 n = n + 1 29 print(‘[PRODUCER] →→ Producing %s...‘ % n) 30 # 2、然后,一旦生产了东西,通过c.send(n)切换到consumer执行; 31 cr = c.send(n) 32 # 4、produce拿到consumer处理的结果,继续生产下一条消息; 33 print(‘[PRODUCER] Consumer return: %s‘ % cr) 34 # 5、produce决定不生产了,通过c.close()关闭consumer,整个过程结束。 35 c.close() 36 if __name__==‘__main__‘: 37 # 6、整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。 38 c = consumer() 39 produce(c) 40 41 42 ‘‘‘ 43 result: 44 45 [PRODUCER] →→ Producing 1... 46 [CONSUMER] ←← Consuming 1... 47 [PRODUCER] Consumer return: 200 OK 48 [PRODUCER] →→ Producing 2... 49 [CONSUMER] ←← Consuming 2... 50 [PRODUCER] Consumer return: 200 OK 51 [PRODUCER] →→ Producing 3... 52 [CONSUMER] ←← Consuming 3... 53 [PRODUCER] Consumer return: 200 OK 54 [PRODUCER] →→ Producing 4... 55 [CONSUMER] ←← Consuming 4... 56 [PRODUCER] Consumer return: 200 OK 57 [PRODUCER] →→ Producing 5... 58 [CONSUMER] ←← Consuming 5... 59 [PRODUCER] Consumer return: 200 OK 60 ‘‘‘
3,1 Gevent
1 ############# 协程 greenlet 2 3 from gevent import monkey 4 monkey.patch_all() 5 import gevent 6 from urllib import request 7 import time 8 9 def xl(url): 10 print("GET:%s" %url) 11 res = request.urlopen(url) 12 data = res.read() 13 print("%d bytes recevied from %s" %(len(data),url)) 14 15 start = time.time() 16 17 gevent.joinall([ 18 # gevent.spawn(xl,"http://www.xiaohuar.com"), 19 gevent.spawn(xl,"http://www.mmjpg.com"), 20 gevent.spawn(xl,"http://www.fengniao.com") 21 ]) 22 23 print(time.time()-start)
Greenlet
Gevent 自动切换的牛逼之处,可以的就开干
以上是关于Day 32 process&threading_4的主要内容,如果未能解决你的问题,请参考以下文章
Unix Basics & Thread v.s. Process & Parentheses 问题专题