学习Python多线程的资料很多,吐槽Python多线程的博客也不少。本文主要介绍Python多线程实际应用,且假设读者已经了解多线程的基本概念。如果读者对进程线程概念不甚了解,可参见知名博主 阮一峰 转译的一篇博客:《进程与线程的一个简单解释》。
1 线程的基本操作
Python中多线程主要有两个模块,_thread和threading模块。前者更底层,后者更常用,能满足绝大部分编程需求,今天主要围绕threading模块展开介绍。启动一个线程需要用threading模块中的Thread。
线程的启动需要先创建Thread对象,然后调用该对象的start()方法,参见下例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import time import threading def func(n): while n > 0 : print ( "线程name:" , threading.current_thread().name, "参数n:" , n) n - = 1 time.sleep( 1 ) t = threading.Thread(target = func, args = ( 5 ,)) t.start() print ( "主线程:" , threading.current_thread().name) # 运行结果: # 线程name: Thread-1 参数n: 5 # 主线程: MainThread # 线程name: Thread-1 参数n: 4 # 线程name: Thread-1 参数n: 3 # 线程name: Thread-1 参数n: 2 # 线程name: Thread-1 参数n: 1 |
上例中,threading.current_thread().name 是获取当前线程的name属性。
Thread中,形参target传入函数名,args传入函数对应的参数,参数必须是可迭代对象,如果是元组且只有一个参数必须写成(参数,)的形式,逗号不能省略。
一旦启动一个线程,该线程将由操作系统来全权管理,独立执行直到目标函数返回。一般情况下,线程的操作有以下几种:
1
2
3
4
5
|
t.is_alive() # 查询线程对象的状态,返回布尔值 t.join() # 将线程加入到当前线程,并等待其终止 t = Thread(target = countdown, args = ( 10 ,), daemon = True ) # 后台线程 t.start() |
查看线程状态示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
import time import threading def func(n): while n > 0 : print ( "线程name:" , threading.current_thread().name, "参数n:" , n) n - = 1 time.sleep( 1 ) t = threading.Thread(target = func, args = ( 2 ,)) t.start() print ( "主线程:" , threading.current_thread().name) if t.is_alive(): print ( "活着的" ) else : print ( "未存活" ) print ( "主线程结束" ) |
让主线程等待其他线程,就是主线程会在join()处一直等待所有线程都结束之后,再继续运行。参见下例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import time import threading def func(n): while n > 0 : print ( "线程name:" , threading.current_thread().name, "参数n:" , n) n - = 1 time.sleep( 1 ) t = threading.Thread(target = func, args = ( 2 ,)) t.start() t.join() print ( "主线程:" , threading.current_thread().name) print ( "主线程结束" ) # 运行结果: # 线程name: Thread-1 参数n: 2 # 线程name: Thread-1 参数n: 1 # 主线程: MainThread # 主线程结束 |
后台线程参见下例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import time import threading def func(n): while n > 0 : print ( "参数n:" , n) n - = 1 time.sleep( 1 ) t = threading.Thread(target = func, args = ( 10 , ), daemon = True ) t.start() time.sleep( 3 ) print ( "主线程结束" ) # 参数n: 10 # 参数n: 9 # 参数n: 8 # 参数n: 7 # 主线程结束 |
后台线程无法等待,但主线程终止时后台线程自动销毁。 如果要对线程进行高级操作,如发送信号,终止线程,都需要自己实现。下例通过轮询控制线程退出:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
import time from threading import Thread class StopThread: def __init__( self ): self ._flag = True def terminate( self ): self ._flag = False def run( self , n): while self ._flag and n > 0 : print ( \'num>>:\' , n) n - = 1 time.sleep( 1 ) obj = StopThread() t = Thread(target = obj.run, args = ( 11 ,)) t.start() time.sleep( 5 ) # 表示do something obj.terminate() # 终止线程 t.join() print ( "主线程结束" ) |
上例通过类中的_flag控制线程的终止,当主线程执行5秒之后,主动将_flag赋值为False终止线程。通过轮询终止线程存在一个问题,如果while self._flag and n > 0:这句后,某次循环一直阻塞在I/O操作上,根本不会进行下一次循环,自然就无法终止。这该怎么办呢?留一个思考题。
多线程还可以通过继承Thread实现,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import time from threading import Thread class A(Thread): def __init__( self ,): super ().__init__() def run( self ): print ( "run1.." , ) time.sleep( 5 ) print ( "run2.." ) obj = A() obj.start() print ( "主线程结束" ) |
2 线程锁和一个怪象
当我们用多个线程同时修改同一份数据时,怎么保证最终结果是我们期许的呢?举个例子,当前有一个全局变量a=0,如果有10个线程同时对其加1,这就出现了线程间的竞争,到底应该听谁的呢?这时候,应该用线程锁来解决。也就是当某一个线程A对该数据操作时,对该数据加锁,其他线程只能等着,等待A操作完之后释放了该锁,其他线程才能操作该数据,一旦某个线程获得操作数据的权限,立即又加上锁。如此便能保证数据的安全准确。奇怪的是,在Python3中,即使不加锁,好像也不会发生数据出错的情况。或许这个例子不是很好,也或许是Python3中自动加了锁。希望有知道的读者赐教一下。这个奇怪的现象就是下例了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
from threading import Thread import time def add_one(a): time.sleep( 1 ) print ( "in thread a:" , a) a[ 1 ] + = 1 if __name__ = = \'__main__\' : array = [ 0 , 1 , 4 ] thread_obj_list = [] for i in range ( 50 ): t = Thread(target = add_one, args = (array,)) t.start() thread_obj_list.append(t) for j in thread_obj_list: j.join() print ( "array result::" , array) # array result:: [0, 51, 4] |
我们看到,最后array的第二个元素是51,并没有出错,这真是令人费解。好了,言归正传,来看看线程锁的几个方法吧:
1
2
3
|
lock = threading.Lock() # Lock对象 lock.acquire() # 锁定 lock.release() # 解锁 |
Lock有“锁定”或“解锁”两种状态之一。它是在解锁状态下创建的。它有两个基本方法,acquire() 和 release()。
当状态为解锁时,acquire()将状态更改为锁定并立即返回。当状态被锁定时,acquire()块直到对另一个协程中的release()的调用将其改变为解锁,然后acquire()调用将其重置为锁定并返回。
release()方法只应在锁定状态下调用;它将状态更改为已解锁并立即返回。如果尝试释放已解锁的锁,则会引发 RuntimeError。
下面是一个具体的使用例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
from threading import Thread import time import threading lock = threading.Lock() def add_one(a): time.sleep( 1 ) lock.acquire() a[ 1 ] + = 1 lock.release() if __name__ = = \'__main__\' : array = [ 0 , 1 , 4 ] thread_obj_list = [] for i in range ( 50 ): t = Thread(target = add_one, args = (array,)) t.start() thread_obj_list.append(t) for j in thread_obj_list: j.join() print ( "array result::" , array) # array result:: [0, 51, 4] |
acquire()和release()方法成对出现。但是这样手动释放有时候可能会遗忘,这时候可以考虑用上下文管理协议。关于上下文管理协议,可参见作者的这篇文章【Python上下文管理器】。
Lock对象支持with语句:
1
2
3
4
|
def add_one(a): time.sleep( 1 ) with lock: a[ 1 ] + = 1 |
3 递归锁
可重入锁(又称递归锁,RLock),就是大锁中包含子锁的情况下使用。在这种情况下,再用Lock时,就会出现死锁现象,此时应该用threading.RLock()对象了,用法同Lock,参见下例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
from threading import Thread import time import threading lock = threading.RLock() def add_one(a): lock.acquire() a[ 1 ] + = 1 lock.release() def add_two(b): time.sleep( 1 ) lock.acquire() b[ 1 ] + = 2 add_one(b) lock.release() if __name__ = = \'__main__\' : array = [ 0 , 1 , 4 ] thread_obj_list = [] for i in range ( 50 ): t = Thread(target = add_two, args = (array,)) t.start() thread_obj_list.append(t) for j in thread_obj_list: j.join() print ( "array result::" , array) # array result:: [0, 151, 4] |
上例读者可以试试Lock(),看看什么效果。RLock()还支持上下文管理协议,上例中的两个函数可以改成这样:
1
2
3
4
5
6
7
8
9
|
def add_one(a): with rlock: a[ 1 ] + = 1 def add_two(b): time.sleep( 1 ) with rlock: b[ 1 ] + = 2 add_one(b) |
4 GIL
全局解释器锁(英语:Global Interpreter Lock,缩写GIL),是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。所以很多人说Python的线程是假线程,并能利用多核,并不能真正并行。之所以感觉到线程并行,是因为线程上下文不断切换的缘故。Python 3.2开始使用新的GIL。新的GIL实现中用一个固定的超时时间来指示当前的线程放弃全局锁。在当前线程保持这个锁,且其他线程请求这个锁时,当前线程就会在5毫秒后被强制释放该锁。关于全局锁,强调三点:
(1)GIL的存在,同一时刻只能有一个线程在运行。
(2)GIL是CPython的特性,Jython,pypy等并无GIL。
(3)Cpython的多线程适用于I/O密集型问题,计算密集型问题可使用多进程编程。
5 判断线程状态
在多线程编程中,有时候某个线程依赖另一个线程的状态,需要使用threading库中的Event对象。 Event对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。可将线程设置等待Event对象, 直到有其他线程将Event对象设置为真,这些等待Event对象的线程将开始执行。Event()对象的常用方法:
1
2
3
4
5
6
|
event = threading.Event() # 创建threading.Event()对象 event.is_set() # 获取event的设置值,默认为False event. set () # 设置event的值为True event.clear() # 设置event的值为False event.wait() # 等到event的值被设为True就执行 |
下面通过“交通信号灯”问题示范event的使用:
1
|