python2.0 s12 day8 _ python线程
Posted zhming
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python2.0 s12 day8 _ python线程相关的知识,希望对你有一定的参考价值。
1.进程、与线程区别
2.cpu运行原理
3.python GIL全局解释器锁
4.线程
1.语法
2.join
3.线程锁之Lock\Rlock\信号量
4.将线程变为守护进程
5.Event事件
6.queue队列
7.生产者消费者模型
8.Queue队列
9.开发一个线程池
5进程
1.语法
2.进程间通讯
3.进程池
进程与线程
什么是线程(thread)?
线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务
什么是进程(process)?
进程与线程的区别?
CPython中全局解释性锁是什么鬼?Python GIL(Global Interpreter Lock)
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
上面的核心意思就是,无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行,擦。。。,那这还叫什么多线程呀?莫如此早的下结结论,听我现场讲。
首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL
4.线程
Python threading模块 (线程模块)
1.语法
线程有2种调用方式,
方式1:直接调用. (用这种的多,因为简单)
方式2:定义一个继承线程模块的类,然后在类中写一个run方法. (用的不多,但是要知道,以后编程不仅要看自己的代码还要看别人的代码)
方式1代码举例:
1 #/usr/bin/env python3.5 2 #-*-encoding:utf8-*- 3 #__authro__:‘ted.zhou‘ 4 ‘‘‘ 5 多线程之直接调用方式 6 ‘‘‘ 7 8 import threading 9 import time 10 11 def sayhi(num): # 定义每一个线程要执行的函数 12 print(‘running thread number:%s‘%num) 13 time.sleep(3) # 这里睡3秒钟,是为了让大家看清楚python解释器解释多线程时,这个来回切换而让我们觉得线程是并行的.因为2个或更多,等待最长3秒,不是累计等待 14 15 if __name__ == ‘__main__‘: 16 t1 = threading.Thread(target=sayhi,args=(1,)) #生成一个线程实例 17 t2 = threading.Thread(target=sayhi,args=(2,)) #又生成了一个线程实例 18 19 t1.start() 20 t2.start() 21 22 print(t1.getName()) 23 print(t2.getName())
方式2代码举例:
#!/usr/bin/env python3.5 #__author__:‘ted.zhou‘ import threading import time class mythread(threading.Thread): ‘‘‘ 定义一个多线程类,类中必须包含run方法,run方法就是实例化此类start()方法内部调用的方法 ‘‘‘ def __init__(self,num): ‘‘‘ 定义构造方法,采用新式类继承therading.Thread类 :param num: :return: ‘‘‘ threading.Thread.__init__(self) # 经典类继续方法 # super(mythread,self).__init__(self) # 新式类继承方法执行报错,不知道为什么 self.num=num def run(self): print(‘now run thread number:%s‘%self.num) time.sleep(3) if __name__ == ‘__main__‘: t1 = mythread(1) t2 = mythread(2) t1.start() t2.start() print(‘主线程执行完成!‘)
2.join
我们知道一个进程中至少有一个线程,子线程都是由这个主线程触发的.当自线程一旦启动,它和主线程并行.
所以python解释器在解释子线程的同时也在解释主线程,所以不会像我们想象那样(主线程会挂起,等带所有的子线程都解释执行完成才继续执行.)
但是实际使用中我们又有需求,要求子线程执行完后在执行主线程.那么怎么实现的.这时就需要用到threading.join()方法.具体实现代码,我们用
多线程模块调用 方式1 举例:
1 #!/usr/bin/env python3.5 2 #__author__:"ted.zhou" 3 ‘‘‘ 4 多线程的join方法举例 5 ‘‘‘ 6 import threading 7 import time 8 9 def sayhi(num): 10 print(‘now running thread number:%s‘%num) 11 time.sleep(3) 12 print(‘子线程%s执行结束‘%num) 13 14 if __name__ == ‘__main__‘: 15 t1 = threading.Thread(target=sayhi,args=[1,]) 16 t2 = threading.Thread(target=sayhi,args=[2,]) 17 18 t1.start() 19 t2.start() 20 # t1.join() # 先注释join方法 21 # t2.join() # 先注释join方法 22 print(‘主程序执行完成‘) 23 24 执行结果 25 now running thread number:1 26 now running thread number:2 27 主程序执行完成 28 子线程2执行结束 29 子线程1执行结束 30 31 Process finished with exit code 0
我们看到上面是,主线程结束后,子线程还在跑.当我们把join方法注释去掉,执行结果如下:
1 now running thread number:1 2 now running thread number:2 3 子线程1执行结束 4 子线程2执行结束 5 主程序执行完成
我们看看到,主线程一直在线程1 和线程2执行完成后,才继续往下走.
这里我们看,有两个子进程 我们使用了t1.join()和t2.join(),我们会想是不是每一个进程都需要加一个join方法.其实不是的
join方法一旦在代码中出现,它只会阻塞主线程,直至调用join方法的子线程完成才能继续.而子线程不受join方法的影响,继续执行.
那么我们会说我们调用一个join方法不就行了吗.理论上线程2比线程1晚运行,我们这里只需要执行t2.join()方法,理论上t1要比t2线程先完成.但是python解释器内部切换不是按你启动顺序,而是随机的.
所以为了保险起见,在处理多线程时,我们应该为每一个线程调用join()方法.
假如启动了100个线程,代码如下:
1 #!/usr/bin/env python3.5 2 #__author__:"ted.zhou" 3 ‘‘‘ 4 多线程的join方法举例 5 ‘‘‘ 6 import threading 7 import time 8 9 def sayhi(num): 10 print(‘now running thread number:%s‘%num) 11 time.sleep(3) 12 print(‘子线程%s执行结束‘%num) 13 14 if __name__ == ‘__main__‘: 15 thread_list = [] 16 for i in range(100): 17 t = threading.Thread(target=sayhi,args=[i,]) # 实例化子线程 18 thread_list.append(t) # 将子线程实例加入列表 19 t.start() 20 for t in thread_list: # 对线程实例列表循环 21 t.join() # 每一个都调用join()方法 22 print(‘主程序执行完成‘)
3.线程锁之Lock\Rlock\信号量
首先我们要知道为什么要加线程锁?
Rlock是多线程锁的情况存在时,必须将lock换成Rlock,所以有同学说无论是不是多层锁,我们都用Rlock,这样就一劳永逸了.此说法成立.
紧接着我们会问那为什么lock锁还存在呢?用老师的说法(你学python为什么先学面向过程编程,再学面向对象过程?)这有一个由浅入深的概念.
信号量: 线程锁 lock\Rlock 实现了,被锁定的代码串行运行,那么不是所有情况都是1个1个来,有时候也可以3个一起运行.所以线程中出现了信号量概念,让多少个线程同时运行.
3.1 线程锁(互斥锁)
首先我们先了解为什么要加线程锁?我们先看一个例子:
#!/usr/bin/env python3.5 #__author__:‘ted.zhou‘ ‘‘‘ 多个线程访问一个全局变量时的例子 ‘‘‘ import threading import time A = 10 def subtraction(threadId): global A print("线程%s得到的值:%s"%(threadId,A)) time.sleep(1) # 这里睡1秒钟的目的是为了让python解释器切换GIL全局解释性锁.不然这执行速度,会导致你没切换呢,线程1都已经执行完了. A -= 1 print("线程%s执行后变量A的结果%s"%(threadId,A)) if __name__ == ‘__main__‘: thread_list = [] for i in range(1,4): t = threading.Thread(target=subtraction,args=[i,]) thread_list.append(t) t.start() for t in thread_list: t.join() print("变量A最后获得的结果:%s"%A) 执行的结果: 线程1得到的值:10 线程2得到的值:10 线程3得到的值:10 线程2执行后变量A的结果9 线程1执行后变量A的结果8 线程3执行后变量A的结果7 变量A最后获得的结果:7
假如循环100次,得到的结果可能就不一样了.那么我们有两个疑问:
对多线程锁节的疑问:
1.为什么得到一样的A =10 ,3个线程做 A -=1操作后的结果会累加 最终打印A的值是7?
2.既然有了GIL全局解释性锁,为啥还要用 线程锁?
那么我们来分析执行过程:
a.创建3个线程,就是拷贝了3次A =10 的变量。因为多线程 共享进程的内存,所以我觉得这里不是拷贝,这里使用的就是进程里的A=10的全局变量。3个线程对这一个全局变量进行-1操作,结果不就是7。
b.我们知道python解释器在解释代码时,读入内存-》词法、语法分析 -》编程成字节码 -》机器码 -> cpu执行
所以上述代码首先都会编译成字节码。python解释器在处理多线程任务时,其实就是内部有一个切换机制,不停的在多个线程之间切换,被切换到的就把全局解释器锁加在这个线程上,并且调用C语言模块的线程接口对这个线程上的一段字节码转换成机器码,然后再执行。
所以我们在第一行代码print(A)时,打印的值都是一样。紧接着,当执行到A -= 1这句代码时,这句代码其实在经过python解释器编译成字节码时,它可就不是一句字节码了,可能有多字节码(这多句字节码可能实现了两个功能1.把 A这个变量做了减1操作 2.把减1后的结果返回给 A )。
python解释器在内部切换时,可能就在处理一个线程的-1操作时,就切换到其他线程上去了。从而导致,本来处理 -1 并赋值给变量A的操作,只做了一半,就切换到其他进程上了。
那么就会出现当全局解释器锁被切换到其他线程上时,其他线程用变量A(这时还是10)再去做-1操作,巧合的是,它做完-1操作后,这时切换的时间还没到,所以紧接着这个进程下又做了赋值操作。
那么当再次切换到前面那个未做赋值操作的线程时,它接着之前的字节码继续执行,就会再次对变量A再次赋值。那么问题来了,两次赋值一样,那得到的结果肯定就会冲掉一个。那么我们当然想 把 -1操作 和给变量A赋值 操作一次执行了。
那么怎么才能实现呢。 这时就该线程锁出场了, 在 A -=1 前加一个锁,在A -=1 后 释放锁,而python解释器一旦看到这个锁,解释器切换机制就明白了,下面不能随便切换了,要等看到这个线程在下面的代码中出现释放锁的标记才进行正常切换。从而保证了多线程对全局变量操作的结果不混乱。
上面的解释应该能明白为什么需要加线程锁了,知道了为什么要加,才能在实际编程使用自如.代码如下:
1 #!/usr/bin/env python3.5 2 #__author__:‘ted.zhou‘ 3 ‘‘‘ 4 多个线程访问一个全局变量时的例子 5 ‘‘‘ 6 import threading 7 import time 8 9 10 11 def subtraction(threadId): 12 global A 13 print("线程%s得到的值:%s"%(threadId,A)) 14 time.sleep(1) # 这里睡1秒钟的目的是为了让python解释器切换GIL全局解释性锁.不然这执行速度,会导致你没切换呢,线程1都已经执行完了. 15 lock.acquire() #修改数据前加锁 16 A -= 1 17 print("线程%s执行后变量A的结果%s"%(threadId,A)) 18 lock.release() #修改后释放 19 20 21 22 if __name__ == ‘__main__‘: 23 A = 100 #设置一个全局变量 24 lock = threading.Lock() #生成全局锁 25 thread_list = [] 26 for i in range(1,101): 27 t = threading.Thread(target=subtraction,args=[i,]) 28 thread_list.append(t) 29 t.start() 30 31 for t in thread_list: 32 t.join() 33 34 print("变量A最后获得的结果:%s"%A)
上面加锁后就不用担心切换了.
3.2 递归锁
紧接着我们来想下,如果有那么一个需求,一个子线程中调用另外两个子线程.要求两个子线程运行完成后,才继续执行.这个和mysql事务性有点像.处理完查询,在处理新添加.整个完成才能进行其他操作.
实例代码如下:
1 import threading,time 2 3 def run1(): 4 print("grab the first part data") 5 lock.acquire() 6 global num 7 num +=1 8 lock.release() 9 return num 10 def run2(): 11 print("grab the second part data") 12 lock.acquire() 13 global num2 14 num2+=1 15 lock.release() 16 return num2 17 def run3(): 18 lock.acquire() 19 res = run1() 20 print(‘--------between run1 and run2-----‘) 21 res2 = run2() 22 lock.release() 23 print(res,res2) 24 25 26 if __name__ == ‘__main__‘: 27 28 num,num2 = 0,0 29 lock = threading.RLock() 30 for i in range(10): 31 t = threading.Thread(target=run3) 32 t.start() 33 34 while threading.active_count() != 1: 35 print(threading.active_count()) 36 else: 37 print(‘----all threads done---‘) 38 print(num,num2)
所以,建议所有的锁,咱都用递归锁,不然加多层锁后会无线循环下去,因为python解释器会不知道锁的位置.
3.3 Semaphore(信号量)
互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。
代码如下:
1 import threading,time 2 3 def run(n): 4 semaphore.acquire() 5 time.sleep(1) 6 print("run the thread: %s\n" %n) 7 semaphore.release() 8 9 if __name__ == ‘__main__‘: 10 11 num= 0 12 semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行 13 for i in range(20): 14 t = threading.Thread(target=run,args=(i,)) 15 t.start() 16 17 while threading.active_count() != 1: 18 pass #print threading.active_count() 19 else: 20 print(‘----all threads done---‘) 21 print(num)
执行结果就是:5个一起并行,紧接着在5个
4.将线程变为守护进程
Daemon 守护进程的作用:守护程序后台运行,如果守护程序退出了,前端的程序也就跟着死掉了.
我们在join()知识点里知道,正常生成线程的函数或者代码执行在不用join的方法前提下,只是主线程和子线程同步,没有实现子线程执行完成后在执行主线程,加了join方法后,就实现了主线程会等待子线程执行完后记性执行主线程.
那现在我们的守护进程,想实现,主线程只要关闭了,子线程也跟着关闭,不管子线程是否执行完成.
什么情况下会用到呢,真实场景是什么呢:
比如我想写一个批量登录 100台主机,执行一个命令.但是呢会有3~5台ssh连接特别慢,超时2分钟.这时候我为了执行效率,代码设定超过30秒程序就退出了.
代码为例:
1 #!/usr/bin/env python3.5 2 #__author__:‘ted.zhou‘ 3 ‘‘‘ 4 守护进程实例 5 守护进程作用是,守护进程后台运行,当守护进程死掉,后台的程序也都会死掉 6 ‘‘‘ 7 8 import threading 9 import time 10 11 def run(n): 12 print(‘进程--%s----running-----‘%n) 13 time.sleep(2) 14 print(‘进程--%s-----done-------‘%n) 15 16 def main(): 17 ‘‘‘ 18 在main中启动多个线程调用run()方法 19 :return: 20 ‘‘‘ 21 for i in range(1,6): 22 t = threading.Thread(target=run,args=[i,]) 23 t.start() 24 print(‘进程--%s----start----- ‘%(t.getName())) 25 print(‘-----守护进程执行结束 -------‘) 26 27 m = threading.Thread(target=main,args=[]) 28 m.setDaemon(True) # 把这个m线程设置成守护线程 29 m.start() 30 #m.join(timeout=2) # 这里先注释,不使用join()方法的前提下,线程都不运行完就结束了 31 print("-----main thread done-----") 32 33 执行结果 34 -----main thread done----- 35 进程--1----running----- 36 37 Process finished with exit code 0
上面的结果虽然实现了主进程一旦关闭,子进程也不运行了.
但是设置了m.setDaemon(True)后,程序一下子就执行完了,不说那5个调用run函数的线程了,连main线程也没执行.
这就是m.setDaemon(True)后的作用.那么我们肯定是希望main能运行的.所以就需要把#m.join(timeout=2) 注释去掉.
去掉后的执行结果是:
1 进程--1----running----- 2 进程--Thread-2----start----- 3 进程--2----running----- 4 进程--Thread-3----start----- 5 进程--3----running----- 6 进程--Thread-4----start----- 7 进程--4----running----- 8 进程--Thread-5----start----- 9 进程--5----running----- 10 进程--Thread-6----start----- 11 -----守护进程执行结束 ------- 12 -----main thread done-----
这个结果我们看到,调用main()函数的线程执行完了.但调用run()函数的线程为什么没执行完成.我m.join(timeout=2)不是设置了2秒的等待吗?
错:设置m.join(timeout=2)方法,是告诉启用m线程的主线程可以等待m线程实例2分钟,但是实际m线程调用的main()函数根本不需要2秒钟就执行完了.
所以不到两秒的时间,主程序又开始往下走.那么我们如何实现(比如我想写一个批量登录 100台主机,执行一个命令.但是呢会有3~5台ssh连接特别慢,超时2分钟.这时候我为了执行效率,代码设定超过30秒程序就退出了.)
等待30秒的实例呢.
我们想既然m.join(timeout=2)这里m线程不需要2秒,他就执行完成了.那我把main()方法执行的时间调整到需要两秒不就OK了.
所以代码写成如下:
1 #!/usr/bin/env python3.5 2 #__author__:‘ted.zhou‘ 3 ‘‘‘ 4 守护进程实例 5 守护进程作用是,守护进程后台运行,当守护进程死掉,后台的程序也都会死掉 6 ‘‘‘ 7 8 import threading 9 import time 10 11 def run(n): 12 print(‘进程--%s----running-----‘%n) 13 time.sleep(2) 14 print(‘进程--%s-----done-------‘%n) 15 16 def main(): 17 ‘‘‘ 18 在main中启动多个线程调用run()方法 19 :return: 20 ‘‘‘ 21 for i in range(1,6): 22 t = threading.Thread(target=run,args=[i,]) 23 t.start() 24 print(‘进程--%s----start----- ‘%(t.getName())) 25 time.sleep(2) # 这里设置main函数执行也需要2秒 26 print(‘-----守护进程执行结束 -------‘) 27 28 m = threading.Thread(target=main,args=[]) 29 m.setDaemon(True) 30 m.start() 31 m.join(timeout=2) # 那么这里的等待2秒就有意义了. 32 print("-----main thread done-----") 33 代码执行结果: 34 进程--1----running----- 35 进程--Thread-2----start----- 36 进程--2----running----- 37 进程--Thread-3----start----- 38 进程--3----running----- 39 进程--Thread-4----start----- 40 进程--4----running----- 41 进程--Thread-5----start----- 42 进程--5----running----- 43 进程--Thread-6----start----- 44 -----main thread done----- 45 进程--1-----done------- 46 进程--3-----done------- 47 进程--5-----done------- 48 -----守护进程执行结束 ------- 49 进程--4-----done------- 50 51 Process finished with exit code 0
这样就能满足,在30秒内执行的线程都执行完成,过了30秒还没执行完成的也不等待了.直接放弃.
守护线程完
5.Event事件
通过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。
代码如下:
1 import threading,time 2 import random 3 def light(): 4 if not event.isSet(): 5 event.set() #wait就不阻塞 #绿灯状态 6 count = 0 7 while True: 8 if count < 10: 9 print(‘\033[42;1m--green light on---\033[0m‘) 10 elif count <13: 11 print(‘\033[43;1m--yellow light on---\033[0m‘) 12 elif count <20: 13 if event.isSet(): 14 event.clear() 15 print(‘\033[41;1m--red light on---\033[0m‘) 16 else: 17 count = 0 18 event.set() #打开绿灯 19 time.sleep(1) 20 count +=1 21 def car(n): 22 while 1: 23 #time.sleep(random.randrange(10)) 24 time.sleep(1) 25 if event.isSet(): #绿灯 26 print("car [%s] is running.." % n) 27 else: 28 print("car [%s] is waiting for the red light.." %n) 29 event.wait() 30 if __name__ == ‘__main__‘: 31 event = threading.Event() 32 Light = threading.Thread(target=light) 33 Light.start() 34 for i in range(3): 35 t = threading.Thread(target=car,args=(i,)) 36 t.start()
以上是关于python2.0 s12 day8 _ python线程的主要内容,如果未能解决你的问题,请参考以下文章
python2.0_s12_day9之day8遗留知识(queue队列&生产者消费者模型)