Python基础(十四)-并发编程
Posted hujinzhong
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python基础(十四)-并发编程相关的知识,希望对你有一定的参考价值。
一、操作系统
参考文档:https://www.cnblogs.com/yuanchenqi/articles/6248025.html
二、进程与线程
2.1、进程简介
进程:一个程序在一个数据集上的一次动态执行过程,一般由程序、数据集、进程控制块三部分组成
- 程序:用来描述进程要完成哪些功能以及如何完成
- 数据集:程序在执行过程中所需要使用的资源
- 进程控制块:记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系 统感知进程存在的唯一标志
2.2、线程
线程:轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序 计数器、寄存器集合和堆栈共同组成。线程的引入减小了程序并发执行时的开销,提高了操作系统的并发 性能。线程没有自己的系统资源。
2.3、进程与线程区别
1)一个程序至少有一个进程,一个进程至少有一个线程.(进程可以理解成线程的容器)
2)进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。
3)线程在执行过程中与进程还是有区别的。每个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。但是线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。
4)进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位.
5)线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈)但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源.
6)一个线程可以创建和撤销另一个线程;同一个进程中的多个线程之间可以并发执行.
三、python GIL
In CPython, the global interpreter lock(全局解释器锁), or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
核心意思就是,无论你启多少个线程,你有多少个cpu, Python在执行的时候会在同一时刻只允许一个线程运行
四、python线程与threading模块
4.1、线程调用方式
1)直接调用方式
import threading import time def func(num): #定义每个线程运行的函数 print("running on num: %s" %num) time.sleep(3) if __name__ == ‘__main__‘: t1=threading.Thread(target=func,args=(1,)) #生成一个线程实例,注意target=只需要写函数名即可 t2=threading.Thread(target=func,args=(2,)) t1.start() #启动线程 t2.start() print(t1.getName()) #获取线程名称 ==>Thread-1 print(t2.getName()) #获取线程名称 ==>Thread-2
2)继承式调用
import threading import time class MyThread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) #父类初始化 self.num=num def run(self): #定义每个线程要运行的函数,名称必须是run print("running on num: %s" %self.num) time.sleep(3) if __name__ == ‘__main__‘: t1=MyThread(1) t2=MyThread(2) t1.start() t2.start() print("ending..........")
4.2、threading.Thread实例方法
1)join():在子线程完成运行之前,这个子线程的父线程将一直被阻塞
2)setDaemon(True):将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程就分兵两路分别运行,那么当主线程完成想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法
import threading from time import ctime,sleep import time def ListenMusic(name): print ("Begin listening to %s. %s" %(name,ctime())) sleep(3) print("end listening %s"%ctime()) def RecordBlog(title): print ("Begin recording the %s! %s" %(title,ctime())) sleep(5) print(‘end recording %s‘%ctime()) threads = [] #定义线程列表 t1 = threading.Thread(target=ListenMusic,args=(‘AA‘,)) t2 = threading.Thread(target=RecordBlog,args=(‘BB‘,)) threads.append(t1) threads.append(t2) if __name__ == ‘__main__‘: for t in threads: t.setDaemon(True) #注意:一定在start之前设置 t.start() # t.join() t1.join() # t1.setDaemon(True) print ("all over %s" %ctime())
3)其他方法
run(): #线程被cpu调度后自动执行线程对象的run方法 start(): #启动线程活动。 isAlive(): #返回线程是否活动的。 getName(): #返回线程名。 setName(): #设置线程名。 #threading模块提供的一些方法: threading.currentThread(): #返回当前的线程变量。 threading.enumerate(): #返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 threading.activeCount(): #返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
五、同步锁
5.1、问题引出
import time import threading def addNum(): global num #在每个线程中都获取这个全局变量 # num-=1 #final num: 0 temp=num #print(‘--get num:‘,num ) time.sleep(0.01) #相当于io操作,存在线程切换 num =temp-1 #对此公共变量进行-1操作 num = 100 #设定一个共享变量 thread_list = [] for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有线程执行完毕 t.join() print(‘final num:‘, num ) #结果 #当time.sleep(0.1) ==>99 #当time.sleep(0.01) ==>90多随机出现 #原因:当第一个线程拿到num=100时,time.sleep时,线程切换,第二个线程拿到还是100,他不知道第一个线程拿到的num是几 #同理当第二个线程time.sleep,线程切换,第三个拿到还是100
5.2、问题解决
import time import threading def addNum(): global num #在每个线程中都获取这个全局变量 # num-=1 #final num: 0 R.acquire() #获得同步锁,此时第二个线程不能执行 temp=num time.sleep(0.01) #相当于io操作,存在线程切换 num =temp-1 #对此公共变量进行-1操作 R.release() #释放锁,此时第二个线程可以执行了 num = 100 #设定一个共享变量 thread_list = [] R=threading.Lock() #定义同步锁 for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有线程执行完毕 t.join() print(‘final num:‘, num ) #0
六、线程死锁
在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所有这两个线程在无外力作用下将一直等待下去
import threading,time class myThread(threading.Thread): def doA(self): lockA.acquire() #获得锁A print(self.name,"gotlockA",time.ctime()) time.sleep(3) lockB.acquire() print(self.name,"gotlockB",time.ctime()) lockB.release() lockA.release() #释放锁A def doB(self): lockB.acquire() print(self.name,"gotlockB",time.ctime()) time.sleep(2) lockA.acquire() print(self.name,"gotlockA",time.ctime()) #第一个线程得到了锁B,此时需要获取锁A,而此时锁A已经被第二个线程获取了,造成了死锁 lockA.release() lockB.release() def run(self): self.doA() self.doB() if __name__=="__main__": lockA=threading.Lock() lockB=threading.Lock() threads=[] for i in range(5): threads.append(myThread()) for t in threads: t.start() for t in threads: t.join() #结果 Thread-1 gotlockA Fri Sep 13 21:19:45 2019 Thread-1 gotlockB Fri Sep 13 21:19:48 2019 Thread-1 gotlockB Fri Sep 13 21:19:48 2019 Thread-2 gotlockA Fri Sep 13 21:19:48 2019
七、递归锁
可以使用递归锁来解决死锁问题:lock=threading.RLock()
RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
import threading,time class myThread(threading.Thread): def doA(self): lock.acquire() #获得递归锁counter=1 print(self.name,"gotlock",time.ctime()) time.sleep(3) lock.acquire() #获得递归锁counter=2 print(self.name,"gotlock",time.ctime()) lock.release() #释放递归锁counter=1 lock.release() #释放递归锁counter=0,此时第二个线程可以抢夺锁了 def doB(self): lock.acquire() print(self.name,"gotlock",time.ctime()) time.sleep(2) lock.acquire() print(self.name,"gotlock",time.ctime()) lock.release() lock.release() def run(self): self.doA() # time.sleep(0.1) self.doB() if __name__=="__main__": # lockA=threading.Lock() # lockB=threading.Lock() lock=threading.RLock() threads=[] for i in range(5): threads.append(myThread()) for t in threads: t.start() for t in threads: t.join() #结果 Thread-1 gotlock Fri Sep 13 21:33:55 2019 Thread-1 gotlock Fri Sep 13 21:33:58 2019 Thread-1 gotlock Fri Sep 13 21:33:58 2019 Thread-1 gotlock Fri Sep 13 21:34:00 2019 Thread-3 gotlock Fri Sep 13 21:34:00 2019 Thread-3 gotlock Fri Sep 13 21:34:03 2019 Thread-3 gotlock Fri Sep 13 21:34:03 2019 Thread-3 gotlock Fri Sep 13 21:34:05 2019 Thread-5 gotlock Fri Sep 13 21:34:05 2019 Thread-5 gotlock Fri Sep 13 21:34:08 2019 Thread-5 gotlock Fri Sep 13 21:34:08 2019 Thread-5 gotlock Fri Sep 13 21:34:10 2019 Thread-4 gotlock Fri Sep 13 21:34:10 2019 Thread-4 gotlock Fri Sep 13 21:34:13 2019 Thread-4 gotlock Fri Sep 13 21:34:13 2019 Thread-4 gotlock Fri Sep 13 21:34:15 2019 Thread-2 gotlock Fri Sep 13 21:34:15 2019 Thread-2 gotlock Fri Sep 13 21:34:18 2019 Thread-2 gotlock Fri Sep 13 21:34:18 2019 Thread-2 gotlock Fri Sep 13 21:34:20 2019
八、同步条件(Event)
8.1、event方法
event=threading.Event() #创建event对象 event.wait() #线程需等待flag被设置 event.set() #设置flag event.clear() #清除flag An event is a simple synchronization object;the event represents an internal flag,and threads can wait for the flag to be set, or set or clear the flag themselves. If the flag is set, the wait method doesn’t do anything. #一旦flag被设置,wait方法不会做任何事,相当于pass If the flag is cleared, wait will block until it becomes set again. #一旦flag被清除,wait方法会被阻塞,知道flag被设置 #Any number of threads may wait for the same event. #任何数量的线程会等待相同的event
8.2、示例
import threading,time class Boss(threading.Thread): def run(self): print("BOSS:今晚大家都要加班到22:00。") print(event.isSet()) event.set() #设置flag,一旦设置,wait()阻塞解除 time.sleep(5) print("BOSS:<22:00>可以下班了。") print(event.isSet()) event.set() class Worker(threading.Thread): def run(self): event.wait() #此时flag还未被设置,被阻塞 print("Worker:哎……命苦啊!") time.sleep(1) event.clear() #清除flag event.wait() print("Worker:OhYeah!") if __name__=="__main__": event=threading.Event() #定义event对象 threads=[] for i in range(5): threads.append(Worker()) threads.append(Boss()) for t in threads: t.start() for t in threads: t.join() #++++++++++++++++++++++++++++++++++++++++++++++++++ BOSS:今晚大家都要加班到22:00。 False Worker:哎……命苦啊! Worker:哎……命苦啊! Worker:哎……命苦啊! Worker:哎……命苦啊! Worker:哎……命苦啊! BOSS:<22:00>可以下班了。 False Worker:OhYeah! Worker:OhYeah! Worker:OhYeah! Worker:OhYeah! Worker:OhYeah!
九、信号量(Semaphore)
1)信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。
2)计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)
3)BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。
import threading,time class myThread(threading.Thread): def run(self): if semaphore.acquire(): print(self.name) time.sleep(5) semaphore.release() if __name__=="__main__": semaphore=threading.Semaphore(5) #相当于并发量为5 thrs=[] for i in range(100): thrs.append(myThread()) for t in thrs: t.start()
十、多线程利器-队列
10.1、列表:不安全数据结构
import threading,time li=[1,2,3,4,5] def pri(): while li: a=li[-1] #获取列表最后一个元素 print(a) time.sleep(1) try: li.remove(a) except Exception as e: print(‘----‘,a,e) t1=threading.Thread(target=pri,args=()) t1.start() t2=threading.Thread(target=pri,args=()) t2.start() #结果 5 5 4 ---- 5 list.remove(x): x not in list 4 ...
以上是关于Python基础(十四)-并发编程的主要内容,如果未能解决你的问题,请参考以下文章