python线程
Posted 追风弧箭
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python线程相关的知识,希望对你有一定的参考价值。
python的threading用于提供线程相关的操作,线程是应用程序中最小的单元。使用threading有两种方式来创建线程:一种是通过继承Thread类,重写它的run方法;另外一种是创建一个threading.Thread对象,在它的初始化函数中将可调用对象最为参数传入。下面分别举例来说明,先来看看通过继承threading.Thread类来创建线程的例子:
# coding=utf-8
import threading, time, random
count = 0
class Counter(threading.Thread):
def __init__(self, lock, threadName):
'''@summary: 初始化对象。
@param lock: 琐对象。
@param threadName: 线程名称。
'''
super(Counter, self).__init__(name=threadName) # 注意:一定要显式的调用父类的初化函数。
self.lock = lock
def run(self):
'''@summary: 重写父类run方法,在线程启动后执行该方法内的代码。
'''
global count
self.lock.acquire()
for i in range(10000):
count = count + 1
self.lock.release()
lock = threading.Lock()
threads = []
for i in range(5):
t =Counter(lock, "thread-" + str(i))
threads.append(t)
t.start()
# 确保线程都执行完毕
for t in threads:
t.join()
print(count)
在代码中,我们创建了一个Counter类,它继承了threading.Thread类。初始化函数接收了两个参数,一个是所对象,一个是线程名称。在Counter中,重写了从父类继承的run方法(run方法是多线程逻辑代码执行的地方),将在线程开启后执行,start()方法用于启动线程。
另外一种创建线程的方式:
import threading, time, random
count = 0
lock = threading.Lock()
def doAdd():
'''@summary: 将全局变量count 逐一的增加10000。
'''
global count, lock
lock.acquire()
for i in range(10000):
count = count + 1
lock.release()
for i in range(5):
threading.Thread(target = doAdd, args = (), name = 'thread-' + str(i)).start()
time.sleep(2) #确保线程都执行完毕
print (count)
上面的代码中,我们把函数对象doAdd作为参数传递给了它的threading.Thread对象,再调用该对象的start方法,线程启动后执行doAdd函数。threading.Thread类的初始化函数原型:
def __init__(self, group=None, target=None, name=None, args=(), kwargs=)
- 参数group是预留的,用于将来拓展
- 参数target是一个可调用对象(也称为活动[activity]),在线程启动后执行
- 参数name是线程的名字。默认值为“Thread-N“,N是一个数字
- 参数args和kwargs分别表示调用target时的参数列表和关键字参数
threading模块提供的类:
Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local。
threading.TIMEOUT_MAX 设置threading全局超时时间。
Thread类还定义一下方法和属性:
threading.currentThread(): 返回当前的线程变量。
threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
Thread.getName() Thread.setName() Thread.name
用来获取和设置线程名称。
Thread.ident
获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。
Thread.is_alive() Thread.isAlive()
判断线程是否是激活的(alive)。从调用start()方法启动线程,到run()方法执行完毕或遇到未处理异常而中断 这段时间内,线程是激活的。
is/setDaemon(bool): 获取/设置是后台线程(默认前台线程(False))。(在start之前设置)如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,主线程和后台线程均停止如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止.
Thread.join([timeout])
调用Thread.join将会使主调线程堵塞,直到被调用线程运行结束或超时。参数timeout是一个数值类型,表示超时时间,如果未提供该参数,那么主调线程将一直堵塞到被调线程结束。下面举个例子说明join()的使用:
import threading, time
def doWaiting():
print 'start waiting:', time.strftime('%H:%M:%S')
time.sleep(3)
print 'stop waiting', time.strftime('%H:%M:%S')
thread1 = threading.Thread(target = doWaiting)
thread1.start()
time.sleep(1) #确保线程thread1已经启动
print 'start join'
thread1.join() #将一直堵塞,直到thread1运行结束。
print 'end join'
threading.RLock和threading.Lock
在threading模块中,定义两种类型的琐:threading.Lock和threading.RLock。
acquire([timeout]): 尝试获得锁定。使线程进入同步阻塞状态。
release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常
它们之间有一点细微的区别,通过比较下面两段代码来说明:
import threading
lock = threading.Lock() #Lock对象
lock.acquire()
lock.acquire() #产生了死琐。
lock.release()
lock.release()
import threading
rLock = threading.RLock() #RLock对象
rLock.acquire()
rLock.acquire() #在同一线程内,程序不会堵塞。
rLock.release()
rLock.release()
这两种琐的主要区别是:RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。注意:如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。Lock属于全局,Rlock属于线程。
Condition类
Condition(条件变量)通常与一个锁关联。需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。
可以认为,处理Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于阻塞状态,知道另外一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。
构造方法:
Condition([Lock/RLock])
实例方法:
acquire([timeout])/release(): 调用关联的锁的相应方法。
wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。
notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
如下是一个生产者与消费者的程序示例:
import threading
import time
condition = threading.Condition()
products = 0
class Producer(threading.Thread):
def run(self):
global products
while True:
if condition.acquire():
if products < 10:
products += 1;
print ("Producer(%s):deliver one, now products:%s" %(self.name, products))
condition.notify()#不释放锁定,因此需要下面一句
condition.release()
else:
print ("Producer(%s):already 10, stop deliver, now products:%s" %(self.name, products))
condition.wait();#自动释放锁定
time.sleep(2)
class Consumer(threading.Thread):
def run(self):
global products
while True:
if condition.acquire():
if products > 1:
products -= 1
print ("Consumer(%s):consume one, now products:%s" %(self.name, products))
condition.notify()
condition.release()
else:
print ("Consumer(%s):only 1, stop consume, products:%s" %(self.name, products))
condition.wait();
time.sleep(2)
if __name__ == "__main__":
for p in range(0, 2):
p = Producer()
p.start()
for c in range(0, 3):
c = Consumer()
c.start()
Event类
Event(事件)是最简单的线程通信机制之一:一个线程通知事件,其他线程等待事件。Event内置了一个初始为False的标志,当调用set()时设为True,调用clear()时重置为 False。wait()将阻塞线程至等待阻塞状态。Event其实就是一个简化版的 Condition。Event没有锁,无法使线程进入同步阻塞状态。
构造方法:
Event()
实例方法:
isSet(): 当内置标志为True时返回True。
set(): 将标志设为True,并通知所有处于等待阻塞状态的线程恢复运行状态。
clear(): 将标志设为False。
wait([timeout]): 如果标志为True将立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()。
代码实例:
# encoding: UTF-8
import threading
import time
event = threading.Event()
def func():
# 等待事件,进入等待阻塞状态
print ('%s wait for event...' % threading.currentThread().getName())
event.wait()
# 收到事件后进入运行状态
print ('%s recv event.' % threading.currentThread().getName())
t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t1.start()
t2.start()
time.sleep(2)
# 发送事件通知
print ('MainThread set event.')
event.set()
输出信息:
Thread-1 wait for event…
Thread-2 wait for event…
MainThread set event.
Thread-1 recv event.
Thread-2 recv event.
Timer类
Timer(定时器)是Thread的派生类,用于在指定时间后调用一个方法。
构造方法:Timer(interval,function,args=[],kwargs=)
参数说明:
interval:指定的时间(秒为单位)
function: 要执行的方法
args/kwargs:方法的参数
代码示例:
# encoding: UTF-8
import threading
def func():
print 'hello timer!'
timer = threading.Timer(5, func)
timer.start()
这段代码中的func函数在延迟5秒之后执行。
local类
local是一个小写字母开头的类,用于管理thread-local(线程局部变量),对于同一个local,线程无法访问其他线程设置的属性。线程设置的属性不会被其他线程设置的同名属性替换。
可以把local看成是一个“线程-属性字典”的字典,local封装了从自身使用线程作为key检索对应的属性字典,再使用属性名作为key检索属性的细节。
代码示例:
# encoding: UTF-8
import threading
local = threading.local()
local.tname = 'main'
def func():
local.tname = 'notmain'
print local.tname
t1 = threading.Thread(target=func)
t1.start()
t1.join()
print local.tname
输出信息:
notmain
main
在实际使用过程python过程中,使用线程的实例如何避免线程之间的竞争?
示例1 我们将要请求五个不同的url
单线程模式:
import time
import urllib.request
def get_responses():
urls = [
'http://www.baidu.com',
'http://www.csdn.net/',
'http://www.alibaba.com',
]
start = time.time()
for url in urls:
print(url)
resp = urllib.request.urlopen(url)
print(resp.getcode())
print ("Elapsed time: %s" % (time.time() - start))
get_responses()
输出信息:
http://www.baidu.com
200
http://www.csdn.net/
200
http://www.alibaba.com
200
多线程模式:
import time
import urllib.request
from threading import Thread
class Worker(Thread):
def __init__(self,url):
self._url = url
super(Worker,self).__init__()
def run(self):
print(self._url)
resp = urllib.request.urlopen(self._url)
print(resp.getcode())
def do_works():
urls = [
'http://www.baidu.com',
'http://www.csdn.net/',
'http://www.alibaba.com',
]
start = time.time()
threads = []
for url in urls:
w = Worker(url)
threads.append(w)
w.start()
for w in threads:
w.join()
print("Elapsed time: %s" % (time.time() - start))
do_works()
输出信息:
http://www.baidu.com
http://www.csdn.net/
http://www.alibaba.com
200
200
200
Elapsed time: 0.2839999198913574
解释如下:
- 我们写了一个多线程程序来减少cpu的等待时间,当我们在等待一个线程内的网络请求返回时,这时cpu可以切换到其他线程去进行其他线程内的网络请求。
- 我们期望一个线程处理一个url,所以实例化线程类的时候我们传了一个url。
- 线程运行意味着执行类里的run()方法。
- 无论如何我们想每个线程必须执行run()。
- 为每个url创建一个线程并且调用start()方法,这告诉了cpu可以执行线程中的run()方法了。
- 我们希望所有的线程执行完毕的时候再计算花费的时间,所以调用了join()方法。
- join()可以通知主线程等待这个线程结束后,才可以执行下一条指令。
- 每个线程我们都调用了join()方法,所以我们是在所有线程执行完毕后计算的运行时间。
关于线程:
- cpu可能不会在调用start()后马上执行run()方法。
- 你不能确定run()在不同线程建间的执行顺序。
- 对于单独的一个线程,可以保证run()方法里的语句是按照顺序执行的。
- 这就是因为线程内的url会首先被请求,然后打印出返回的结果。
演示多线程间的资源竞争,并且修复这个问题。
from threading import Thread
some_var = 0
class IncrementThead(Thread):
def run(self):
global some_var
read_value = some_var
print('some_var in %s is %d'% (self.name,read_value))
some_var = read_value +1
print('some_var in %s after increment is %d'%(self.name,some_var))
def use_increment_thread():
threads = []
for i in range(50):
t = IncrementThead()
threads.append(t)
t.start()
for t in threads:
t.join()
print('after 50 modifications, some_var is %d' % (some_var,))
use_increment_thread()
反复运行上面的代码,每次的some_var最终结果都有可能不一样,为什么会这样?
- 假设在some_var是15的时候,线程t1读取了some_var,这个时刻cpu将控制权给了另一个线程t2。
- t2线程读到的some_var也是15
- t1和t2都把some_var加到16
- 当时我们期望的是t1 t2两个线程使some_var + 2变成17
- 在这里就有了资源竞争。相同的情况也可能发生在其它的线程间,所以出现了最后的结果小于50的情况。
解决资源竞争,加锁。
from threading import Thread,Lock
lock = Lock()
some_var = 0
class IncrementThead(Thread):
def run(self):
global some_var
lock.acquire()
read_value = some_var
print('some_var in %s is %d'% (self.name,read_value))
some_var = read_value +1
print('some_var in %s after increment is %d'%(self.name,some_var))
lock.release()
def use_increment_thread():
threads = []
for i in range(50):
t = IncrementThead()
threads.append(t)
t.start()
for t in threads:
t.join()
print('after 50 modifications, some_var is %d' % (some_var,))
use_increment_thread()
以上是关于python线程的主要内容,如果未能解决你的问题,请参考以下文章