并发编程
Posted liuhongshuai
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程相关的知识,希望对你有一定的参考价值。
1,简单例子
# import os # import time # print(os.getpid())#当前进程 # print(os.getppid())#父进程 # import os # import time # from multiprocessing import Process # def func(money): # time.sleep(1) # print(‘取钱{}元‘.format(money)) # # if __name__=="__main__": # p=Process(target=func,args=(10,))#创建一个进程对象 # p.start()#进程开启 # print(‘===============‘) # p.join()#阻塞 # print(‘****************‘) #主进程与子进程是异步执行 #如果在主进程结束了 子进程未结束 主进程会等待着子进程
2,开启多个子进程
#开启多个子进程 # import os # import time # from multiprocessing import Process # def func(i): # time.sleep(1) # print(‘{}:子进程{},父进程{}‘.format(i,os.getpid(),os.getppid())) # # if __name__==‘__main__‘: # for i in range(10): # p=Process(target=func,args=(i,)) # p.start() # # p.join()#阻塞 让主进程等待子进程结束后执行 # print(‘=======主进程================‘)
3,自定义进程
#自定义进程 基于继承 必须实现run方法 # import os # from multiprocessing import Process # class MyProcess(Process): # def __init__(self,arg1,arg2): # super().__init__() # self.arg1=arg1 # self.arg2=arg2 # def run(self): # print(‘子进程{},{}-{}‘.format(os.getpid(),self.arg1,self.arg2)) # self.walk()#在子进程中调用 # def walk(self): # print(‘子进程{}‘.format(os.getpid())) # # if __name__==‘__main__‘: # p=MyProcess(1,2) # p.start()#run方法 # p.join() # p.walk()#直接在主进程中调用,并没有在子进程中执行 # print(‘主进程‘,os.getpid())
4,数据隔离
#数据隔离 进程与进程之间的数据是共享的 # from multiprocessing import Process # n=100 # def func(): # global n # n=n-1 # print(n) # # if __name__==‘__main__‘: # for i in range(10): # p=Process(target=func) # p.start()#结果均为99 # p.join() # print(‘主进程:‘,n)#100
5,守护进程
#守护进程 p.daemon=True #守护进程会随着主进程的结束而结束 #守护进程要在start前设置 守护进程中不要再开子进程 # import time # from multiprocessing import Process # # def func1(): # print(‘func1 start‘) # time.sleep(5) # print(‘func1 end‘) # def func2(): # print(‘func2 start‘) # time.sleep(5) # print(‘func2 end‘) # # if __name__=="__main__": # p1=Process(target=func1) # p1.daemon=True # p1.start()#p1随主进程结束而结束 # p2=Process(target=func2) # p2.start() # time.sleep(2) # print(‘=======主进程==========‘)
6,进程的其他方法和属性
#进程的其他属性和方法 pid name is_alive() terminate # import time # from multiprocessing import Process # def func(): # time.sleep(2) # print(‘Hello World‘) # # if __name__=="__main__": # p=Process(target=func) # p.start() # print(p.pid)#进程id # print(p.name)#进程名字 # p.name=‘进程1‘#修改进程名字 # print(p.name) # # print(p.is_alive())#进程是否活跃 # p.terminate()#结束进程 但不会立刻被杀死 # print(p.is_alive()) # time.sleep(1) # print(p.is_alive())
7,锁
#锁 在并发编程中 保证数据安全 # from multiprocessing import Lock # lock=Lock() # lock.acquire() # lock.release() #抢票实例 # import json # import time # import random # from multiprocessing import Process # from multiprocessing import Lock # # def search(i): # with open(‘ticket‘,‘r‘,encoding=‘utf-8‘) as f: # print(i,json.load(f)[‘count‘]) # # def get(i): # with open(‘ticket‘,‘r‘,encoding=‘utf-8‘) as f: # ticket_num=json.load(f)[‘count‘] # time.sleep(random.random()) # if ticket_num>0: # with open(‘ticket‘,‘w‘,encoding=‘utf-8‘) as f: # json.dump({‘count‘:ticket_num-1},f) # print(‘{}号抢到票了‘.format(i)) # else: # print(‘{}号没票了‘.format(i)) # # def task(i,lock): # search(i)#查看票 # lock.acquire() # get(i)#抢票 # lock.release() # # if __name__==‘__main__‘: # lock=Lock() # for i in range(20):#20个人同时抢票 # p=Process(target=task,args=(i,lock)) # p.start()
8,信号量
#信号量 多把钥匙公用一把锁 # from multiprocessing import Semaphore # sem=Semaphore(4) # sem.acquire() # sem.release() #歌厅实例 # import time # import random # from multiprocessing import Process # from multiprocessing import Semaphore # def sing(i,sem): # sem.acquire() # print(‘{}进入ktv‘.format(i)) # time.sleep(random.random()) # print(‘{}出了ktv‘.format(i)) # sem.release() # #迷你歌厅 同一时间只能有4个人进入 # if __name__=="__main__": # sem=Semaphore(4) # for i in range(20): # p=Process(target=sing,args=(i,sem)) # p.start()
9,事件
# 事件 标志 同时 是所有的进程 都陷入阻塞 # from multiprocessing import Event # e=Event()#实例化一个事件 # e.set()#将标志变为非阻塞 # e.wait()#等待 # e.clear()#将标志变为阻塞 # e.is_set()#判断是否阻塞 #红绿灯实例 # import time # import random # from multiprocessing import Process # from multiprocessing import Event # # def traffic_light(e): # while True: # if e.is_set(): # time.sleep(3) # print(‘红灯亮‘) # e.clear()#红变绿 # else: # time.sleep(3) # print(‘绿灯亮‘) # e.set()#绿变红 # # def car(i,e): # e.wait() # print(‘{}车通过‘.format(i)) # # if __name__=="__main__": # e=Event()#立一个红灯 # tra=Process(target=traffic_light,args=(e,)) # tra.start()#启动一个进程来控制红绿灯 # for i in range(20): # if i%6==0: # time.sleep(random.randint(1,3)) # car_pro=Process(target=car,args=(i,e)) # car_pro.start()
10,队列
#队列 # from multiprocessing import Queue # q=Queue(3)#有长度限制 # # q=Queue()#没有长度限制 # q.put(1) # q.put(2) # print(q.qsize())#有多少值 # q.put(3) # # q.put(4)#阻塞 # print(q.get()) # print(q.get()) # print(q.get()) # # print(q.get())#阻塞 #利用队列实现了 主进程与子进程的通信 子进程之间的通信 # from multiprocessing import Process # from multiprocessing import Queue # def q_put(q): # q.put(‘Hello World‘) # def q_get(q): # print(q.get()) # # if __name__=="__main__": # q=Queue() # p=Process(target=q_put,args=(q,)) # p.start() # p1=Process(target=q_get,args=(q,)) # p1.start()
11,生产者消费者模型
#生产者消费者模型 # from multiprocessing import Process # from multiprocessing import Queue # import random # import time # # def producer(q,food): # for i in range(5): # q.put(‘{}-{}‘.format(food,i)) # print(‘生产了{}-{}‘.format(food,i)) # time.sleep(random.random()) # q.put(None) # q.put(None) # q.put(None) # # def consumer(q,name): # while True: # food=q.get() # if food==None:break # print(‘{}吃了{}‘.format(name,food)) # # if __name__==‘__main__‘: # q=Queue() # p1=Process(target=producer,args=(q,‘包子‘)) # p1.start() # p2=Process(target=producer,args=(q,‘骨头‘)) # p2.start() # c1=Process(target=consumer,args=(q,‘alex‘)) # c1.start() # c2=Process(target=consumer,args=(q,‘wusir‘)) # c2.start()
# JoinableQueue版 (可以感知数据的处理 task_done) # from multiprocessing import Process # from multiprocessing import JoinableQueue # import random # import time # # def producer(q,food): # for i in range(5): # q.put(‘{}-{}‘.format(food,i)) # print(‘生产了{}-{}‘.format(food,i)) # time.sleep(random.random()) # q.join()#等待消费者把所有数据处理完 # # def consumer(q,name): # while True: # food=q.get() # print(‘{}吃了{}‘.format(name,food)) # q.task_done()#消费完了 # # # if __name__==‘__main__‘: # q=JoinableQueue() # p1=Process(target=producer,args=(q,‘包子‘)) # p1.start() # p2=Process(target=producer,args=(q,‘骨头‘)) # p2.start() # c1=Process(target=consumer,args=(q,‘alex‘)) # c1.daemon=True # c1.start() # c2=Process(target=consumer,args=(q,‘wusir‘)) # c2.daemon=True # c2.start() # # p1.join()#等待p1执行完毕 # p2.join()#等待p2执行完毕 #生产者生产的数据全部被消费 —— 生产者进程结束 —— 主进程代码执行结束 —— 消费者守护进程结束
# IPC机制 队列Quere # 管道 Pipe 双向通信 # from multiprocessing import Pipe # p1,p2=Pipe()#支持双向通信 # p1.send(‘hello‘) # print(p2.recv()) # p2.send(‘hi‘) # print(p1.recv()) # p1.close() # p2.close() #EOFerror错误:双向通信的一端关闭 #队列=管道(没有锁的机制,数据不安全)+锁 #队列 在同一台机器上的多个进程之间通信 #利用管道实现生产者消费者模型 # from multiprocessing import Lock # from multiprocessing import Process # from multiprocessing import Pipe # # def producer(p,n): # produce,consume=p # consume.close() # for i in range(n): # produce.send(i) # produce.send(None) # produce.send(None) # produce.close() # # def consumer(p,name,lock): # produce,consume=p # produce.close() # while True: # lock.acquire() # food=consume.recv() # lock.release() # if food: # print(‘{}收到包子:{}‘.format(name,food)) # else: # consume.close() # break # # if __name__==‘__main__‘: # produce,consume=Pipe() # lock=Lock() # p1=Process(target=producer,args=((produce,consume),10)) # c1=Process(target=consumer,args=((produce,consume),‘c1‘,lock)) # c2=Process(target=consumer,args=((produce,consume),‘c2‘,lock)) # # c1.start() # c2.start() # p1.start() # # produce.close() # consume.close() # # c1.join() # c2.join() # p1.join()
12,Manager
# IPC-Manager # import time # from multiprocessing import Manager # from multiprocessing import Process # # def func(dic): # print(dic) # # if __name__==‘__main__‘: # m=Manager() # d=m.dict({‘count‘:0}) # print(d) # p=Process(target=func,args=(d,)) # p.start() # Manager : dict list pipe ,并不提供数据安全的支持 # from multiprocessing import Manager,Process,Lock # def work(d,lock): # lock.acquire() # d[‘count‘]-=1 # lock.release() # # if __name__==‘__main__‘: # lock=Lock() # m=Manager() # dic=m.dict({‘count‘:100})#共享的数据 # l=[] # for i in range(10): # p=Process(target=work,args=(dic,lock)) # p.start() # l.append(p) # [p.join() for p in l] # print(dic)
13,进程池
#进程池 # import os # import random # from multiprocessing import Process # from multiprocessing import Pool # import time # def func(i): # i+=1 # print(i) # # if __name__==‘__main__‘: # p=Pool(5)#创建了五个进程 # start_time=time.time() # p.map(func,range(100))#target=func,args=next(iterable) # p.close()#不允许再向进程池中添加任务 # p.join() # end_time=time.time() # print(end_time-start_time) #进程池2 # import time # from multiprocessing import Pool # # def func(i): # time.sleep(1) # i+=1 # # print(i) # return i+1 # # if __name__==‘__main__‘: # p=Pool(5) # for i in range(20): # res = p.apply(func, args=(i,)) # 同步提交结果 顺序执行代码 直接调用之后得到返回值 # print(res) # res_l=[] # for i in range(20): # res=p.apply_async(func,args=(i,))#异步提交任务的机制 # res_l.append(res) # # print(res.get())#阻塞:等待任务结果 # p.close()#close必须加在join前,不允许再添加新的任务了 # p.join()#等待子进程结束在向下执行 # [print(i.get()) for i in res_l]#异步调用获取函数的返回值
14,回调函数
# callback 回调函数 :主进程执行 参数是子进程执行的函数的返回值 # import os # import time # from multiprocessing import Pool # # def func(i): # print(‘子进程{}:{}‘.format(i,os.getpid())) # return i*"*" # # def call(arg):#回调函数是在主进程中完成的 不能传参数 只能接受多进程函数的返回值 # print(‘回调:‘,os.getpid()) # print(arg) # # if __name__==‘__main__‘: # print(‘---------->‘,os.getpid()) # p=Pool(5) # for i in range(10): # p.apply_async(func,args=(i,),callback=call) # p.close() # p.join() #回调函数应用 # from urllib.request import urlopen # import requests # from multiprocessing import Pool # def get_url(url): # ret = requests.get(url) # return {‘url‘:url, # ‘status_code‘:ret.status_code, # ‘content‘:ret.text} # # def parser(dic): # print(dic[‘url‘],dic[‘status_code‘],len(dic[‘content‘])) # # 把分析结果写到文件里 # if __name__ == ‘__main__‘: # url_l = [ # ‘http://www.baidu.com‘, # ‘http://www.sogou.com‘, # ‘http://www.hao123.com‘, # ‘http://www.yangxiaoer.cc‘, # ‘http://www.python.org‘ # ] # p = Pool(4) # for url in url_l: # p.apply_async(get_url,args=(url,),callback=parser) # p.close() # p.join()
15,线程与协程
# ----------------------------------------------------- # 线程是CPU调度的最小单位 # 进程是资源分配的最小单位 # 与进程相比,开启线程的时空开销小 cpu在线程之间切换快 # 一个程序中 可以同时有多进程和线程 # --------------------------------------------------------------- #线程 # import os # import time # from threading import Thread # # def func(): # time.sleep(1) # print(‘子线程:{}‘.format(os.getpid())) # for i in range(10): # t=Thread(target=func) # t.start() # t.join() # print(‘主线程‘,os.getpid()) #主线程和子线程的进程id相同,属于同一个进程 #自定义线程 # import os # import time # from threading import Thread # # class MyThread(Thread): # count=0#静态属性 # def __init__(self,arg1,arg2): # super().__init__() # self.arg1=arg1 # self.arg2=arg2 # def run(self): # MyThread.count+=1 # time.sleep(1) # print(‘{}-{}-{}-{}‘.format(self.name,os.getpid(),self.arg1,self.arg2)) # # for i in range(10): # t=MyThread(i,i*"*") # t.start() # print(t.count) # import time # import threading # # def func(i): # time.sleep(0.5) # print(i,threading.current_thread().name,threading.current_thread().ident) # #线程名字 线程id # for i in range(10): # t=threading.Thread(target=func,args=(i,)) # t.start() # # print(threading.enumerate())#返回正在运行着的线程列表 # print(len(threading.enumerate()))#11 #主线程+10个子线程 # print(threading.activeCount())#活跃线程数 # ------------------------------------------------------- #守护线程 # import time # from threading import Thread # def func(): # print(‘子线程开始执行‘) # time.sleep(3) # print(‘子线程执行完毕‘) # # t=Thread(target=func) # t.setDaemon(True) # t.start() # print(‘==========‘) # ----------------------------------------------------- #锁 # import time # from threading import Thread # from threading import Lock # # n=100 # lock=Lock() # def func(): # global n # time.sleep(2) # lock.acquire() # temp=n#从进程中获取n # time.sleep(0.01) # n=temp-1#得到结果,在存储回进程 # lock.release() # # # for i in range(5): # t=Thread(target=func) # t.start() # t.join() # print(n) # ----------------------------------------- #死锁 # 科学家就餐实例 有问题 # import time # from threading import RLock#递归锁 # from threading import Lock#互斥锁 # from threading import Thread # m=RLock() # kz=RLock() # # def eat(name): # kz.acquire() # print(‘{}拿到筷子了‘.format(name)) # m.acquire() # print(‘{}拿到面了‘.format(name)) # print(‘{}吃面‘.format(name)) # m.release() # kz.release() # # def eat2(name): # m.acquire() # print(‘{}拿到面了‘.format(name)) # kz.acquire() # print(‘{}拿到筷子了‘.format(name)) # print(‘{}吃面‘.format(name)) # kz.release() # m.release() # # Thread(target=eat,args=(‘alex‘,)).start() # Thread(target=eat2,args=(‘wusir‘,)).start() # Thread(target=eat,args=(‘yuan‘,)).start() # Thread(target=eat2,args=(‘haifeng‘,)).start() # --------------------------------------------------- #信号量 # import time # import random # from threading import Thread # from threading import Semaphore # # def func(n,sem): # sem.acquire() # print(‘thread-{} start‘.format(n)) # time.sleep(random.random()) # print(‘thread-{} end‘.format(n)) # sem.release() # sem=Semaphore(5)#一把锁有五把钥匙 # for i in range(20): # Thread(target=func,args=(i,sem)).start() # ----------------------------------------------------- #事件 # 刚刚创建的时候 flag=False # wait 阻塞 flag=False # set False-->True # clear True-->False #连接数据库实例 # import time # import random # from threading import Event # from threading import Thread # e=Event() # def conn_mysql():#连接数据库 # count=1 # while not e.is_set():#当事件的flag为False时才执行循环内的语句 # if count>3: # raise TimeoutError # print(‘尝试连接第{}次‘.format(count)) # count+=1 # e.wait(0.5)#一直阻塞变成了只阻塞0.5 # print(‘连接成功‘)# 收到check_conn函数内的set指令,让flag变为True跳出while循环,执行本句代码 # # def check_conn(): # ‘‘‘ # 检测数据库服务器的连接是否正常 # ‘‘‘ # time.sleep(random.randint(1,2))# 模拟连接检测的时间 # e.set()# 告诉事件的标志数据库可以连接 # # check=Thread(target=check_conn) # check.start() # conn=Thread(target=conn_mysql) # conn.start() # ----------------------------------------- #条件 # import threading # def run(n): # con.acquire() # con.wait() # print(‘run the thread:{}‘.format(n)) # con.release() # # if __name__==‘__main__‘: # con=threading.Condition()#条件=锁+wait的功能 # for i in range(10): # t=threading.Thread(target=run,args=(i,)) # t.start() # while True: # inp=input(‘>>>‘) # if inp==‘q‘: # break # con.acquire()# condition中的锁 是 递归锁 # if inp==‘all‘: # con.notify_all() # else: # con.notify(int(inp)) # 传递信号 notify(1) --> 可以放行一个线程 # con.release() # ------------------------------------------- #定时器 # from threading import Timer # def hello(): # print(‘Hello World‘) # # while True:#每隔一段时间 开启一个线程 # t=Timer(10,hello)#定时开启一个线程 执行一个任务 # t.start() # --------------------------------------- #队列 安全的 # import queue # q=queue.Queue()#先进先出 # q.put(1) # q.put(2) # q.put(3) # print(q.qsize()) # print(q.get()) # print(q.get()) # print(q.get()) # # # obj=queue.LifoQueue()#后进先出:栈 # obj.put(1) # obj.put(2) # obj.put(3) # obj.put(4) # print(obj.get()) # print(obj.get()) # print(obj.get()) # print(obj.get()) # import queue # pq=queue.PriorityQueue()#值越小越优先 值相同就ascii码小的先出 # pq.put(‘x‘) # pq.put(‘a‘) # pq.put(‘z‘) # print(pq.get()) # print(pq.get()) # -------------------------------------------- # concurrent # import time # import random # from concurrent import futures # # def funcname(n): # print(n) # time.sleep(random.randint(1,3)) # return n*"*" # def call(args): # print(args.result()) # # thread_pool=futures.ThreadPoolExecutor(5) # # thread_pool.map(funcname,range(10))# map,天生异步,接收可迭代对象的数据,不支持返回值 # f_list=[] # for i in range(10): # f=thread_pool.submit(funcname,i)#submit合并了创建线程对象和start的功能 # f_list.append(f) # # thread_pool.shutdown()#close() join() # for f in f_list:# 一定是按照顺序出结果 # print(f.result())#f.result()阻塞 等f执行完得到结果 # # # 回调函数 add_done_callback(回调函数的名字) # thread_pool.submit(funcname,1).add_done_callback(call) # # 统一了入口和方法 简化了操作 降低了学习的时间成本 # --------------------------------------------- #协程介绍 # def func1(): # print(1) # yield # print(3) # yield # # def func2(): # g = func1() # next(g) # print(2) # next(g) # print(4) # # func2() # def consumer(): # while True: # n = yield # print(‘消费了一个包子%s‘%n) # # def producer(): # g = consumer() # next(g) # for i in range(10): # print(‘生产了包子%s‘%i) # g.send(i) # # producer() # import time # from greenlet import greenlet # 在单线程中切换状态的模块 # def eat1(): # print(‘吃鸡腿1‘) # g2.switch() # time.sleep(5) # print(‘吃鸡翅2‘) # g2.switch() # # def eat2(): # print(‘吃饺子1‘) # g1.switch() # time.sleep(3) # print(‘白切鸡‘) # # g1 = greenlet(eat1) # g2 = greenlet(eat2) # g1.switch() # gevent内部封装了greenlet模块 # #串行执行 # import time # def consumer(res): # ‘‘‘任务1:接收数据,处理数据‘‘‘ # pass # # def producer(): # ‘‘‘任务2:生产数据‘‘‘ # res=[] # for i in range(100000000): # res.append(i) # return res # # start=time.time() # #串行执行 # res=producer() # consumer(res) #写成consumer(producer())会降低执行效率 # stop=time.time() # print(stop-start) #1.5536692142486572 # # # # #基于yield并发执行 # import time # def consumer(): # ‘‘‘任务1:接收数据,处理数据‘‘‘ # while True: # x=yield # # def producer(): # ‘‘‘任务2:生产数据‘‘‘ # g=consumer() # next(g) # for i in range(100000000): # g.send(i) # # start=time.time() # #基于yield保存状态,实现两个任务直接来回切换,即并发的效果 # #PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的. # producer() # # stop=time.time() # print(stop-start) # 在代码之间切换执行 反而会降低效率 # 切换 不能规避IO时间 # 如果 在同一个程序中 有IO的情况下 才切换 会让效率提高很多 # yield greenlet 都不能在切换的时候 规避IO时间 # gevent from gevent import monkey;monkey.patch_all() import time # time socket urllib requests import gevent # greenlet gevent在切换程序的基础上又实现了规避IO from threading import current_thread def func1(): print(current_thread().name) print(123) time.sleep(1) print(456) def func2(): print(current_thread().name) # dummythread print(‘hahaha‘) time.sleep(1) print(‘10jq‘) g1 = gevent.spawn(func1) # 遇见他认识的io会自动切换的模块 g2 = gevent.spawn(func2) # g1.join() # g2.join() gevent.joinall([g1,g2]) #效率对比 from gevent import monkey;monkey.patch_all() import time # time socket urllib requests import gevent # greenlet gevent在切换程序的基础上又实现了规避IO def task(args): time.sleep(1) print(args) def sync_func(): # 同步 for i in range(10): task(i) def async_func(): # 异步 g_l = [] for i in range(10): g_l.append(gevent.spawn(task,i)) # 给写成任务传参数 gevent.joinall(g_l) start = time.time() sync_func() print(time.time() - start) start = time.time() async_func() print(time.time() - start) #爬取网页信息的例子 from gevent import monkey;monkey.patch_all() import time import gevent import requests # 爬取网页 # 10个网页 # 协程函数去发起10个网页的爬取任务 def get_url(url): res = requests.get(url) print(url,res.status_code,len(res.text)) url_lst =[ ‘http://www.sohu.com‘, ‘http://www.baidu.com‘, ‘http://www.qq.com‘, ‘http://www.python.org‘, ‘http://www.cnblogs.com‘, ‘http://www.mi.com‘, ‘http://www.apache.org‘, ‘https://www.taobao.com‘, ‘http://www.360.com‘, ‘http://www.7daysinn.cn/‘ ] start = time.time() for url in url_lst: get_url(url) print(time.time() - start) #爬取网页效率 from gevent import monkey;monkey.patch_all() import time import gevent import requests # 爬取网页 # 10个网页 # 协程函数去发起10个网页的爬取任务 def get_url(url): res = requests.get(url) print(url,res.status_code,len(res.text)) url_lst =[ ‘http://www.sohu.com‘, ‘http://www.baidu.com‘, ‘http://www.qq.com‘, ‘http://www.python.org‘, ‘http://www.cnblogs.com‘, ‘http://www.mi.com‘, ‘http://www.apache.org‘, ‘https://www.taobao.com‘, ‘http://www.360.com‘, ‘http://www.7daysinn.cn/‘ ] g_lst = [] start = time.time() for url in url_lst: g = gevent.spawn(get_url,url) g_lst.append(g) gevent.joinall(g_lst) print(time.time() - start) #IO模型 # 概念 # 阻塞 非阻塞 同步 异步 # 阻塞 time.sleep(1) # 异步 同时执行几个事儿 # 同步 两个事儿 一个一个的执行 # 网络IO模型 # 1.阻塞IO # 2.非阻塞IO # 3.IO多路复用 # 4.信号驱动IO # 5.异步IO # 网络IO # recv recvfrom accept requests.get() # send connect sendto # IO的两个阶段 # 数据准备阶段 # 数据copy阶段 # 阻塞IO #主进程的阻塞问题 ,多进程 多线程 分离了阻塞 # 真的解决了这些阻塞么? # 多进程和多线程来说 # 来几个人请求 就要开几个线程 # 进程线程不能无限开 # 池 —— > 4 # 以后用进程都用进程池 单纯的进程池不能满足用户的需求,只适合小并发的问题 # 真正需要我们解决的是I/O问题 # 非阻塞IO
16,其他
#server #多进程应用 # import socket # from multiprocessing import Process # # def talk(conn): # conn.send(b‘connected‘) # res=conn.recv(1024) # print(res) # # if __name__==‘__main__‘: # sk=socket.socket() # sk.bind((‘127.0.0.1‘,8080)) # sk.listen(5) # while True: # conn,addr=sk.accept() # p=Process(target=talk,args=(conn,)) # p.start() # conn.close() # sk.close() #多线程应用 # import socket # from threading import Thread # # def talk(conn): # conn.send(b‘connected‘) # res=conn.recv(1024) # print(res) # conn.close() # # # sk=socket.socket() # sk.bind((‘127.0.0.1‘,8080)) # sk.listen(5) # while True: # conn,addr=sk.accept() # p=Thread(target=talk,args=(conn,)) # p.start() # sk.close() #聊天 # from gevent import monkey # monkey.patch_all() # import gevent # import socket # def talk(conn): # while True: # ret = conn.recv(1024).decode(‘utf-8‘) # print(ret) # conn.send(ret.upper().encode(‘utf-8‘)) # conn.close() # # sk = socket.socket() # sk.bind((‘127.0.0.1‘,8080)) # sk.listen() # while True: # conn,addr = sk.accept() # gevent.spawn(talk,conn) # sk.close() #非阻塞IO # import socket # sk = socket.socket() # sk.bind((‘127.0.0.1‘,8080)) # sk.listen() # sk.setblocking(False) # conn_lst = [] # # while True: # try: # conn,addr = sk.accept() #非阻塞 有链接来 # conn_lst.append(conn) # except BlockingIOError: # del_lst = [] # for c in conn_lst: # 才能执行这一句 # try: # msg = c.recv(10).decode(‘utf-8‘) # recv不会阻塞 # if not msg: # c.close() # del_lst.append(c) # else: # print(msg) # c.send(msg.upper().encode(‘utf-8‘)) # except BlockingIOError: # pass # if del_lst: # for del_item in del_lst: # conn_lst.remove(del_item) #IO多路复用 import socket import select sk = socket.socket() sk.bind((‘127.0.0.1‘,8099)) sk.listen() read_lst = [sk] while True: rl,wl,xl = select.select(read_lst,[],[]) # select阻塞,rl可以读的 wl可以写的 xl可以改的 [sk,conn] for item in rl: if item == sk: conn,addr = item.accept() # 有数据等待着它接收 read_lst.append(conn) else: ret = item.recv(1024).decode(‘utf-8‘) if not ret: item.close() read_lst.remove(item) else: print(ret) item.send((‘received %s‘%ret).encode(‘utf-8‘)) # readlst [sk,conn,conn2,conn3] 100 问一百次 # select poll 随着要检测的数据增加 效率会下降 # select 有数目的限制 # poll 能处理的对象更多 # epoll 能处理多对象 不是使用轮询 回调函数 —— linux #client # import socket # sk = socket.socket() # sk.connect((‘127.0.0.1‘,8080)) # ret = sk.recv(1024) # print(ret) # msg = input(‘>>>‘) # sk.send(msg.encode(‘utf-8‘)) # sk.close #聊天 # import time # import socket # import threading # def my_client(): # sk = socket.socket() # sk.connect((‘127.0.0.1‘,8080)) # while True: # sk.send(b‘hi‘) # ret = sk.recv(1024).decode(‘utf-8‘) # print(ret) # time.sleep(1) # sk.close() # for i in range(500): # threading.Thread(target=my_client).start() #非阻塞IO #非阻塞IO # import time # import socket # import threading # def func(): # sk = socket.socket() # sk.connect((‘127.0.0.1‘,8080)) # time.sleep(1) # sk.send(b‘hi‘) # print(sk.recv(10)) # sk.close() # # for i in range(10): # threading.Thread(target=func,).start() #IO多路复用 # import time # import socket # import threading # def client_async(args): # sk = socket.socket() # sk.connect((‘127.0.0.1‘,8099)) # for i in range(10): # time.sleep(2) # sk.send((‘%s[%s] :hello‘%(args,i)).encode(‘utf-8‘)) # print(sk.recv(1024)) # sk.close() # # for i in range(10): # threading.Thread(target=client_async,args=(‘*‘*i,)).start() # selector_demo #服务端 # from socket import * # import selectors # # sel=selectors.DefaultSelector() # 创建一个默认的多路复用模型 # def accept(sk): # conn,addr=sk.accept() # sel.register(conn,selectors.EVENT_READ,read) # # def read(conn): # try: # data=conn.recv(1024) # if not data: #win8 win10 # print(‘closing‘,conn) # sel.unregister(conn) # conn.close() # return # conn.send(data.upper()+b‘_SB‘) # except Exception: # linux操作系统 # print(‘closing‘, conn) # sel.unregister(conn) # conn.close() # # sk=socket(AF_INET,SOCK_STREAM) # sk.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) # sk.bind((‘127.0.0.1‘,8088)) # sk.listen(5) # sk.setblocking(False) #设置socket的接口为非阻塞 # sel.register(sk,selectors.EVENT_READ,accept) #相当于网select的读列表里append了一个文件句柄server_fileobj,并且绑定了一个回调函数accept # # while True: # events=sel.select() #检测所有的fileobj,是否有完成wait data的 #[sk,conn] # for sel_obj,mask in events: # 有人触动了你在sel当中注册的对象 # callback=sel_obj.data #callback=accpet # sel_obj.data就能拿到当初注册的时候写的accept/read方法 # callback(sel_obj.fileobj) #accpet(sk)/read(conn)
操作系统的发展 # 没有操作系统 # 批处理系统 # 多道程序系统 # 分时系统 # 实时系统 # 通用操作系统 # 操作系统的功能 # 进程 # 进程是操作系统中资源分配的最小单位 # 进程调度算法 # 先来先服务算法 FCFS # 短作业优先算法 # 时间片轮转算法 # 多级反馈算法 # 异步 # 同步 # 阻塞 # 非阻塞 # 进程的状态 :就绪 运行 阻塞 # 进程的创建与结束 #父进程 子进程 start开启一个进程 join 让主进程等待子进程结束 事件 异步阻塞 # 事件 标志 同时 是所有的进程都陷入阻塞 # 通过队列实现了 主进程与子进程的通信 子进程与子进程之间的通信 # 协程 # IO模型 # Lock锁 —— 降低了程序的执行效率 但是保证了数据的安全性 # 信号量 —— 多把钥匙公用一把锁 sem # 进程包含着线程 # GIL 全局解释器锁 Cpython解释器 # 去掉GIL 保证数据安全 细粒度的锁 效率更低 # 高计算 多进程 # 高IO 多线程 —— 爬虫 网络 # 线程 更加的轻量级 # 线程是CPU调度的最小单位 # 进程是资源分配的最小单位 # 开启线程的时空开销 都比 开启进程要小 # 且 cpu在线程之间切换 比 在进程之间切换快 # 一个程序中 可以同时有多进程和线程 setblocking = False recv() 同步 干完一件事 再干一件事儿 异步 同时处理多个任务 recv 数据 阻塞IO : 工作效率低 非阻塞IO : 工作效率高,CPU的负担 IO多路复用: 在有多个对象需要IO阻塞的时候,能够有效的减少阻塞带来的时间损耗, 且能够在一定程度上减少CPU的负担 异步IO : asyncio 异步IO 工作效率高 CPU的负担少
以上是关于并发编程的主要内容,如果未能解决你的问题,请参考以下文章