并发扩展
Posted guyanzhi
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发扩展相关的知识,希望对你有一定的参考价值。
同步异步
对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:
1. 等待数据准备 (Waiting for the data to be ready)
2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)
同步:当进程执行IO(等待外部数据)的时候,-----等。同步(例如打电话的时候必须等)
异步:当进程执行IO(等待外部数据)的时候,-----不等,去执行其他任务,一直等到数据接收成功,再回来处理。异步(例如发短信)
当我们去爬取一个网页的时候,要爬取多个网站,有些人可能会发起多个请求,然后通过函数顺序调用。执行顺序也是先调用先执行。效率非常低。
同步还是异步,这需要我们根据需求来决定
lock、GIL 就是为了达到同步,来保证安全.
而异步在多进程/线程IO的时候能提高效率.
进程池、线程池
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。多进程是实现并发的手段之一,需要注意的问题是:
- 很明显需要并发执行的任务通常要远大于核数
- 一个操作系统不可能无限开启进程,通常有几个核就开几个进程
- 进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)
例如当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
我们就可以通过维护一个进程池来控制进程数目,比如httpd的进程模式,规定最小进程数和最大进程数...
ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。
创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程
实例:
![技术分享图片](/img/jia.gif)
from multiprocessing import Pool # 进程池类 import os,time def work(n): print(‘%s run‘ %os.getpid()) time.sleep(3) return n**2 if __name__ == ‘__main__‘: p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务 res_l=[] for i in range(10): res=p.apply(work,args=(i,)) #同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞,但不管该任务是否存在阻塞,同步调用都会在原地等着,只是等的过程中若是任务发生了阻塞就会被夺走cpu的执行权限 res_l.append(res) print(res_l)
![技术分享图片](/img/jia.gif)
from multiprocessing import Pool import os,time def work(n): print(‘%s run‘ %os.getpid()) time.sleep(3) return n**2 if __name__ == ‘__main__‘: p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务 res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) #同步运行,阻塞、直到本次任务执行完毕拿到res res_l.append(res) #异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了 p.close() p.join() for res in res_l: print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
死锁与递归锁(可重入锁)
在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都
正在使用,所有这两个线程在无外力作用下将一直等待下去。更直观的死锁比如,有两个lock对象,同一资源分别被两个进程的lock.requare阻塞,就造成程序永久的阻塞
解决死锁就可以用递归锁
实例:
![技术分享图片](/img/jia.gif)
import threading,time # lock_A = threading.Lock() # lock_B = threading.Lock() r_lock = threading.RLock() class Mythread(threading.Thread): def actionA(self): r_lock.acquire() print(self.name,time.ctime()) time.sleep(2) r_lock.acquire() print(self.name,time.ctime()) time.sleep(1) r_lock.release() r_lock.release() def actionB(self): r_lock.acquire() print(self.name,time.ctime()) time.sleep(2) r_lock.acquire() print(self.name,time.ctime()) time.sleep(1) r_lock.release() r_lock.release() def run(self): self.actionA() self.actionB() li = [] for i in range(5): t = Mythread() t.start() li.append(t) for t in li: t.join() print("ending")
为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:
threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
信号量
BoundedSemaphore或Semaphore
实际上也是一种互斥(也就是锁),该锁用于限制线程的并发量
也就是说:信号量用来控制线程并发数的,
信号量管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。
计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)
BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。
![技术分享图片](/img/jia.gif)
import threading,time class myThread(threading.Thread): def run(self): #启动后,执行run方法 if semaphore.acquire(): #加把锁,可以放进去多个(相当于5把锁,5个钥匙,同时有5个线程) print(self.name) time.sleep(5) semaphore.release() if __name__=="__main__": semaphore=threading.Semaphore(5) #同时能有几个线程进去(设置为5就是一次5个线程进去),类似于停车厂一次能停几辆车 thrs=[] #空列表 for i in range(100): #100个线程 thrs.append(myThread()) #加线程对象 for t in thrs: t.start() #分别启动
事件(同步条件)
Event对象实现了简单的线程通信机制,它提供了设置信号,清除信号,等待等方法用于实现线程间的通信。
1 设置信号
使用Event的set()方法可以设置Event对象内部的信号标志为真。Event对象提供了isSet()方法来判断其内部信号标志的状态。当使用event对象的set()方法后,isSet()方法返回真
2 清除信号
使用Event对象的clear()方法可以清除Event对象内部的信号标志,即将其设为假,当使用Event的clear方法后,isSet()方法返回假
3 等待
Event对象wait的方法只有在内部信号为真的时候才会很快的执行并完成返回。当Event对象的内部信号标志位假时,则wait方法一直等待到其为真时才返回。
![技术分享图片](/img/jia.gif)
import threading, time class Boss(threading.Thread): def run(self): print("BOSS:今晚大家都要加班到22:00。") print(event.isSet()) event.set() time.sleep(5) print("BOSS:<22:00>可以下班了。") print(event.isSet()) event.set() class Worker(threading.Thread): def run(self): event.wait() print("Worker:哎……命苦啊!") time.sleep(1) event.clear() event.wait() print("Worker:OhYeah!") if __name__ == "__main__": event = threading.Event() threads = [] for i in range(5): threads.append(Worker()) threads.append(Boss()) for t in threads: t.start() for t in threads: t.join() 同步条件Event
![技术分享图片](/img/jia.gif)
import time from threading import Thread, Event e = Event() def start(): print("began start!") time.sleep(2) print("server start!") e.set() def conn(): while True: print("try connect...") e.wait(timeout=1) # 在这里阻塞 等1s 之后还没有isSet() 就退出 if e.isSet(): print("connect successful!") break else: print("connect filed!") Thread(target=start).start() Thread(target=conn).start()
生产者消费者模型
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
实例:
![技术分享图片](/img/jia.gif)
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print(‘