生产者消费者模型及队列,进程池

Posted hdy19951010

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了生产者消费者模型及队列,进程池相关的知识,希望对你有一定的参考价值。

 

生产者消费者模型

生产者消费者模型
主要是为了解耦
可以借助队列来实现生产者消费者模型

栈 : 先进后出(First In Last Out 简称 FILO)
队列 : 先进先出(First In First Out 简称 FIFO)

import queue #不能进行多进程之间的数据传输
(1) from multiprocessing import Queue #借助Queue解决生产者消费者模型,队列是安全的
q=Queue(num)
num : 队列的最大长度
q.get() #阻塞等待获取数据,如果有数据就直接获取,如果没有数据就阻塞等待
q.put() #阻塞,如果可以继续往队列中放数据,就直接放,不能放就阻塞等待
q.get nowait() #不阻塞,如果有数据直接获取,没有数据就报错
q.put nowait() #不阻塞,如果哦可以继续往队列中放数据,就直接放,不能放就报错

(2) from multiprocessing import JoinableQueue #可连接的队列
JoinableQueue 是继承Queue,所以可以使用Queue中的方法
q.join() #用于生产者.等待 q.task done的返回结果,通过返回结果,生产者就能获得消费者消费了多少个数据
q.task done() #用于消费者,是指每消费队列中一个数据,就给join返回一个标识

回调函数的使用:
进程的任务函数的返回值,被当成回调函数的形参接收到,以此进行进一步的操作
回调函数是由主进程调用的,而不是子进程,子进程只负责把结果传递给回调函数
from multiprocessing import Queue,Process
import time
# 消费者
def con(q,name):
    while 1:
        info=q.get()    #消费
        if info:    #如果有就打印,否则break
            print(%s拿走了%s%(name,info))
        else:
            break
# 生产者
def sh(q,product):
    for i in range(10):
        info=product+版娃娃%s号% str(i)
        print(info)
        q.put(info)     #生产
    q.put(None)

if __name__==__main__:
    q=Queue(10)     #队列长度10(可以自己设定)
    p=Process(target=sh,args=(q,大雄))
    p_1=Process(target=con,args=(q,alex))
    p.start()       #执行子进程
    p_1.start()     #执行子进程

队列实现 生产者消费者模型
模块:from multiprocessing import Queue,Process
from multiprocessing import Queue,Process
def xiao(q,name,color):     # 这里的 color 是颜色的传参
    while 1:
        ret=q.get()     # 消费 q.get()
        if ret:
            print(%s%s拿走了%s娃娃33[0m% (color,name,ret))   # color接收的是颜色的传参而且是开头,所以color要放在前面
        else:
            break       #当消费者在数据队列中拿到None的时候,就是拿到了生产者不再生产数据的标识,此时消费者结束消费即可

def sheng(q,ban):
    for i in range(0,12):
        ret=ban+版娃娃第%s号% str(i)
        # print(ret)
        q.put(ret)      # 生产  q.put(变量)

if __name__==__main__:
    q=Queue(15)         #队列长度
    p2 = Process(target=sheng, args=(q, 小熊))      #开启生产者子进程
    p1=Process(target=xiao,args=(q,ko,33[31m))
    p1_1=Process(target=xiao,args=(q,lp,33[33m))      #这里的转换颜色虽然传的时候是在最后面,但是这是颜色的开头,在打印的时候需要放在前面
    p_p=[p1_1,p1,p2]
    [i.start() for i in p_p]        #让两个消费者轮流消费
    p2.join()   #主进程阻塞等待生产子进程执行完后(生产完)再继续向下执行
    q.put(None)     #几个消费者就要接收几个结束标识
    q.put(None)

 

进程间共享内存
主进程的值与子进程的值是一样的
用法: m=Manager()
num = m.dict({键:值})
num = m.list([1,2,3])
from multiprocessing import Process,Manager,Value
def func(num):
    num[0]-=1
    print(子进程中的num值是,num)

if __name__==__main__:
    m=Manager()
    num=m.list([1,2,3])     #共享内存,所以主进程的值与子进程的值是一样的
    p=Process(target=func,args=(num,))
    p.start()
    p.join()
    print(主进程中的num值是,num)

 

 

进程池
进程池的三个方法
(1) map(func,iterable)
func : 进程池中的进程执行的任务函数
iterable : 可迭代对象,是把可迭代对象中的每个元素依次传给任务函数当参数
(2) apply(func,args=()): 同步的效率,也就是说池中的进程一个一个的去执行任务
func : 进程池中的进程执行的任务函数
args : 可迭代对象型的参数,是传给任务函数的参数
同步处理任务时,不需要close和join
同步处理任务时,进程池中的所有进程是普通进程(主进程需要等待其执行结束)

(3) apply_async(func,args=(),callback=None): 异步的效率,也就是说池中的进程一次性都去执行任务
func : 进程池中的进程执行的任务函数
args : 可迭代对象型的参数,是传给任务函数的参数
callback : 回调函数,就是说每当进程池中有进程处理完任务了,返回的结果可以交给回调函数,由回调函数进行进一步处理,
回调函数只有异步才有同步是没有的, 异步处理任务时,进程池中的所有进程是守护进程(主进程代码执行完毕守护进程就结束)
异步处理任务时,必须要加上close和join


map返回值:
from multiprocessing import Pool
def func(num):
    num +=1
    print(num)
    return num

if __name__==__main__:
    p=Pool()
    res=p.map(func,[i for i in range(10)])
    p.close()
    p.join()
    print(主进程中的map返回值,res)

 

进程池异步处理问题(异步:开启多个进程,并且同时处理多个任务)
from multiprocessing import Pool
import time

def func(num):
    num += 1
    return num

if __name__ == __main__:
    p = Pool(5)         #设置进程数(最好是比自己电脑核数多一个)
    start = time.time()
    l = []
    for i in range(10000):
        res = p.apply_async(func,args=(i,))# 异步处理这100个任务,异步是指,进程中有5个进程,一下就处理5个任务,接下来哪个进程处理完任务了,就马上去接收下一个任务
        l.append(res)
    p.close()
    p.join()
    print(time.time() - start)

 

进程池同步处理任务(同步:虽然有多个进程,但是还是一个进程一个进程的去处理)
from multiprocessing import Pool
import time

def func(num):
    num += 1
    return num
if __name__==__main__:
    p = Pool(5)     #开5个进程
    start = time.time()   #开启进程前记下时间
    l=[]
    for i in range(10000):
        res = p.apply(func,args=(i,))   #同步处理任务,虽然有五个进程,但是依然一个进程一个进程的去处理任务
        l.append(res)     #把10000个数放进列表
    print(l)
    print(time.time()-start)      #进程结束的时间减去开启进程前的时间

 

同步和异步的效率的对比
from multiprocessing import Pool
import requests
import time

def func(url):
    res = requests.get(url)
    print(res.text)
    if res.status_code == 200:
        return ok

if __name__ == __main__:
    p = Pool(5)
    l = [https://www.baidu.com,
         http://www.jd.com,
         http://www.taobao.com,
         http://www.mi.com,
         http://www.cnblogs.com,
         https://www.bilibili.com,
         ]
    start = time.time()
    for i in l:
        p.apply(func,args=(i,))

    apply_= time.time() - start

    start = time.time()
    for i in l:
        p.apply_async(func, args=(i,))
    p.close()
    p.join()
    print(同步的时间是%s,异步的时间是%s%(apply_, time.time() - start))

 

















































以上是关于生产者消费者模型及队列,进程池的主要内容,如果未能解决你的问题,请参考以下文章

joinablequeue模块 生产者消费者模型 Manager模块 进程池 管道

Day9 进程同步锁 进程队列 进程池 生产消费模型

Python学习笔记——进阶篇第九周———线程进程协程篇(队列Queue和生产者消费者模型)

Linux入门多线程(线程概念生产者消费者模型消息队列线程池)万字解说

Linux入门多线程(线程概念生产者消费者模型消息队列线程池)万字解说

Linux入门多线程(线程概念生产者消费者模型消息队列线程池)万字解说