生产者+消费者模型控制内存

Posted shawnhuang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了生产者+消费者模型控制内存相关的知识,希望对你有一定的参考价值。

生产者+消费者模型
从网上爬取数据
从网页上获取数据的过程 --》 生产数据的过程,爬取网页,是生产者行为
把数据取回来进行分析得出结果 --》数据消费过程,是消费者行为

使用队列来完成生产、消费的过程
生产者,是进程
消费者,是进程
生产者与消费者之间,传递数据,需要一个盘子(IPC)

# 没设置时间延迟的情况下

from multiprocessing import Queue, Process

def consumer(name, q):
    while 1:
        food = q.get()  # 会阻塞,直到队列里有数据就取出来
        print("%s把包子%s号给吃了" % (name, food))

def producer(q, food_name):
    for i in range(10):
        food = "%s%s" % (food_name, i)
        print("我做了%s号" % food)
        q.put(food)

if __name__ == "__main__":
    q = Queue()
    p = Process(target=consumer, args=("alex", q))
    p1 = Process(target=producer, args=(q, "包子"))
    p.start()
    p1.start()

# 我做了包子0号
# 我做了包子1号
# 我做了包子2号
# 我做了包子3号
# 我做了包子4号
# alex把包子包子0号给吃了
# 我做了包子5号
# 我做了包子6号
# 我做了包子7号
# 我做了包子8号
# 我做了包子9号
# alex把包子包子1号给吃了
# alex把包子包子2号给吃了
# alex把包子包子3号给吃了
# alex把包子包子4号给吃了
# alex把包子包子5号给吃了
# alex把包子包子6号给吃了
# alex把包子包子7号给吃了
# alex把包子包子8号给吃了
# alex把包子包子9号给吃了

# 运行发现明显生产比消费更快,供大于求,注意程序并没有停止运行

 

# 设置时间延迟后

from multiprocessing import Queue, Process
import time

def consumer(name, q):
    while 1:
        food = q.get()  # 会阻塞,直到队列里有数据就取出来
        time.sleep(1)
        print("%s把包子%s号给吃了" % (name, food))

def producer(q, food_name):
    for i in range(10):
        time.sleep(0.5)
        food = "%s%s" % (food_name, i)
        print("我做了%s号" % food)
        q.put(food)

if __name__ == "__main__":
    q = Queue()
    p = Process(target=consumer, args=("alex", q))
    p1 = Process(target=producer, args=(q, "包子"))
    p.start()
    p1.start()

# 我做了包子0号
# 我做了包子1号
# 我做了包子2号
# alex把包子包子0号给吃了
# 我做了包子3号
# alex把包子包子1号给吃了
# 我做了包子4号
# 我做了包子5号
# alex把包子包子2号给吃了
# 我做了包子6号
# 我做了包子7号
# alex把包子包子3号给吃了
# 我做了包子8号
# 我做了包子9号
# alex把包子包子4号给吃了
# alex把包子包子5号给吃了
# alex把包子包子6号给吃了
# alex把包子包子7号给吃了
# alex把包子包子8号给吃了
# alex把包子包子9号给吃了

# 运行发现,虽然设置了延迟,但是生产(producer)的延迟比消费(consumer)的延迟更短
# 所以还是供大于求

 

# 修改生产者与消费者的延迟设置

from multiprocessing import Queue, Process
import time

def consumer(name, q):
    while 1:
        food = q.get()  # 会阻塞,直到队列里有数据就取出来
        time.sleep(0.5)
        print("%s把包子%s号给吃了" % (name, food))

def producer(q, food_name):
    for i in range(10):
        time.sleep(1)
        food = "%s%s" % (food_name, i)
        print("我做了%s号" % food)
        q.put(food)

if __name__ == "__main__":
    q = Queue()
    p = Process(target=consumer, args=("alex", q))
    p1 = Process(target=producer, args=(q, "包子"))
    p.start()
    p1.start()

# 我做了包子0号
# alex把包子包子0号给吃了
# 我做了包子1号
# alex把包子包子1号给吃了
# 我做了包子2号
# alex把包子包子2号给吃了
# 我做了包子3号
# alex把包子包子3号给吃了
# 我做了包子4号
# alex把包子包子4号给吃了
# 我做了包子5号
# alex把包子包子5号给吃了
# 我做了包子6号
# alex把包子包子6号给吃了
# 我做了包子7号
# alex把包子包子7号给吃了
# 我做了包子8号
# alex把包子包子8号给吃了
# 我做了包子9号
# alex把包子包子9号给吃了

# 很明显,在只有一个消费者和一个生产者的前提下,如果生产的时间要比消费的时间更长
# 那么每生产一个包子,消费者都能立刻吃掉,并且等着下一个生产好的包子

 

# 基于队列实现生产者消费者模型

from multiprocessing import Queue, Process
import time

def consumer(name, q):
    while 1:
        food = q.get()  # 会阻塞,直到队列里有数据就取出来
        time.sleep(2)
        print("%s拿到了包子%s号后并把它吃掉" % (name, food))

def producer(q, food_name):
    for i in range(20):  # 生产20个包子
        time.sleep(0.1)
        food = "%s%s" % (food_name, i)
        print("我做了%s号" % food)
        q.put(food)  # 如果队列满了就在这里阻塞,直到队列里有空位置

if __name__ == "__main__":
    q = Queue(5)  # 这个队列的容量,这里是指 6个的容量
    p = Process(target=consumer, args=("alex", q))  # 消费者1号
    p1 = Process(target=consumer, args=("太白", q))  # 消费者2号
    p2 = Process(target=producer, args=(q, "包子"))  #  生产者1号
    p.start()
    p1.start()
    p2.start()

# 我做了包子0号
# 我做了包子1号
# 我做了包子2号
# 我做了包子3号
# 我做了包子4号
# 我做了包子5号
# 我做了包子6号
# 我做了包子7号
# alex拿到包子包子0号并把它吃掉
# 太白拿到包子包子1号并把它吃掉
# 我做了包子8号
# 我做了包子9号
# alex拿到包子包子2号并把它吃掉
# 我做了包子10号
# 太白拿到包子包子3号并把它吃掉
# 我做了包子11号
# alex拿到包子包子4号并把它吃掉
# 太白拿到包子包子5号并把它吃掉
# 我做了包子12号
# 我做了包子13号
# alex拿到包子包子6号并把它吃掉
# 我做了包子14号
# 太白拿到包子包子7号并把它吃掉
# 我做了包子15号
# alex拿到包子包子8号并把它吃掉
# 我做了包子16号
# 太白拿到包子包子9号并把它吃掉
# 我做了包子17号
# alex拿到包子包子10号并把它吃掉
# 我做了包子18号
# 太白拿到包子包子11号并把它吃掉
# 我做了包子19号
# alex拿到包子包子12号并把它吃掉
# 太白拿到包子包子13号并把它吃掉
# alex拿到包子包子14号并把它吃掉
# 太白拿到包子包子15号并把它吃掉
# alex拿到包子包子16号并把它吃掉
# 太白拿到包子包子17号并把它吃掉
# alex拿到包子包子18号并把它吃掉
# 太白拿到包子包子19号并把它吃掉
# alex拿到包子None号并把它吃掉
# 太白拿到包子None号并把它吃掉

# 注意一开始为什么要生产8个包子才开始吃?
# 因为是本来这个队列q = Queue(5)限定了只能放6个包子进去
# 但是生产者一旦把6个包子放进去,两个消费者就分别从里面取出一个包子
# 只不过,重点是消费者要2s后才能继续拿包子吃,但他们已经把两个包子拿出来了
# 所以生产者还得生产2个包子放进去,把队列排满
# 运行发现,中间之所以吃了一个才会生产一个,是因为队列已经满了

 

# 前面所有的示例, 都是主进程永远不会结束
# 原因是:生产者p在生产完后就结束了
# 但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步
# 解决办法: 主进程里发结束信号,但主进程需要等生产者结束后再发送该信号

from multiprocessing import Queue, Process
import time

def consumer(name, q):
    while 1:
        food = q.get()
        if not food: break
        time.sleep(2)
        print("%s拿到了%s号并开始吃它" % (name, food))


def producer(q, food_name):
    for i in range(20):  # 生产20个包子
        time.sleep(0.1)
        food = "%s%s" % (food_name, i)
        print("我做了%s号" % food)
        q.put(food)  # 如果队列满了就在这里阻塞,直到队列里有空位置

if __name__ == "__main__":
    q = Queue(5)  # 这个队列的容量,这里是指 6个的容量
    p = Process(target=consumer, args=("alex", q))  # 消费者1号
    p1 = Process(target=consumer, args=("太白", q))  # 消费者2号
    p2 = Process(target=producer, args=(q, "包子"))  # 生产者1号
    p.start()
    p2.start()
    p1.start()
    p2.join()  # 确认生产者把所需生产数量都生产完毕
    q.put(None)
    q.put(None)

# 运行结果和上面一样,不同的是程序会结束

# 吃的人是消费数据的
# 制造吃的人是生产数据的
# 生产快,消费慢,供过于求
#   增加消费者
# 生产慢,消费快,供不应求
#   增加生产者

 

# 控制内存

import queue

q = queue.Queue(5)  # 设置一个容量值,保护内存
q.put(1)
print("1 over")
q.put(2)
print("2 over")
q.put(3)
print("3 over")
q.put(4)
print("4 over")
q.put(5)
print("5 over")
q.put(6)
print("6 over")
# 1 over
# 2 over
# 3 over
# 4 over
# 5 over

# 运行发现程序不会停,因为它还在等,等设置的内存里减少东西再执行q.put(6)

 











以上是关于生产者+消费者模型控制内存的主要内容,如果未能解决你的问题,请参考以下文章

一个生产者+多消费者模型中的同步,共享内存

13.1 多线程操作共享内存生产者消费者模型多线程服务器框架

35 守护进程 互斥锁 IPC 共享内存 的方式 生产者消费者模型

生产者和消费者模型

并发无锁队列学习(单生产者单消费者模型)

生产者消费者模型