进程中的生产者消费者模型

Posted whylinux

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了进程中的生产者消费者模型相关的知识,希望对你有一定的参考价值。

# 多进程的生产者消费者模型

# 队列
    # 队列是进程安全的,同时只能有一个进程从队列中取到数据

# 生产者消费者模型
    # 为什么要这个模型
        # 这个模型经常性的解决数据的供需不平衡的问题
    # 经常有两拨人,一拨是生产数据的,一拨是消费数据的。
        # 消费者指的是使用数据处理数据的一端
        # 生产数据的一端生产的数据过快
        # 当生产数据过快时,消费数据过慢时,可以弄出一个进程队列,将生产的数据放入到队列中,消费数据端可以开多个进程从这个进程队列中消费数据
        # 当生产数据过慢时,消费数据过快时,可以弄出一个进程队列,多几个进程作为生产者,将生产的数据放到队列中,消费者从队列中消费数据



# 生产者 消费者 模型

# 生产者,每一个生产者就是一个进程,每一个消费者就是一个进程

# 下面的例子,当生产者生产的东西全部生产完了以后,消费者进程还一直阻塞在队列那里等待数据,让消费者进程结束使用的方法是,在阻塞等待生产者执行后了后,向队列中插入了
# None值,消费者判断队列中的值为none后,则退出消费者进程。但是因为进程队列是进程安全的,多个进程一个时间点上只能有一个进程获取到队列中的数据,所以当有一个
# 消费者拿到队列中的None后,队列中就没有了None,其他消费者就拿不到None无法退出消费者进程,必须要有多个消费者就要向队列中丢多少
# 个None才能让消费者进程全部结束
# import time
# import random
# from multiprocessing import Process, Queue
#
#
# def producer(name, food, q):
#     ‘‘‘
#     生产者
#     name:谁生产
#     food:生产了什么东西
#     q:生产出来的东西放到哪里
#     :return:
#     ‘‘‘
#     for i in range(4):
#         time.sleep(random.randint(1, 3))    # 模拟生产数据的时间
#         f = (‘%s 生产了%s %s‘ % (name, food, i))   # 模拟生产数据
#         print(f)    # 查看生产的数据
#         q.put(f)    # 将生产的数据放入到进程队列中
#
# def consumer(q, name):
#     ‘‘‘
#     消费者
#     q:数据在哪里
#     name:谁来消费
#     :return:
#     ‘‘‘
#     while 1:
#         food = q.get()
#         if food is None:
#             print(‘%s 获取到了一个空‘ % name)
#             break
#         print(‘33[31m%s 消费了 %s33[0m‘ % (name, food))
#         time.sleep(random.randint(1, 3))
#
# if __name__ == ‘__main__‘:
#     q = Queue(20)
#     p = Process(target=producer, args=(‘Egon‘, ‘包子‘, q))
#     p.start()
#
#     p1 = Process(target=producer, args=(‘wusir‘, ‘泔水‘, q))
#     p1.start()
#
#     p2 = Process(target=consumer, args=(q, ‘why‘))
#     p2.start()
#
#     p3 = Process(target=consumer, args=(q, ‘Jbox‘))
#     p3.start()
#
#     p.join()   # 阻塞等待生产者1进程结束
#     p1.join()   # 阻塞等待生产者2进程结束
#
#     q.put(None)
#     q.put(None)


# 下面的例子使用JoinableQueue进程队列实现生产者消费者模型,放入数据到队列中时,内部多了一个计数+的机制,从队列中取数据后,调用队列的task_done方法,将计数-1
# 然后在生产者中使用队列的join阻塞等待队列的数据全部为空,全部被消费者消费掉,如果数据全被消耗掉了,生产者进程才会结束

import time
import random
from multiprocessing import Process, JoinableQueue

def producer(name, food, q):
    ‘‘‘
    生产者
    name:谁生产
    food:生产了什么东西
    q:生产出来的东西放到哪里
    :return:
    ‘‘‘
    for i in range(4):
        time.sleep(random.randint(1, 3))    # 模拟生产数据的时间
        f = (%s 生产了%s %s % (name, food, i))   # 模拟生产数据
        print(f)    # 查看生产的数据
        q.put(f)    # 将生产的数据放入到进程队列中,每放入一个数据到队列中后,机制中的count就会被+1
    q.join()    # 阻塞等待队列中的数据全部被取走,q队列中的数据全部被取走则不再阻塞在这里

def consumer(q, name):
    ‘‘‘
    消费者
    q:数据在哪里
    name:谁来消费
    :return:
    ‘‘‘
    while 1:
        food = q.get()
        if food is None:
            print(%s 获取到了一个空 % name)
            break
        print(33[31m%s 消费了 %s33[0m % (name, food))
        time.sleep(random.randint(1, 3))
        q.task_done()   # 每从队列中消费了一个数据后,让机制中的conut计数减一

if __name__ == __main__:
    q = JoinableQueue(20)
    p = Process(target=producer, args=(Egon, 包子, q))
    p.start()

    p1 = Process(target=producer, args=(wusir, 泔水, q))
    p1.start()

    p2 = Process(target=consumer, args=(q, why))
    p2.daemon = True    # 设置为守护进程的目的是为了让主进程的代码执行完毕后,该守护进程的代码自动结束
    p2.start()

    p3 = Process(target=consumer, args=(q, Jbox))
    p3.daemon = True    # 设置为守护进程的目的是为了让主进程的代码执行完毕后,该守护进程的代码自动结束
    p3.start()

    p.join()   # 阻塞等待生产者1进程结束
    p1.join()   # 阻塞等待生产者2进程结束

    print(生产者进程结束,同时主进程要结束,因为其他两个消费者进程被设置为守护进程,所以主进程结束后,那两个消费者守护进程也会结束)

 

以上是关于进程中的生产者消费者模型的主要内容,如果未能解决你的问题,请参考以下文章

进程锁-生产者消费者模型

进程中的生产者消费者模型

守护进程,锁,队列(生产者消费者模型)

20181229(守护进程,互斥锁,IPC,生产者和消费者模型)

35 守护进程 互斥锁 IPC 共享内存 的方式 生产者消费者模型

生产者消费者模型-Java代码实现