并发编程之多进程2
Posted liuxiaolu
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程之多进程2相关的知识,希望对你有一定的参考价值。
一:multiprocessing模块介绍
用来开启子进程,并在子进程中执行定制的任务(比如函数)。该模块功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue|Pipe、Lock等组件。
需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。
二:Process类的介绍
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动) 强调: 1. 需要使用关键字的方式来指定参数 2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号 参数介绍: 1 group参数未使用,值始终为None 2 3 target表示调用对象,即子进程要执行的任务 4 5 args表示调用对象的位置参数元组,args=(1,2,‘egon‘,) 6 7 kwargs表示调用对象的字典,kwargs={‘name‘:‘egon‘,‘age‘:18} 8 9 name为子进程的名称 方法介绍: 1 p.start():启动进程,并调用该子进程中的p.run() 2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法 3 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁 4 p.is_alive():如果p仍然运行,返回True 5 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程 属性介绍: 1 p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置 2 p.name:进程的名称 3 p.pid:进程的pid 4 p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可) 5 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
三:僵尸进程与孤儿进程
1.僵尸进程(有害):一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。
简单来说就是指子进程执行完所有任务,已经终止了,但是还残留一些信息(进程id,进程名),但是父进程没有去处理这些残留信息,导致残留信息占用系统资源。
当出现大量的僵尸进程时,会占用系统资源,可以把它父进程杀掉,僵尸就成了孤儿,操作系统会负责回收。
import time from multiprocessing import Process def task1(): print("子进程 run") if __name__ == ‘__main__‘: for i in range(10): p = Process(target=task1) p.start() time.sleep(100000)
2.孤儿进程(无害):一个父进程结束,而它的一个或者多个子进程还在运行,那么那些进程将成为孤儿进程。孤儿进程将被init进程所收养,并且由init进程对它们完成状态收集工作。
import os import sys import time pid = os.getpid() ppid = os.getppid() print ‘im father‘, ‘pid‘, pid, ‘ppid‘, ppid pid = os.fork() #执行pid=os.fork()则会生成一个子进程 #返回值pid有两种值: # 如果返回的pid值为0,表示在子进程当中 # 如果返回的pid值>0,表示在父进程当中 if pid > 0: print ‘father died..‘ sys.exit(0) # 保证主线程退出完毕 time.sleep(1) print ‘im child‘, os.getpid(), os.getppid() 执行文件,输出结果: im father pid 32515 ppid 32015 father died.. im child 32516 1
四:守护进程
主进程创建守护进程:
其一:守护进程会在主进程代码执行完毕后就终止
其二:守护进程内无法再开启子进程,否则抛异常(AssertionError: daemonic processes are not allowed to have children)
注意:进程之间是相互独立的,主进程代码运行结束,守护进程随机终止
from multiprocessing import Process import time def task(): print(‘小主的一生‘) time.sleep(2) print(‘小主凉了‘) #守护进程运行的话,此行代码就不会运行 if __name__ == ‘__main__‘: xiaozhu=Process(target=task) xiaozhu.daemon=True #守护进程,默认为False,意味着不守护,改为True表示是守护进程 xiaozhu.start() print(‘皇帝登基‘) time.sleep(1) print(‘hello‘) print(‘皇帝薨‘)
五:进程同步
进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端是没有问题的。但是共享带来的就是竞争,竞争带来的结果就是错乱,如何控制,加锁处理。
使用锁将需要共享的数据加锁,在执行代码之前会先判断这个值。要注意:在使用锁时,必须保证锁是同一个。
互斥锁:保证了每次只有一个进程进入改都拿程序的操作,从而保证了多进程情况下数据的正确性。
from multiprocessing import Process,Lock import random import time def task1(lock): lock.acquire() print(‘hello 1‘) print(‘1 How are you!‘) time.sleep(random.randint(1,2)) print(‘bye 1‘) lock.release() def task2(lock): lock.acquire() print(‘hello 2‘) print(‘2 How are you!‘) print(‘bye 2‘) lock.release() def task3(lock): lock.acquire() print(‘hello 3‘) print(‘3 How are you!‘) print(‘bye 3‘) lock.release() if __name__ == ‘__main__‘: lock=Lock() p1=Process(target=task1,args=(lock,)) p1.start() p2=Process(target=task2,args=(lock,)) p2.start() p3=Process(target=task3,args=(lock,)) p3.start()
import json from multiprocessing import Process,Lock import time import random """ join和锁的区别 1.join中顺序是固定的 不公平 2.join是完全串行 而 锁可以使部分代码串行 其他代码还是并发 """ # 查看剩余票数 def check_ticket(usr): time.sleep(random.randint(1,3)) with open("ticket.json","r",encoding="utf-8") as f: dic = json.load(f) print("%s查看 剩余票数:%s" % (usr,dic["count"])) def buy_ticket(usr): with open("ticket.json","r",encoding="utf-8") as f: dic = json.load(f) if dic["count"] > 0: time.sleep(random.randint(1,3)) dic["count"] -= 1 with open("ticket.json", "w", encoding="utf-8") as f2: json.dump(dic,f2) print("%s 购票成功!" % usr) def task(usr,lock): check_ticket(usr) lock.acquire() buy_ticket(usr) lock.release() if __name__ == ‘__main__‘: lock = Lock() for i in range(10): p = Process(target=task,args=("用户%s" % i,lock)) p.start() #p.join() # 只有第一个整个必须完毕 别人才能买 这是不公平的
加锁可以保证多个进程修改同一个数据时,同一时间只能有一个任务可以进行修改,即串行的修改,降低了速度,但是保证了数据安全。
死锁:指的是锁无法打开导致程序卡死,首先要明确有一把锁的时候是不会卡死的,正常开发时一把锁足够使用,不要打开多把锁。
from multiprocessing import Process,Lock import time def task1(l1,l2,i): l1.acquire() print("盘子被%s抢走了" % i) time.sleep(1) l2.acquire() print("筷子被%s抢走了" % i) print("吃饭..") l1.release() l2.release() pass def task2(l1,l2,i): l2.acquire() print("筷子被%s抢走了" % i) l1.acquire() print("盘子被%s抢走了" % i) print("吃饭..") l1.release() l2.release() if __name__ == ‘__main__‘: l1 = Lock() l2 = Lock() Process(target=task1,args=(l1,l2,1)).start() Process(target=task2,args=(l1,l2,2)).start() ============================================= 运行结果卡住>>>: 盘子被1抢走了 筷子被2抢走了
六:IPC(进程间的通讯)
由于进程之间的内存都是相互独立的,所以需要对应的解决方案,能够使得进程之间可以相互传递数据。
有三种方案:
1.使用共享文件,多个进程同时读写同一个文件(I/O速度慢,传输数据大小不受限制)
2.管道是基于内存的,速度快,但是是单向的,用起来麻烦(了解 )
3.申请共享内存空间,多个进程可以共享这个内存区域(重点)
速度快,但是数据量不能太大
from multiprocessing import Manager,Process,Lock def work(d): # with lock: d[‘count‘]-=1 if __name__ == ‘__main__‘: with Manager() as m: dic=m.dict({‘count‘:100}) #创建一个共享的字典 p_l=[] for i in range(100): p=Process(target=work,args=(dic,)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)
七:队列(推荐使用)
进程之间彼此隔离,要实现进程间通信,multiprocessing模块支持两种形式:队列和管道。
队列特点:先进先出
优点:可以保证数据不会错乱,即使在多进程下,因为其put和get默认都是阻塞的。
Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。maxsize是队列中允许最大项数,省略则无大小限制。
1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。 2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常. 3 q.get_nowait():同q.get(False) 4 q.put_nowait():同q.put(False) 5 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。 6 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。 7 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
from multiprocessing import Queue # 例1: # q = Queue(1) # 创建一个队列 最多可以存一个数据 # # q.put("张三") # print(q.get()) # # q.put("李四") # put默认会阻塞 当容器中已经装满了 # # print(q.get()) # print(q.get()) # get默认会阻塞 当容器中已经没有数据了 # # print("over") # 例2: q = Queue(1) # 创建一个队列 最多可以存一个数据 # q.put("张三") # q.put("李四",False) # 第二个参数 设置为False表示不会阻塞 无论容器是满了 都会强行塞 如果满了就抛异常 print(q.get()) print(q.get(timeout=3)) # timeout 仅用于阻塞时 # q.put("李四") # put默认会阻塞 当容器中已经装满了 # # print(q.get()) # print(q.get()) # get默认会阻塞 当容器中已经没有数据了 # # print("over")
八:生产者消费者模型
1.什么是生产者消费者模型?
生产者消费者模式是通过一个容器来解决生产者与消费者的强耦合问题,生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者与消费者的处理能力。
import random from multiprocessing import Process,Queue import time # 爬数据 def get_data(q): for num in range(5): print("正在爬取第%s个数据" % num) time.sleep(random.randint(1,2)) print("第%s个数据 爬取完成" % num) # 把数据装到队列中 q.put("第%s个数据" % num) def parse_data(q): for num in range(5): # 取出数据 data = q.get() print("正在解析%s" % data) time.sleep(random.randint(1, 2)) print("%s 解析完成" % data) if __name__ == ‘__main__‘: # 共享数据容器 q = Queue(5) #生产者进程 produce = Process(target=get_data,args=(q,)) produce.start() #消费者进程 customer = Process(target=parse_data,args=(q,)) customer.start()
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print(‘