Python 生产者消费者模式

Posted hades0607

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python 生产者消费者模式相关的知识,希望对你有一定的参考价值。

生产者消费者模式

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题,

该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度

 

生产者和消费者模式来源

在线程世界里, 生产者就是生产数据的线程,消费者就是消费数据的线程。

在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,

那么生产者就必须等待消费者处理完,才能继续生产数据。

同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。

为了解决这个问题于是引用了生产者和消费者模式。

 

生产者消费者模式详解

 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,

直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,

平衡了生产者和消费者的处理能力。

 

基于队列实现生产者消费者模式

q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
 
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)

q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

 

 1 from multiprocessing import Process, Queue
 2 import time, random, os
 3 
 4 
 5 def consumer(q):
 6     while True:
 7         res = q.get()
 8         if res is None: break  # 收到结束信号则结束
 9         time.sleep(random.randint(1, 3))
10         print(33[45m%s 吃 %s33[0m % (os.getpid(), res))
11 
12 
13 def producer(name, q):
14     for i in range(2):
15         time.sleep(random.randint(1, 3))
16         res = %s%s % (name, i)
17         q.put(res)
18         print(33[44m%s 生产了 %s33[0m % (os.getpid(), res))
19 
20 
21 if __name__ == __main__:
22     q = Queue()
23     # 生产者们:即厨师们
24     p1 = Process(target=producer, args=(包子, q))
25     p2 = Process(target=producer, args=(骨头, q))
26     p3 = Process(target=producer, args=(泔水, q))
27 
28     # 消费者们:即吃货们
29     c1 = Process(target=consumer, args=(q,))
30     c2 = Process(target=consumer, args=(q,))
31 
32     # 开始
33     p1.start()
34     p2.start()
35     p3.start()
36     c1.start()
37 
38     p1.join()  # 必须保证生产者全部生产完毕,才应该发送结束信号
39     p2.join()
40     p3.join()
41     q.put(None)  # 有几个消费者就应该发送几次结束信号None
42     q.put(None)  # 发送结束信号
43     print()
44 #有几个消费者就需要发送几次结束信号:相当low

 

另一种队列也提供了这种机制

#JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

   #参数介绍:
    maxsize是队列中允许最大项数,省略则无大小限制。    
  #方法介绍:
    JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
    q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
    q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

 

 1 from multiprocessing import Process,JoinableQueue
 2 import time,random,os
 3 def consumer(q):
 4     while True:
 5         res=q.get()
 6         time.sleep(random.randint(1,3))
 7         print(33[45m%s 吃 %s33[0m %(os.getpid(),res))
 8 
 9         q.task_done() #向q.join()发送一次信号,证明一个数据已经被取走了
10 
11 def producer(name,q):
12     for i in range(10):
13         time.sleep(random.randint(1,3))
14         res=%s%s %(name,i)
15         q.put(res)
16         print(33[44m%s 生产了 %s33[0m %(os.getpid(),res))
17     q.join()
18 
19 
20 if __name__ == __main__:
21     q=JoinableQueue()
22     #生产者们:即厨师们
23     p1=Process(target=producer,args=(包子,q))
24     p2=Process(target=producer,args=(骨头,q))
25     p3=Process(target=producer,args=(泔水,q))
26 
27     #消费者们:即吃货们
28     c1=Process(target=consumer,args=(q,))
29     c2=Process(target=consumer,args=(q,))
30     c1.daemon=True
31     c2.daemon=True
32 
33     #开始
34     p_l=[p1,p2,p3,c1,c2]
35     for p in p_l:
36         p.start()
37 
38     p1.join()
39     p2.join()
40     p3.join()
41     print() 
42     
43     #主进程等--->p1,p2,p3等---->c1,c2
44     #p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
45     #因而c1,c2也没有存在的价值了,应该随着主进程的结束而结束,所以设置成守护进程

 

 

生产者消费者模式总结

 程序中有两类角色:

  一类负责生产数据(生产者)

  一类负责处理数据(消费者)

引入生产者消费者模式为了解决的问题是:

  平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度

如何实现:

  生产者  --------  队列  --------  消费者

  生产者消费者模式实现程序的解耦合

 

 管道(了解)

#创建管道的类:
Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道
#参数介绍:
dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。
#主要方法:
    conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
    conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
 #其他方法:
conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
conn1.fileno():返回连接使用的整数文件描述符
conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。

介绍

 

 1 from multiprocessing import Process,Pipe
 2 
 3 import time,os
 4 def consumer(p,name):
 5     left,right=p
 6     left.close()
 7     while True:
 8         try:
 9             baozi=right.recv()
10             print(%s 收到包子:%s %(name,baozi))
11         except EOFError:
12             right.close()
13             break
14 def producer(seq,p):
15     left,right=p
16     right.close()
17     for i in seq:
18         left.send(i)
19         # time.sleep(1)
20     else:
21         left.close()
22 if __name__ == __main__:
23     left,right=Pipe()
24 
25     c1=Process(target=consumer,args=((left,right),c1))
26     c1.start()
27 
28 
29     seq=(i for i in range(10))
30     producer(seq,(left,right))
31 
32     right.close()
33     left.close()
34 
35     c1.join()
36     print(主进程)
37 
38 基于管道实现进程间通信(与队列的方式是类似的,队列就是管道加锁实现的)

 

注意:

生产者和消费者都没有使用管道的某个端点,就应该将其关闭,如在生产者中关闭管道的右端,在消费者中关闭管道的左端。

如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计算的,必须在所有进程中关闭管道后才能产生EOFError异常。因此在生产者中关闭管道不会有任何效果,付费消费者中也关闭了相同的管道端点。

 1 from multiprocessing import Process,Pipe
 2 
 3 import time,os
 4 def adder(p,name):
 5     server,client=p
 6     client.close()
 7     while True:
 8         try:
 9             x,y=server.recv()
10         except EOFError:
11             server.close()
12             break
13         res=x+y
14         server.send(res)
15     print(server done)
16 if __name__ == __main__:
17     server,client=Pipe()
18 
19     c1=Process(target=adder,args=((server,client),c1))
20     c1.start()
21 
22     server.close()
23 
24     client.send((10,20))
25     print(client.recv())
26     client.close()
27 
28     c1.join()
29     print(主进程)
30 #注意:send()和recv()方法使用pickle模块对对象进行序列化。
31 
32 管道可以用于双向通信,利用通常在客户端/服务器中使用的请求/响应模型或远程过程调用,就可以使用管道编写与进程交互的程序

 

以上是关于Python 生产者消费者模式的主要内容,如果未能解决你的问题,请参考以下文章

Python 生产者消费者模式

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段

python 生产者消费者模式

python 生产者与消费者模式

python-生产者消费者模式

用Python多线程实现生产者消费者模式爬取斗图网的表情图片