多线程
Posted juno3550
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程相关的知识,希望对你有一定的参考价值。
1. 进程 VS 线程
2. 创建多线程 - threading
3. 创建多线程 - Thread 子类
4. 同步
5. 死锁
6. 生产者与消费者模式
1. 进程VS线程
定义
- 进程是资源分配的最小单位,线程是CPU调度的最小单位。
- 一个程序启动后至少有一个进程,一个进程至少有一个线程。线程不能够独立执行,必须依存在进程中。
- 进程与线程均能够完成多任务。比如一台电脑上能够同时运行多个QQ(多进程);一个QQ中使用多个聊天窗口(多线程)。
优劣势
维度 | 多进程 | 多线程 | 优劣 |
---|---|---|---|
数据共享、同步 | 数据是分开的;共享复杂,需要用IPC;同步简单 | 多线程共享进程数据:共享简单;同步复杂 | 各有优势 |
内存、CPU | 占用内存多,切换复杂,CPU利用率低 | 占用内存少,切换简单,CPU利用率高 | 线程占优 |
创建销毁、切换 | 创建销毁、切换复杂,速度慢 | 创建销毁、切换简单,速度快 | 线程占优 |
编程调试 | 编程简单,调试简单 | 编程复杂,调试复杂 | 进程占优 |
可靠性 | 进程间不会相互影响 | 一个线程挂掉将导致整个进程挂掉 | 进程占优 |
分布式 | 适应于多核、多机分布 ;如果一台机器不够,扩展到多台机器比较简单 | 适应于多核分布 | 进程占优 |
消耗资源
从内核的观点看,进程的目的就是担当分配系统资源(CPU时间、内存等)的基本单位。线程是进程的一个执行流,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。
多线程彼此之间使用相同的地址空间,共享大部分数据,启动一个线程所花费的空间远远小于启动一个进程所花费的空间。而且,线程间彼此切换所需的时间也远远小于进程间切换所需要的时间。据统计,总的说来,一个进程的开销大约是一个线程开销的30倍左右,当然,在具体的系统上,这个数据可能会有较大的区别。
应用场景
1)需要频繁创建销毁的,优先用线程
这种原则最常见的应用就是Web服务器了,来一个连接建立一个线程,断了就销毁线程,要是用进程,创建和销毁的代价是很难承受的。
2)需要进行大量计算的,优先使用线程
所谓大量计算,当然就是要耗费很多CPU,切换频繁了,这种情况下线程是最合适的。
这种原则最常见的是图像处理、算法处理。
3)强相关的处理用线程,弱相关的处理用进程
什么叫强相关、弱相关?理论上很难定义,给个简单的例子就明白了。
一般的Server需要完成如下任务:消息收发、消息处理。“消息收发”和“消息处理”就是弱相关的任务,而“消息处理”里面可能又分为“消息解码”、“业务处理”,这两个任务相对来说相关性就要强多了。因此“消息收发”和“消息处理”可以分进程设计,“消息解码”、“业务处理”可以分线程设计。
当然这种划分方式不是一成不变的,也可以根据实际情况进行调整。
4)可能要扩展到多机分布的用进程,多核分布的用线程
5)都满足需求的情况下,用你最熟悉、最拿手的方式
至于“数据共享、同步”、“编程、调试”、“可靠性”这几个维度的所谓的“复杂、简单”应该怎么取舍,其实没有明确的选择方法。有一个选择原则:如果多进程和多线程都能够满足要求,那么选择你最熟悉、最拿手的那个。
需要提醒的是:虽然有这么多的选择原则,但实际应用中基本上都是“进程+线程”的结合方式,千万不要真的陷入一种非此即彼的误区。
多线程VS单线程
虽然单线程全部占有CPU,但不代表全部利用。而多线程能更好的利用资源,前提是组织好程序(比如需要执行多个不同的任务),否则并发执行的效率不一定比串行执行高,因为多线程在执行的时候会有个抢占CPU资源,上下文切换的过程。
如果你的程序仅仅是做一种简单的计算,其间不涉及任何可能是使线程挂起的操作,如I/O读写,等待某种事件等等。那么从表面上看,两个线程与单个线程相比,增加了切换的开销,应该比单线程慢才对。
但还得考虑操作系统的调度策略。通常,在支持线程的操作系统中,线程才是系统调度的单位,对同样一个进程来讲,多一个线程就可以多分到CPU时间,特别是从一个增加到两个的时候。
举例来说,假如在你的程序启动前,系统中已经有50个线程在运行,那么当你的程序启动后,假如他只有一个线程,那么平均来讲,它将获得1/51的CPU时间,而如果他有两个线程,那么就会获得2/52的CPU时间(当然,这是一种非常理想的情况,它没有考虑系统中原有其他线程的繁忙或者空闲程度,也没有考虑线程切换)。
但是如果你的程序里面已经有1000个线程,那么你把它加到1500,效果也不会有从1个线程加到2个线程来的明显。而且很可能造成系统的整体性能下降,因为线程之间的切换也需要时间。
2. 创建多线程 - threading
python的thread模块是比较底层的模块,而python的threading模块对thread做了一些包装,可以更加方便的被使用。
- 多线程并发的操作,花费时间要(比多进程)短很多。
- 每个线程一定会有一个名字,尽管没有指定线程对象的name,但是python会自动为线程指定一个名字。
- 主线程会等待所有的子线程结束后才结束。
- 当线程的run()方法结束时,该线程完成。
- 全局变量是多个线程都共享的数据,而局部变量等是各自线程的,是非共享的。
线程的几种状态:
示例1:
1 import threading 2 import time 3 4 5 def sing(): 6 for i in range(3): 7 print("sing:%d" % i) 8 time.sleep(1) 9 10 11 def dance(): 12 for i in range(3): 13 print("dance:%d" % i) 14 time.sleep(1) 15 16 17 if __name__ == "__main__": 18 print("---开始---") 19 t1 = threading.Thread(target=sing) 20 t2 = threading.Thread(target=dance) 21 t1.start() 22 t2.start() 23 print("---结束---")
执行结果:
---开始--- sing:0 dance:0 ---结束--- dance:1 sing:1 sing:2 dance:2
示例2:查看线程数量
1 import threading 2 import time 3 4 5 def sing(): 6 for i in range(3): 7 print("sing:%d" % i) 8 time.sleep(1) 9 10 11 def dance(): 12 for i in range(3): 13 print("dance:%d" % i) 14 time.sleep(1) 15 16 # 主线程+2个start()线程,共会执行3个线程 17 if __name__ == "__main__": 18 print("---开始---") 19 t1 = threading.Thread(target=sing) 20 t2 = threading.Thread(target=dance) 21 t1.start() 22 t2.start() 23 24 while True: 25 length = len(threading.enumerate()) 26 print("当前运行的线程数为:%s" % length) 27 if length <= 1: 28 break 29 time.sleep(1)
执行结果:
---开始--- sing:0 dance:0 当前运行的线程数为:3 sing:1 dance:1 当前运行的线程数为:3 当前运行的线程数为:3 dance:2 sing:2 当前运行的线程数为:2 当前运行的线程数为:1
3. 创建多线程 - Thread 子类
通过使用threading模块能完成多任务的程序开发。为了让每个线程的封装性更完美,在使用threading模块时,往往会定义一个新的子类(继承threading.Thread),
然后重写run
方法。
Thread类中的run方法,用于定义线程的功能函数,在创建自己了线程实例后,通过Thread类的start方法,可以启动该线程,交给python虚拟机进行调度,当该线程获得执行的机会时,就会调用run方法执行线程。
示例:
1 import threading 2 import time 3 4 5 class MyThread(threading.Thread): 6 7 def run(self): 8 for i in range(3): 9 time.sleep(1) 10 msg = "I‘m "+self.name+":"+str(i) # name属性保存的是当前线程的名称 11 print(msg) 12 13 14 if __name__ == "__main__": 15 t = MyThread() 16 t.start()
执行结果:
I‘m Thread-1:0 I‘m Thread-1:1 I‘m Thread-1:2
多线程的执行顺序
1 import threading 2 import time 3 4 5 class MyThread(threading.Thread): 6 7 def run(self): 8 for i in range(3): 9 time.sleep(1) 10 msg = "I‘m"+self.name+":"+str(i) # name属性保存的是当前线程的名称 11 print(msg) 12 13 14 if __name__ == "__main__": 15 for i in range(7): 16 t = MyThread() 17 t.start()
执行效果:
多线程程序的执行顺序是不确定的。当执行到sleep语句时,线程将被阻塞(Blocked);到sleep结束后,线程进入就绪(Runnable)状态,等待调度。而线程调度将自行选择一个线程执行。上面的代码中只能保证每个线程都运行完整个run函数,但是线程的启动顺序、run函数中每次循环的执行顺序都不能确定。
I‘mThread-1:0 I‘mThread-2:0 I‘mThread-3:0 I‘mThread-4:0 I‘mThread-6:0 I‘mThread-5:0 I‘mThread-7:0 I‘mThread-1:1 I‘mThread-2:1 I‘mThread-3:1 I‘mThread-4:1 I‘mThread-6:1 I‘mThread-5:1 I‘mThread-7:1 I‘mThread-1:2 I‘mThread-2:2 I‘mThread-3:2 I‘mThread-4:2 I‘mThread-6:2 I‘mThread-5:2 I‘mThread-7:2
4. 同步
多线程开发可能遇到的问题
假设两个线程t1和t2都要对num=0进行增1运算,t1和t2都各对num修改10次,num的最终的结果应该为20。
但是由于是多线程访问,有可能出现下面情况:
- 在num=0时,t1取得num=0。
- 此时系统把t1调度为”sleeping”状态,把t2转换为”running”状态,t2也获得num=0。于是t2对得到的值进行加1并赋给num,使得num=1。
- 接着系统又把t2调度为”sleeping”,把t1转为”running”。线程t1又把它之前得到的0加1后赋值给num。
- 于是,明明t1和t2都完成了1次加1工作,但结果仍然是num=1。
问题产生的原因就是没有控制多个线程对同一资源的访问,对数据造成破坏,使得线程运行的结果不可预期。这种现象称为“线程不安全”。
什么是同步
同步就是协同步调,按预定的先后次序进行运行。如:你说完,我再说。
"同"字从字面上容易理解为一起动作,其实不是,"同"字应是指协同、协助、互相配合。
如进程、线程同步,可理解为进程或线程A和B一块配合,A执行到一定程度时要依靠B的某个结果,于是停下来,示意B运行;B依言执行,再将结果给A;A再继续操作。
互斥锁
当多个线程几乎同时修改某一个共享数据的时候,需要进行同步控制。
线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。互斥锁为资源引入了一个状态:锁定/非锁定。
某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。
threading模块中定义了Lock类,可以方便的处理锁定:
#创建锁 mutex = threading.Lock() #锁定 mutex.acquire([blocking]) #释放 mutex.release()
其中,锁定方法acquire可以有一个blocking参数:
- 如果设定blocking为True,则当前线程会堵塞,直到获取到这个锁为止(如果没有指定,那么默认为True)。
- 如果设定blocking为False,则当前线程不会堵塞。
示例:
1 from threading import Thread, Lock 2 3 g_num = 0 4 5 def test_1(): 6 global g_num 7 for i in range(100000): 8 # 默认True表示堵塞,即如果这个线程在上锁之前已经被另一方上锁了,那么这个线程会一直等待到解锁为止 9 # False表示非堵塞,即不管能否成功获得锁,都不会等待,直接跳过加锁块中的代码往下执行 10 if mutex.acquire(True): 11 g_num += 1 12 mutex.release() 13 print("---test1---g_num=%d" % g_num) 14 15 16 def test_2(): 17 global g_num 18 for i in range(100000): 19 if mutex.acquire(True): 20 g_num += 1 21 mutex.release() 22 print("---test2---g_num=%d" % g_num) 23 24 25 # 创建一个互斥锁,默认是未上锁状态 26 mutex = Lock() 27 p1 = Thread(target=test_1) 28 p1.start() 29 p2 = Thread(target=test_2) 30 p2.start() 31 32 print("final g_num = %d" % g_num) # 主线程
执行结果:
final g_num = 5334 ---test1---g_num=195684 ---test2---g_num=200000
上锁解锁过程
当一个线程调用锁的 acquire() 方法获得锁时,锁就进入“locked”状态。
每次只有一个线程可以获得锁。如果此时另一个线程试图获得这个锁,该线程就会变为“blocked”状态,称为“阻塞”,直到拥有锁的线程调用锁的release()方法释放锁之后,锁进入“unlocked”状态。
线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态。
等待解锁的方式为“通知”机制,即一个线程释放锁后会通知其他线程获得锁。相比“轮询”机制来说效率更高(轮询是指不断判断条件是否满足)。
总结
锁的好处:
- 确保了某段关键代码只能由一个线程从头到尾完整地执行。
锁的坏处:
- 阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。
- 由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁。
同步的应用
可以使用互斥锁使得多个线程有序地执行任务,这就是线程的同步。
1 from threading import Thread, Lock 2 import time 3 4 5 class Task1(Thread): 6 def run(self): 7 while True: 8 if lock1.acquire(): # 一开始只有lock1未上锁 9 print("Task1") 10 time.sleep(0.5) 11 lock2.release() # 释放lock2 12 13 14 class Task2(Thread): 15 def run(self): 16 while True: 17 if lock2.acquire(): # 一开始已被主线程上锁 18 print("Task2") 19 time.sleep(0.5) 20 lock3.release() # 释放lock3 21 22 23 class Task3(Thread): 24 def run(self): 25 while True: 26 if lock3.acquire(): # 一开始已被主线程上锁 27 print("Task3") 28 time.sleep(0.5) 29 lock1.release() # 释放lock1 30 31 32 if __name__ == "__main__": 33 34 lock1 = Lock() 35 lock2 = Lock() 36 lock3 = Lock() 37 # 只有lock1未上锁 38 lock2.acquire() 39 lock3.acquire() 40 t1 = Task1() 41 t2 = Task2() 42 t3 = Task3() 43 t1.start() 44 t2.start() 45 t3.start()
运行结果:
task1
task2
task3
task1
task2
task3
task1
# 不断有序执行
5. 死锁
在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。尽管死锁很少发生,但一旦发生就会造成应用的停止响应。
示例:
1 from threading import Thread, Lock 2 import time 3 4 5 class MyThread1(Thread): 6 7 def run(self): 8 if mutextA.acquire(): 9 print(self.name + "1 first") 10 time.sleep(1) 11 12 if mutextB.acquire(): 13 print(self.name + "1 second") 14 mutextB.release() 15 16 mutextA.release() 17 18 19 class MyThread2(Thread): 20 21 def run(self): 22 if mutextB.acquire(): 23 print(self.name + "2 first") 24 time.sleep(1) 25 26 if mutextA.acquire(): 27 print(self.name + "2 second") 28 mutextA.release() 29 30 mutextB.release() 31 32 33 if __name__ == "__main__": 34 mutextA = Lock() 35 mutextB = Lock() 36 t1 = MyThread1() 37 t2 = MyThread2() 38 t1.start() 39 t2.start()
执行结果:程序无法往下执行,一直卡着
Thread-11 first Thread-22 first
图示死锁现象
避免死锁的方法
- 程序设计时要尽量避免(银行家算法)
- 添加超时时间等,如在 mutex.acquire() 中添加超时时间参数。
附录-银行家算法
[背景知识]
一个银行家如何将一定数目的资金安全地借给若干个客户,使这些客户既能借到钱完成要干的事,同时银行家又能收回全部资金而不至于破产,这就是银行家问题。这个问题同操作系统中资源分配问题十分相似:银行家就像一个操作系统,客户就像运行的进程,银行家的资金就是系统的资源。
[问题描述]
一个银行家拥有一定数量的资金,有若干个客户要贷款。每个客户须在一开始就声明他所需贷款的总额。若该客户贷款总额不超过银行家的资金总数,银行家可以接收客户的要求。客户贷款是以每次一个资金单位(如1万RMB等)的方式进行的,客户在借满所需的全部单位款额之前可能会等待,但银行家须保证这种等待是有限的,可完成的。
例如:有三个客户C1,C2,C3,向银行家借款,该银行家的资金总额为10个资金单位,其中C1客户要借9各资金单位,C2客户要借3个资金单位,C3客户要借8个资金单位,总计20个资金单位。某一时刻的状态如下图所示:
对于a图的状态,按照安全序列的要求,我们选的第一个客户应满足该客户所需的贷款小于等于银行家当前所剩余的钱款,可以看出只有C2客户能被满足:C2客户需1个资金单位,小银行家手中的2个资金单位,于是银行家把1个资金单位借给C2客户,使之完成工作并归还所借的3个资金单位的钱,进入b图。同理,银行家把4个资金单位借给C3客户,使其完成工作,在c图中,只剩一个客户C1,它需7个资金单位,这时银行家有8个资金单位,所以C1也能顺利借到钱并完成工作。最后(见图d)银行家收回全部10个资金单位,保证不赔本。那麽客户序列{C1,C2,C3}就是个安全序列,按照这个序列贷款,银行家才是安全的。否则的话,若在图b状态时,银行家把手中的4个资金单位借给了C1,则出现不安全状态:这时C1,C3均不能完成工作,而银行家手中又没有钱了,系统陷入僵持局面,银行家也不能收回投资。
综上所述,银行家算法是从当前状态出发,逐个按安全序列检查各客户谁能完成其工作,然后假定其完成工作且归还全部贷款,再进而检查下一个能完成工作的客户,......。如果所有客户都能完成工作,则找到一个安全序列,银行家才是安全的。
6. 生产者与消费者模式
生产者与消费者模式的介绍
在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。
在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商 品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模型。结构图如下:
生产者消费者模型的优点:
1、解耦
假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化, 可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
举个例子,我们去邮局投递信件,如果不使用邮筒(也就是缓冲区),你必须得把信直接交给邮递员。有同学会说,直接给邮递员不是挺简单的嘛?其实不简单,你必须 得认识谁是邮递员,才能把信给他(光凭身上穿的制服,万一有人假冒,就惨了)。这就产生和你和邮递员之间的依赖(相当于生产者和消费者的强耦合)。万一哪天邮递员换人了,你还要重新认识一下(相当于消费者变化导致修改生产者代码)。
而邮筒相对来说比较固定,你依赖它的成本就比较低(相当于和缓冲区之间的弱耦合)。
2、支持并发
由于生产者与消费者是两个独立的并发体,他们之间是用缓冲区作为桥梁连接,生产者只需要往缓冲区里丢数据,就可以继续生产下一个数据,而消费者只需要从缓冲区了拿数据即可,这样就不会因为彼此的处理速度而发生阻塞。
接上面的例子,如果我们不使用邮筒,我们就得在邮局等邮递员,直到他回来,我们才能把信件交给他,这期间我们啥事儿都不能干(也就是生产者阻塞);或者邮递员得挨家挨户问,谁要寄信(相当于消费者轮询)。
3、支持忙闲不均
缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。
当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。 等生产者的制造速度慢下来,消费者再慢慢处理掉。为了充分复用,我们再拿寄信的例子来说事。假设邮递员一次只能带走1000封信。万一某次碰上情人节(也可能是圣诞节)送贺卡,需要寄出去的信超过1000封,这时 候邮筒这个缓冲区就派上用场了。邮递员把来不及带走的信暂存在邮筒中,等下次过来时再拿走。
常见的线性数据结构
队列:FIFO(先进先出)
由于队列是很常见的数据结构,大部分编程语言都内置了队列的支持(具体介绍见"这里"),有些语言甚至提供了线程安全的队列。因此,开发人员可以捡现成,避免了重新发明轮子。
所以,假如数据流量不是很大,采用队列缓冲区的好处还是很明显的:逻辑清晰、代码简单、维护方便。
栈:FILO(先进后出)
Queue 模块
Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue、LIFO(后入先出)队列LifoQueue,以及优先级队列PriorityQueue。这些队列都实现了锁原语(可以理解为原子操作,即要么不做,要么就做完),能够在多线程中直接使用,实现线程间的同步。
- 阻塞队列:当阻塞队列为空时,从队列中获取元素的操作将会被阻塞;当阻塞队列为满时,从队列里添加元素的操作将会被阻塞。
- 线程安全:线程安全的类 ,指的是类内共享的全局变量的访问必须保证是不受多线程形式影响的。如果由于多线程的访问(比如修改、遍历、查看)而使这些变量结构被破坏或者针对这些变量操作的原子性被破坏,则这个类就不是线程安全的。
示例:使用FIFO队列实现生产者与消费者模式
1 from threading import Thread 2 from queue import Queue 3 import time 4 5 6 class Producer(Thread): 7 def run(self): 8 global queue 9 count = 0 10 while True: 11 if queue.qsize() < 1000: 12 for i in range(100): 13 count = count + 1 14 msg = "产出产品"+str(i) 15 queue.put(msg) 16 print(msg) 17 time.sleep(0.5) 18 19 20 class Consumer(Thread): 21 def run(self): 22 global queue 23 while True: 24 if queue.qsize() > 100: 25 for i in range(3): 26 msg = "消费了:"+queue.get() 27 print(msg) 28 time.sleep(1) 29 30 31 if __name__ == "__main__": 32 queue = Queue() 33 34 # 两个生产者线程 35 for i in range(2): 36 p = Producer() 37 p.start() 38 # 五个消费者线程 39 for i in range(5): 40 c = Consumer() 41 c.start() 42
以上是关于多线程的主要内容,如果未能解决你的问题,请参考以下文章