网络编程基础---并发编程----Manager(共享字典,列表)---joinabelqueue----进程池---回调函数

Posted Python & more

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了网络编程基础---并发编程----Manager(共享字典,列表)---joinabelqueue----进程池---回调函数相关的知识,希望对你有一定的参考价值。

生产者消费者模型:

1 程序中有两类角色
     一类负责生产数据(生产者) 一类负责处理数据(消费者)

2 引入生产者消费者模型的目的:
    平衡生产者 与 消费者之间的 速度差

3 如何实现:
    生产者---队列----消费者  (解耦合)

 

1  joinablequeue:

from multiprocessing import Process,JoinableQueue
import time,random

def producer(name,q,food):
    for i in range(1,10):
        time.sleep(0.2)
        res=%s 制作的第%s %s%(name,i,food)
        q.put(res)
    q.join()   # 等到队列里面没有内容
def consumer(name,q):
    while True:
        res=q.get()
        if not res:break
        print(%s 吃了 %s%(name,res))
        q.task_done()  #  取一次 次数减一

if __name__==__main__:
    q=JoinableQueue()
    p1=Process(target=producer,args=(egon,q,baozi))
    p2=Process(target=producer,args=(alex,q,baozi))
    p3=Process(target=producer,args=(elen,q,baozi))
    c1=Process(target=consumer,args=(a,q))  #共享的q
    c2=Process(target=consumer,args=(b,q))  #共享的q
    p1.start()
    p2.start()
    p3.start()

    c1.daemon=True
    c2.daemon=True  # 消费者随着生产者执行结束   守护进程---

    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

 

2 .Manager:

 

from multiprocessing import Manager,Process,Lock

def work(d,lock):
    with lock:
        d[count]-=1

if __name__==__main__:
m
=Manager() d=m.dict({count:50}) # Manager 创建共享字典 # d=m.list() lock=Lock() p_l=[] for i in range(20): p=Process(target=work,args=(d,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(d)

 

 

 

3.同步异步 --- 阻塞非阻塞:

同步调用---指的是提交任务的方式 ==== apply 提交晚任务 原地等待任务结束
阻塞(进程的一种状态)
---遇到IO就阻塞--剥夺cpu执行权限 异步调用---提交完任务后 不会在原地等待 会继续提交下一个任务
非阻塞(进程处于 运行状态 或者 就绪状态)

 

4.进程池:

              ===================== 进程池 ==================

                        并发量不高-----大并发下不能使用

from multiprocessing import Pool       # 开进程 控制进程的数量
import time,os,random

def work(n):
    print(%s i working %os.getpid())
    time.sleep(random.randint(1,2))  #  阻塞时间
    return n

if __name__==__main__:

    p=Pool(4)   # 四个进程  进程池   一共只有四个进程--- 一个完成任务 接着 去做另一个任务
    obj_ls=[]

    for i in range(10):
        # res=p.apply(work,args=(i,))        # 同步调用 等待任务结束 拿到结果---- 提交启动进程任务 p= Process(target=work) -- p.start()
        # print(res)
        obj=p.apply_async(work,args=(i,))    # 异步调用---只不断的提交任务到  进程池 开进程 ----并不拿结果
        obj_ls.append(obj)
        # print(obj.get()) # 等待结果

    p.close()  # 关闭apply_async请求
    p.join()  # 等待进程池结束

    for obj in obj_ls:
        print(obj.get())

 

5  回调函数:

 

import requests
import os
from multiprocessing import Pool,Process
def get(url): print(%s get %s%(os.getpid(),url)) response = requests.get(url) if response.status_code==200: return {url:url,text:response.text} def parse(data): print(os.getpid(),data) res=%s :%s\n %(data[url],len(data[text])) with open(demo.txt,a) as f: f.write(res) if __name__==__main__: urls=[https://www.baidu.com, https://www.hao123.com, http://cn.bing.com/?mkt=zh-CN&mkt=zh-CN&mkt=zh-CN&mkt=zh-CN&mkt=zh-CN&mkt=zh-CN, ] p=Pool() url_ls=[] for url in urls: url_ls.append(p.apply_async(get,args=(url,),callback=parse)) # 主进程 负责回调函数 # get函数的返回值-->> 作为parse函数的参数 p.close() p.join()

 

以上是关于网络编程基础---并发编程----Manager(共享字典,列表)---joinabelqueue----进程池---回调函数的主要内容,如果未能解决你的问题,请参考以下文章

并发编程的基础

掌握系列之并发编程-1.并发基础

高并发编程必备基础 -- 转载自 并发编程网

编程内功心法「多线程并发编程」技术体系和并发模型的基础探究(夯实基础)

并发编程之基础( 四)

并发编程基础