threading学习
Posted Freeman耀
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了threading学习相关的知识,希望对你有一定的参考价值。
多线程类似于同时执行多个不同程序,多线程运行有如下优点:
- 使用线程可以把占据长时间的程序中的任务放到后台去处理。
- 用户界面可以更加吸引人,这样比如用户点击了一个按钮去触发某些事件的处理,可以弹出一个进度条来显示处理的进度
- 程序的运行速度可能加快
- 在一些等待的任务实现上如用户输入、文件读写和网络收发数据等,线程就比较有用了。在这种情况下我们可以释放一些珍贵的资源如内存占用等等。
threading模块提供的类:
Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local。
threading 模块提供的常用方法:
threading.currentThread(): 返回当前的线程变量。
threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
threading 模块提供的常量:
threading.TIMEOUT_MAX 设置threading全局超时时间。
Thread是线程类,有两种使用方法,直接传入要运行的方法或从Thread继承并覆盖run():
1 # coding:utf-8 2 import threading 3 import time 4 #方法一:将要执行的方法作为参数传给Thread的构造方法 5 def action(arg): 6 time.sleep(1) 7 print(\'the arg is:%s\\r\' %arg) 8 9 for i in range(4): 10 t =threading.Thread(target=action,args=(i,)) 11 t.start() 12 13 print(\'main thread end!\') 14 15 #方法二:从Thread继承,并重写run() 16 class MyThread(threading.Thread): 17 def __init__(self,arg): 18 super(MyThread, self).__init__()#注意:一定要显式的调用父类的初始化函数。 19 self.arg=arg 20 def run(self):#定义每个线程要运行的函数 21 time.sleep(1) 22 print \'the arg is:%s\\r\' % self.arg 23 24 for i in range(4): 25 t =MyThread(i) 26 t.start() 27 28 print(\'main thread end!\')
构造方法:
Thread(group=None, target=None, name=None, args=(), kwargs={})
group: 线程组,目前还没有实现,库引用中提示必须是None;
target: 要执行的方法;
name: 线程名;
args/kwargs: 要传入方法的参数。
实例方法:
isAlive(): 返回线程是否在运行。正在运行指启动后、终止前。
get/setName(name): 获取/设置线程名。
start(): 线程准备就绪,等待CPU调度
is/setDaemon(bool): 获取/设置是后台线程(默认前台线程(False))。(在start之前设置)
如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,主线程和后台线程均停止
如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
start(): 启动线程。
join([timeout]): 阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout(可选参数)。
使用例子一(未设置setDeamon):
1 # coding:utf-8 2 import threading 3 import time 4 5 def action(arg): 6 time.sleep(1) 7 print(\'sub thread start!the thread name is:%s\\r\' % threading.currentThread().getName()) 8 print(\'the arg is:%s\\r\' %arg) 9 time.sleep(1) 10 11 for i in range(4): 12 t =threading.Thread(target=action,args=(i,)) 13 t.start() 14 15 print(\'main_thread end!\')
运行结果
1 main_thread end! 2 sub thread start!the thread name is:Thread-2 3 the arg is:1 4 the arg is:0 5 sub thread start!the thread name is:Thread-4 6 the arg is:2 7 the arg is:3 8 Process finished with exit code 0 9 可以看出,创建的4个“前台”线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
验证了serDeamon(False)(默认)前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,主线程停止。
使用例子二(setDeamon=True)
1 # coding:utf-8 2 import threading 3 import time 4 5 def action(arg): 6 time.sleep(1) 7 print(\'sub thread start!the thread name is:%s\\r\' % threading.currentThread().getName()) 8 print(\'the arg is:%s\\r\' %arg) 9 time.sleep(1) 10 11 for i in range(4): 12 t =threading.Thread(target=action,args=(i,)) 13 t.setDaemon(True)#设置线程为后台线程 14 t.start() 15 16 print(\'main_thread end!\')
运行结果
1 main_thread end! 2 3 Process finished with exit code 0 4 5 可以看出,主线程执行完毕后,后台线程不管是成功与否,主线程均停止
验证了serDeamon(True)后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,主线程均停止。
使用例子三(设置join)
1 #coding:utf-8 2 import threading 3 import time 4 5 def action(arg): 6 time.sleep(1) 7 print(\'sub thread start!the thread name is:%s \' % threading.currentThread().getName()) 8 print(\'the arg is:%s \' %arg) 9 time.sleep(1) 10 11 thread_list = [] #线程存放列表 12 for i in range(4): 13 t =threading.Thread(target=action,args=(i,)) 14 t.setDaemon(True) 15 thread_list.append(t) 16 17 for t in thread_list: 18 t.start() 19 20 for t in thread_list: 21 t.join()
运行结果
1 sub thread start!the thread name is:Thread-2 2 the arg is:1 3 sub thread start!the thread name is:Thread-3 4 the arg is:2 5 sub thread start!the thread name is:Thread-1 6 the arg is:0 7 sub thread start!the thread name is:Thread-4 8 the arg is:3 9 main_thread end! 10 11 Process finished with exit code 0 12 13 设置join之后,主线程等待子线程全部执行完成后或者子线程超时后,主线程才结束
验证了 join()阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout,即使设置了setDeamon(True)主线程依然要等待子线程结束。
使用例子四(join不妥当的用法,使多线程编程顺序执行)
1 #coding:utf-8 2 import threading 3 import time 4 5 def action(arg): 6 time.sleep(1) 7 print(\'sub thread start!the thread name is:%s \' % threading.currentThread().getName()) 8 print(\'the arg is:%s \' %arg) 9 time.sleep(1) 10 11 12 for i in range(4): 13 t =threading.Thread(target=action,args=(i,)) 14 t.setDaemon(True) 15 t.start() 16 t.join() 17 18 print(\'main_thread end!\')
运行结果
1 sub thread start!the thread name is:Thread-1 2 the arg is:0 3 sub thread start!the thread name is:Thread-2 4 the arg is:1 5 sub thread start!the thread name is:Thread-3 6 the arg is:2 7 sub thread start!the thread name is:Thread-4 8 the arg is:3 9 main_thread end! 10 11 Process finished with exit code 0 12 可以看出此时,程序只能顺序执行,每个线程都被上一个线程的join阻塞,使得“多线程”失去了多线程意义。
Lock、Rlock类
由于线程之间随机调度:某线程可能在执行n条后,CPU接着执行其他线程。为了多个线程同时操作一个内存中的资源时不产生混乱,我们使用锁。
Lock(指令锁)是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。Lock包含两种状态——锁定和非锁定,以及两个基本的方法。
可以认为Lock有一个锁定池,当线程请求锁定时,将线程至于池中,直到获得锁定后出池。池中的线程处于状态图中的同步阻塞状态。
RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。
可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。
简言之:Lock属于全局,Rlock属于线程。
构造方法:
Lock(),Rlock(),推荐使用Rlock()
实例方法:
acquire([timeout]): 尝试获得锁定。使线程进入同步阻塞状态。
release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。
例子一(未使用锁)
1 #coding:utf-8 2 import threading 3 import time 4 5 gl_num = 0 6 7 def show(): 8 global gl_num 9 time.sleep(1) 10 gl_num +=1 11 print gl_num 12 13 for i in range(10): 14 t = threading.Thread(target=show) 15 t.start() 16 17 print(\'main thread stop\') 18 19 运行结果 20 main thread stop 21 12 22 23 3 24 4 25 568 26 9 27 28 910 29 30 31 Process finished with exit code 0 32 33 多次运行可能产生混乱。这种场景就是适合使用锁的场景。
例子二(使用锁):
1 # coding:utf-8 2 3 import threading 4 import time 5 6 gl_num = 0 7 8 lock = threading.RLock() 9 10 # 调用acquire([timeout])时,线程将一直阻塞, 11 # 直到获得锁定或者直到timeout秒后(timeout参数可选)。 12 # 返回是否获得锁。 13 def Func(): 14 lock.acquire() 15 global gl_num 16 gl_num += 1 17 time.sleep(1) 18 print gl_num 19 lock.release() 20 21 for i in range(10): 22 t = threading.Thread(target=Func) 23 t.start() 24 25 26 运行结果 27 28 1 29 2 30 3 31 4 32 5 33 6 34 7 35 8 36 9 37 10 38 39 Process finished with exit code 0 40 可以看出,全局变量在在每次被调用时都要获得锁,才能操作,因此保证了共享数据的安全性
Lock对比Rlock
1 #coding:utf-8 2 3 import threading 4 lock = threading.Lock() #Lock对象 5 lock.acquire() 6 lock.acquire() #产生了死锁。 7 lock.release() 8 lock.release() 9 print(lock.acquire()) 10 11 12 import threading 13 rLock = threading.RLock() #RLock对象 14 rLock.acquire() 15 rLock.acquire() #在同一线程内,程序不会堵塞。 16 rLock.release() 17 rLock.release()
Condition类
Condition(条件变量)通常与一个锁关联。需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。
可以认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。
构造方法:
Condition([lock/rlock])
实例方法:
acquire([timeout])/release(): 调用关联的锁的相应方法。
wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。
notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
例子:生产者消费者模型
1 import threading 2 from threading import Thread 3 import time 4 import random 5 6 7 def worker_func(): 8 print(\'worker thread started in %s\' % (threading.current_thread())) 9 random.seed() 10 time.sleep(random.random()) 11 print(\'worker thread finished in %s\' % (threading.current_thread())) 12 13 14 def simple_thread_demo(tn=5): 15 for i in range(tn): 16 t = Thread(target=worker_func) 17 t.start() 18 19 20 gLock = threading.Lock() 21 gRLock = threading.RLock() 22 gSemaphore = threading.Semaphore(3) 23 gPool = 1000 24 gCondition = threading.Condition() 25 26 27 def worker_func_lock(lock): 28 lock.acquire() 29 worker_func() 30 lock.release() 31 32 33 def thread_lock_demo(tn=5): 34 for i in range(tn): 35 t = Thread(target=worker_func_lock, args=[gSemaphore]) 36 t.start() 37 38 39 class Consumer(Thread): 40 def run(self): 41 print(\'%s started\' % threading.current_thread()) 42 while True: 43 global gPool 44 global gCondition 45 46 gCondition.acquire() 47 random.seed() 48 c = random.randint(500, 1000) 49 print(\'%s: Trying to consume %d. Left %d\' % (threading.current_thread(), c, gPool)) 50 while gPool < c: 51 gCondition.wait() 52 gPool -= c 53 time.sleep(random.random()) 54 print(\'%s: Consumed %d. Left %d\' % (threading.current_thread(), c, gPool)) 55 gCondition.release() 56 57 58 class Producer(Thread): 59 def run(self): 60 print(\'%s started\' % threading.current_thread()) 61 while True: 62 global gPool 63 global gCondition 64 65 gCondition.acquire() 66 random.seed() 67 p = random.randint(100, 200) 68 gPool += p 69 print(\'%s: Produced %d. Left %d\' % (threading.current_thread(), p, gPool)) 70 time.sleep(random.random()) 71 gCondition.notify_all() 72 gCondition.release() 73 74 75 def consumer_producer_demo(): 76 for i in range(1): 77 Consumer().start() 78 79 for i in range(1): 80 Producer().start() 81 82 83 if __name__ == \'__main__\': 84 # simple_thread_demo() 85 # thread_lock_demo() 86 consumer_producer_demo()
信号量(Semaphore)
互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。
1 import threading,time 2 3 def run(n): 4 semaphore.acquire() 5 time.sleep(1) 6 print("run the thread: %s" %n) 7 semaphore.release() 8 9 if __name__ == \'__main__\': 10 11 num= 0 12 semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行 13 for i in range(20): 14 t = threading.Thread(target=run,args=(i,)) 15 t.start()
Event类
Event(事件)是最简单的线程通信机制之一:一个线程通知事件,其他线程等待事件。Event内置了一个初始为False的标志,当调用set()时设为True,调用clear()时重置为 False。wait()将阻塞线程至等待阻塞状态。
Event其实就是一个简化版的 Condition。Event没有锁,无法使线程进入同步阻塞状态。
构造方法:
Event()
实例方法:
isSet(): 当内置标志为True时返回True。
set(): 将标志设为True,并通知所有处于等待阻塞状态的线程恢复运行状态。
clear(): 将标志设为False。
wait([timeout]): 如果标志为True将立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()。
1 # encoding: utf-8 2 import threading 3 import time 4 5 event = threading.Event() 6 7 8 def func(): 9 # 等待事件,进入等待阻塞状态 10 print(\'%s wait for event...\' % threading.currentThread().getName()) 11 event.wait() 12 13 # 收到事件后进入运行状态 14 print(\'%s recv event.\' % threading.currentThread().getName()) 15 16 17 t1 = threading.Thread(target=func) 18 t2 = threading.Thread(target=func) 19 t1.start() 20 t2.start() 21 22 time.sleep(2) 23 24 # 发送事件通知 25 print(\'MainThread set event.\') 26 event.set()
1 运行结果 2 Thread-1 wait for event... 3 Thread-2 wait for event... 4 5 #2秒后。。。 6 MainThread set event. 7 Thread-1 recv event. 8 Thread-2 recv event.
timer类
Timer(定时器)是Thread的派生类,用于在指定时间后调用一个方法。
构造方法:
Timer(interval, function, args=[], kwargs={})
interval: 指定的时间
function: 要执行的方法
args/kwargs: 方法的参数
实例方法:
Timer从Thread派生,没有增加实例方法。
1 # encoding: utf-8 2 import threading 3 4 5 def func(): 6 print(\'hello timer!\') 7 8 9 timer = threading.Timer(5, func) 10 timer.start()
线程延迟5秒后执行.
local类
local是一个小写字母开头的类,用于管理 thread-local(线程局部的)数据。对于同一个local,线程无法访问其他线程设置的属性;线程设置的属性不会被其他线程设置的同名属性替换。
可以把local看成是一个“线程-属性字典”的字典,local封装了从自身使用线程作为 key检索对应的属性字典、再使用属性名作为key检索属性值的细节。
1 # encoding: utf-8 2 import threading 3 4 local = threading.local() 5 local.tname = \'main\' 6 7 def func(): 8 local.tname = \'notmain\' 9 print(local.tname) 10 11 t1 = threading.Thread(target=func) 12 t1.start() 13 t1.join() 14 15 print(local.tname) 16 17 运行结果 18 notmain 19 main
参考文章链接:
http://www.cnblogs.com/huxi/archive/2010/06/26/1765808.html
http://www.cnblogs.com/wupeiqi/articles/5040827.html
以上是关于threading学习的主要内容,如果未能解决你的问题,请参考以下文章