网络编程基础---并发编程----Manager(共享字典,列表)---joinabelqueue----进程池---回调函数
Posted Python & more
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了网络编程基础---并发编程----Manager(共享字典,列表)---joinabelqueue----进程池---回调函数相关的知识,希望对你有一定的参考价值。
生产者消费者模型: 1 程序中有两类角色 一类负责生产数据(生产者) 一类负责处理数据(消费者) 2 引入生产者消费者模型的目的: 平衡生产者 与 消费者之间的 速度差 3 如何实现: 生产者---队列----消费者 (解耦合)
1 joinablequeue:
from multiprocessing import Process,JoinableQueue import time,random def producer(name,q,food): for i in range(1,10): time.sleep(0.2) res=‘%s 制作的第%s %s‘%(name,i,food) q.put(res) q.join() # 等到队列里面没有内容 def consumer(name,q): while True: res=q.get() if not res:break print(‘%s 吃了 %s‘%(name,res)) q.task_done() # 取一次 次数减一 if __name__==‘__main__‘: q=JoinableQueue() p1=Process(target=producer,args=(‘egon‘,q,‘baozi‘)) p2=Process(target=producer,args=(‘alex‘,q,‘baozi‘)) p3=Process(target=producer,args=(‘elen‘,q,‘baozi‘)) c1=Process(target=consumer,args=(‘a‘,q)) #共享的q c2=Process(target=consumer,args=(‘b‘,q)) #共享的q p1.start() p2.start() p3.start() c1.daemon=True c2.daemon=True # 消费者随着生产者执行结束 守护进程--- c1.start() c2.start() p1.join() p2.join() p3.join()
2 .Manager:
from multiprocessing import Manager,Process,Lock def work(d,lock): with lock: d[‘count‘]-=1 if __name__==‘__main__‘:
m=Manager() d=m.dict({‘count‘:50}) # Manager 创建共享字典 # d=m.list() lock=Lock() p_l=[] for i in range(20): p=Process(target=work,args=(d,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(d)
3.同步异步 --- 阻塞非阻塞:
同步调用---指的是提交任务的方式 ==== apply 提交晚任务 原地等待任务结束
阻塞(进程的一种状态)---遇到IO就阻塞--剥夺cpu执行权限 异步调用---提交完任务后 不会在原地等待 会继续提交下一个任务
非阻塞(进程处于 运行状态 或者 就绪状态)
4.进程池:
===================== 进程池 ================== 并发量不高-----大并发下不能使用 from multiprocessing import Pool # 开进程 控制进程的数量 import time,os,random def work(n): print(‘%s i working ‘%os.getpid()) time.sleep(random.randint(1,2)) # 阻塞时间 return n if __name__==‘__main__‘: p=Pool(4) # 四个进程 进程池 一共只有四个进程--- 一个完成任务 接着 去做另一个任务 obj_ls=[] for i in range(10): # res=p.apply(work,args=(i,)) # 同步调用 等待任务结束 拿到结果---- 提交启动进程任务 p= Process(target=work) -- p.start() # print(res) obj=p.apply_async(work,args=(i,)) # 异步调用---只不断的提交任务到 进程池 开进程 ----并不拿结果 obj_ls.append(obj) # print(obj.get()) # 等待结果 p.close() # 关闭apply_async请求 p.join() # 等待进程池结束 for obj in obj_ls: print(obj.get())
5 回调函数:
import requests import os from multiprocessing import Pool,Process
def get(url): print(‘%s get %s‘%(os.getpid(),url)) response = requests.get(url) if response.status_code==200: return {‘url‘:url,‘text‘:response.text} def parse(data): print(os.getpid(),data) res=‘%s :%s\n‘ %(data[‘url‘],len(data[‘text‘])) with open(‘demo.txt‘,‘a‘) as f: f.write(res) if __name__==‘__main__‘: urls=[‘https://www.baidu.com‘, ‘https://www.hao123.com‘, ‘http://cn.bing.com/?mkt=zh-CN&mkt=zh-CN&mkt=zh-CN&mkt=zh-CN&mkt=zh-CN&mkt=zh-CN‘, ] p=Pool() url_ls=[] for url in urls: url_ls.append(p.apply_async(get,args=(url,),callback=parse)) # 主进程 负责回调函数 # get函数的返回值-->> 作为parse函数的参数 p.close() p.join()
以上是关于网络编程基础---并发编程----Manager(共享字典,列表)---joinabelqueue----进程池---回调函数的主要内容,如果未能解决你的问题,请参考以下文章