进程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event)
锁 —— multiprocess.Lock
通过刚刚的学习,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题。
当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。
#加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
import json import time from multiprocessing import Process from multiprocessing import Lock def show(i): with open(‘ticket‘) as f: dic = json.load(f) print(‘余票: %s‘%dic[‘ticket‘]) def buy_ticket(i,lock): lock.acquire() #拿钥匙进门 with open(‘ticket‘) as f: dic = json.load(f) time.sleep(0.1) if dic[‘ticket‘] > 0 : dic[‘ticket‘] -= 1 print(‘\033[32m%s买到票了\033[0m‘%i) else: print(‘\033[31m%s没买到票\033[0m‘%i) time.sleep(0.1) with open(‘ticket‘,‘w‘) as f: json.dump(dic,f) lock.release() # 还钥匙 if __name__ == ‘__main__‘: for i in range(5): p = Process(target=show,args=(i,)) p.start() lock = Lock() for i in range(5): p = Process(target=buy_ticket, args=(i,lock)) p.start()
#虽然可以用文件共享数据实现进程间通信,但问题是: #1.效率低(共享数据基于文件,而文件是硬盘上的数据) #2.需要自己加锁处理 #因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。 #队列和管道都是将数据存放于内存中 #队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来, #我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
信号量 —— multiprocess.Semaphore(了解)
信号量介绍Semaphore
互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。
假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。
实现:
信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。
信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念
from multiprocessing import Process,Semaphore import time,random def go_ktv(sem,user): sem.acquire() print(‘%s 占到一间ktv小屋‘ %user) time.sleep(random.randint(0,3)) #模拟每个人在ktv中待的时间不同 print(‘%s出来‘ %user) sem.release() if __name__ == ‘__main__‘: sem=Semaphore(4) p_l=[] for i in range(7): p=Process(target=go_ktv,args=(sem,‘user%s‘ %i,)) p.start() p_l.append(p) print(p_l) for i in p_l: i.join() print(‘============》‘)
事件 —— multiprocess.Event(了解)
事件介绍
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。 事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。 clear:将“Flag”设置为False set:将“Flag”设置为True
# 通过一个信号 来控制 多个进程 同时 执行或者阻塞 # 事件 # from multiprocessing import Event # 一个信号可以使所有的进程都进入阻塞状态 # 也可以控制所有的进程解除阻塞 # 一个事件被创建之后,默认是阻塞状态 # e = Event() # 创建了一个事件 # print(e.is_set()) # 查看一个事件的状态,默认被设置成阻塞 # e.set() # 将这个事件的状态改为True # print(e.is_set()) # e.wait() # 是依据e.is_set()的值来决定是否阻塞的 # print(123456) # e.clear() # 将这个事件的状态改为False # print(e.is_set()) # e.wait() # 等待 事件的信号被变成True # print(‘*‘*10) # set 和 clear # 分别用来修改一个事件的状态 True或者False # is_set 用来查看一个事件的状态 # wait 是依据事件的状态来决定自己是否在wait处阻塞 # False阻塞 True不阻塞 # 红绿灯事件 import time#引入时间模块 import random#引入随机模块 from multiprocessing import Event,Process#引入进程模块和时间模块 def cars(e,i):#定义一个函数 if not e.is_set():#如果信号灯为真的时候 print(‘car%i在等待‘%i)#打印内容 e.wait() # 阻塞 直到得到一个 事件状态变成 True 的信号 print(‘\033[0;32;40mcar%i通过\033[0m‘ % i)#打印通过 def light(e):#定义一个灯 while True:#循环为真 if e.is_set():#如果事件状态为真 e.clear()#则清除信号灯 print(‘\033[31m红灯亮了\033[0m‘)#打印红灯亮了 else:#否则 e.set()#设置状态为真 print(‘\033[32m绿灯亮了\033[0m‘)#打印绿灯亮了 time.sleep(2)#睡2秒 if __name__ == ‘__main__‘:#如果为真 e = Event()#实例化一个事件 traffic = Process(target=light,args=(e,))#定义一个灯的进程 traffic.start()#开始进程 for i in range(20):#循环20次 car = Process(target=cars, args=(e,i))#创建20个汽车进程 car.start()#启动汽车进程 time.sleep(random.random())#随机睡,随机出现0~1之间的小数
进程间通信——队列和管道(multiprocess.Queue、multiprocess.Pipe)
进程间通信
IPC(Inter-Process Communication)
队列
创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
Queue([maxsize])
创建共享的进程队列。
参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
底层队列使用管道和锁定实现。
Queue([maxsize])
创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。
Queue的实例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
q.get_nowait( )
同q.get(False)方法。
q.put(item [, block [,timeout ] ] )
将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
q.qsize()
返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
q.empty()
如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
q.full()
如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。
q.close()
关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
q.cancel_join_thread()
不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。
q.join_thread()
连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。
代码实例
‘‘‘ multiprocessing模块支持进程间通信的两种主要形式:管道和队列 都是基于消息传递实现的,但是队列接口 ‘‘‘ from multiprocessing import Queue q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) # q.put(3) # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。 # 如果队列中的数据一直不被取走,程序就会永远停在这里。 try: q.put_nowait(3) # 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。 except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。 print(‘队列已经满了‘) # 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。 print(q.full()) #满了 print(q.get()) print(q.get()) print(q.get()) # print(q.get()) # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。 try: q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。 except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。 print(‘队列已经空了‘) print(q.empty()) #空了
上面这个例子还没有加入进程通信,只是先来看看队列为我们提供的方法,以及这些方法的使用和现象。
import time from multiprocessing import Process, Queue def f(q): q.put([time.asctime(), ‘from Eva‘, ‘hello‘]) #调用主函数中p进程传递过来的进程参数 put函数为向队列中添加一条数据。 if __name__ == ‘__main__‘: q = Queue() #创建一个Queue对象 p = Process(target=f, args=(q,)) #创建一个进程 p.start() print(q.get()) p.join()
上面是一个queue的简单应用,使用队列q对象调用get函数来取得队列中最先进入的数据。 接下来看一个稍微复杂一些的例子:
import os import time import multiprocessing # 向queue中输入数据的函数 def inputQ(queue): info = str(os.getpid()) + ‘(put):‘ + str(time.asctime()) queue.put(info) # 向queue中输出数据的函数 def outputQ(queue): info = queue.get() print (‘%s%s\033[32m%s\033[0m‘%(str(os.getpid()), ‘(get):‘,info)) # Main if __name__ == ‘__main__‘: multiprocessing.freeze_support() record1 = [] # store input processes record2 = [] # store output processes queue = multiprocessing.Queue(3) # 输入进程 for i in range(10): process = multiprocessing.Process(target=inputQ,args=(queue,)) process.start() record1.append(process) # 输出进程 for i in range(10): process = multiprocessing.Process(target=outputQ,args=(queue,)) process.start() record2.append(process) for p in record1: p.join() for p in record2: p.join()
生产者消费者模型
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
基于队列实现生产者消费者模型
# 队列 # 生产者消费者模型 # 生产者 进程 # 消费者 进程 import time#引入时间模块 import random#引入随机数 from multiprocessing import Process,Queue#引入进程模块和队列模块 def consumer(q,name):#定义一个消费者函数 while True:#循环为真 food = q.get()#拿出食物 if food is None:#如果食物为空 print(‘%s获取到了一个空‘%name)#打印胡去到一个空 break#打断 print(‘\033[31m%s消费了%s\033[0m‘ % (name,food))#打印谁消费了什么食物 time.sleep(random.randint(1,3))#随机睡1~3秒 def producer(name,food,q):#定义一个生产者函数 for i in range(4):#循环4次 time.sleep(random.randint(1,3))#随机睡1~3秒 f = ‘%s生产了%s%s‘%(name,food,i)#谁生产了什么食物 print(f)#打印内容 q.put(f)#把食物放到队列里 if __name__ == ‘__main__‘:#如果名称是当前名称 q = Queue(20)#实例化一个队列20 p1 = Process(target=producer,args=(‘Egon‘,‘包子‘,q))#创建一个进程 p2 = Process(target=producer, args=(‘wusir‘,‘泔水‘, q))#创建一个进程 c1 = Process(target=consumer, args=(q,‘alex‘))#创建一个进程 c2 = Process(target=consumer, args=(q,‘jinboss‘))#创建一个进程 p1.start()#启动一个进程 p2.start()#启动一个进程 c1.start()#启动一个进程 c2.start()#启动一个进程 p1.join()#感知p1进程结 p2.join()#感知p2进程结束 q.put(None)#往队列中添加一个None q.put(None)#往队列中添加一个None
此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。
解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环。
注意:结束信号None,不一定要由生产者发,主进程里同样可以发,但主进程需要等生产者结束后才应该发送该信号
JoinableQueue([maxsize])
创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
#方法介绍 #JoinableQueue的实例p除了与Queue对象相同的方法之外,还具有以下方法: #q.task_done() #使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。 #q.join() #生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。 #下面的例子说明如何建立永远运行的进程,使用和处理队列上的项目。生产者将项目放入队列,并等待它们被处理。
import time#引入一个时间模块 import random#引入一个随机数模块 from multiprocessing import Process,JoinableQueue#引入进程模块和队列模块 def consumer(q,name):#定义一个消费者函数 while True:#循环为真 food = q.get()#从队列中拿出食物 print(‘\033[31m%s消费了%s\033[0m‘ % (name,food))#打印内容 time.sleep(random.randint(1,3))#随机睡1~3秒 q.task_done() # count - 1# def producer(name,food,q):#生产者 for i in range(4):#循环4次 time.sleep(random.randint(1,3))#随机睡1~3秒 f = ‘%s生产了%s%s‘%(name,food,i)#谁生产了食物 print(f)#打印这个内容 q.put(f)#放入到队列里 q.join() # 阻塞 直到一个队列中的所有数据 全部被处理完毕 if __name__ == ‘__main__‘:#如果文件名为当前名称 q = JoinableQueue(20)#实例化一个队列对象 p1 = Process(target=producer,args=(‘Egon‘,‘包子‘,q))#创建一个生产者进程 p2 = Process(target=producer, args=(‘wusir‘,‘泔水‘, q))#创建一个生产着进程 c1 = Process(target=consumer, args=(q,‘alex‘))#创建一个消费者 c2 = Process(target=consumer, args=(q,‘jinboss‘))#创建一个消费者进程 p1.start()#开启一个生产者进程 p2.start()#开启一个生产者进程 c1.daemon = True # 设置为守护进程 主进程中的代码执行完毕之后,子进程自动结束 c2.daemon = True #设置守护进程 c1.start() #开启一个消费者进程 c2.start() #开启一个消费者进程 p1.join() #感知一个生产者进程结束 p2.join() # 感知一个进程的结束 # 在消费者这一端: # 每次获取一个数据 # 处理一个数据 # 发送一个记号 : 标志一个数据被处理成功 # 在生产者这一端: # 每一次生产一个数据, # 且每一次生产的数据都放在队列中 # 在队列中刻上一个记号 # 当生产者全部生产完毕之后, # join信号 : 已经停止生产数据了 # 且要等待之前被刻上的记号都被消费完 # 当数据都被处理完时,join阻塞结束 # consumer 中把所有的任务消耗完 # producer 端 的 join感知到,停止阻塞 # 所有的producer进程结束 # 主进程中的p.join结束 # 主进程中代码结束 # 守护进程(消费者的进程)结束