Python的多线程
Posted 苏导
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python的多线程相关的知识,希望对你有一定的参考价值。
简述
多线程设计在系统中是比较关键的部分,对于系统性能的提高以及一个较为复杂框架的构建都是很重要的。
进程与线程差别
这部分许多资料可供参考,比如一些动画介绍,还有CSAPP书中也详细讲到。在嵌入式系统中一般是没有进程和线程区分概念的,因为嵌入式系统就跑一个程序(一个main入口),通过RTOS管理其中的各个线程(一般称为task),其实总的就一个进程,可以独享嵌入式系统的Flash, RAM等资源,也不会有IPC等技术和概念。
在多用户操作系统,一般非RTOS就是吧,Linux, Windows, android, MacOS, ios,上面是会有许许多多的程序可以同时被运行,最后被终止的。一个程序其实就是一段二进制代码,放在这些计算机的存储器上,只有这个程序被加载并执行,此时就有针对这个程序的生命周期和进程概念了,因此其实进程指的是程序的一次执行,进程有其独立的内存、栈、地址空间等,进程间通信需要用IPC进行进程间通信。
Python中的线程
Python设计之初的考虑是在Python解释器主循环中同时只有一个线程在执行,即Python解释器可以运行多个线程,但是任意时刻只有一个线程在解释器中运行。而做到这一点是通过Python的Global Interpreter Lock-GIL来实现的:
Python解释器使用GIL控制线程执行
首先设置GIL,并切换到一个线程去执行
然后运行指定数量的字节码指令,或者线程主动让出控制
把线程设置为睡眠状态,并解锁GIL
Python多线程编程-threading模块
目前需要在Python代码中使用多线程设计,都是import threading moduel。目前Python的线程没有优先级区分,也没有线程组,线程不可销毁、停止、挂起、恢复或中断。
threading module的方法
方法名 | 详细说明 |
---|---|
threading.active_count() | 返回当前处于激活状态的Thread对象个数,返回的个数等于threading.enumerate()返回的list的长度 |
threading.current_thread() | 返回当前的Thread对象,相当于调用者的线程 |
threading.get_ident() | 返回当前线程的线程identifier,这是一个非零值 |
threading.enumerate() | 返回当前处于激活状态的Thread对象,以list形式返回,包含daemonic线程,dummy线程和main线程 |
threading.main_thread() | 返回main线程对象,一般是Python解释器开始运行的线程 |
threading.settrace(func) | 为所有从threading module创建的线程设置一个trace函数,该函数会在每个线程的run()方法被调用前,被传给sys.settrace() |
threading.setprofile(func) | 为所有从threading module创建的线程设置一个profile函数,该函数会在每个线程的run()方法被调用前,被传给sys.setprofile() |
threading.stack_size([size]) | 当创建一个新的线程时返回该线程使用的stack size,size参数可选,用来指定所创建线程的stack size,必须是0或是一个至少为32768的正整数。size未指定会默认使用0值,RuntimeError表示改变stack size不支持,ValueError表示stack size非法值。32K的stack size是目前支持最小的值,足够的stack空间主要是解释器本身需要占用。一般memory是4096 Bytes一个page,Stack size建议设置为4K Bytes的倍数 |
threading module的常数
常数 | 详细说明 |
---|---|
threading.TIMEOUT_MAX | 阻塞函数(Lock.acquire(), RLock.acquire(), Condition.wait()等)的允许最大的超时值,如果使用的timeout参数大于这个最大值,就会发生OverflowError |
threading module的Thread类(对象)
Thread类是可以单独控制和运行的线程,可以通过传递一个可调用对象给构造器,或者重写子类的run()方法来指定线程的运作。线程对象被创建,就必须通过start()方法来开始运作,此时会调用其run()方法,线程就激活了。直到run()方法终止,线程就停止。
其他线程可以调用本线程的join()方法,此时调用本线程join()方法的线程会阻塞,等待本线程执行结束退出,再继续调用线程的后续执行。
线程也可以标识为daemon线程,在Python程序退出时,daemon线程仍然保留,知道关机时才会退出。
还有一种main线程,即开始运行Python程序的线程,为非daemon线程。
还有dummy线程,这种线程是在threading module之外开启,比如通过调用的C代码创建的线程。
类及方法 | 详细说明 |
---|---|
Thread类 | class threading.Thread(group=None,target=None,name=None,args=(),kwargs=,*,daemon=None),类构造器,group表示线程组,目前不支持该功能,用于后续扩展。target是可以被run()方法调用的对象。name是线程名称。args是参数tuple,用于被target调用。kwargs是关键字参数dictionary,用于被target调用。daemon表明线程是否为daemon类别。在Thread子类重写构造器时,必须首先调用Thread.init() |
start() | 开始线程的生命周期,该方法会安排run()方法的调用,如果调用start()多次,会发生RuntimeError |
run() | 线程的生命周期-activity |
join(timeout=None) | 等待直到线程终止。如果timeout非零,其值为浮点数,要判断join()是否超时,需要使用is_alive()方法看,如果在调用了join()方法后再调用同一线程的is_alive()方法,如果还是alive的,说明join()超时了。如果timeout为None,则调用者会一直阻塞直到被调用join()方法的线程终止。一个线程的join()可被多次调用 |
name | |
getName() | |
setName() | |
ident | 线程idnetifier |
is_alive() | 判断线程是否alive,主要是看run()是否结束 |
daemon | daemon线程标志 |
isDaemon | 判断是否daemon线程 |
setDaemon | 设置为daemon线程 |
threading module的Lock类(对象)
Lock即primitive lock,原始锁,只会处于两个状态中的一个,“locked”(初始态)或“unlocked”。并有acquire()和release()两个方法。acquire()有阻塞和非阻塞两种方式,阻塞是会一直等待直到锁释放,而非阻塞是如果获取不到锁就立即返回false。相关图示如下:
threading module的RLock类(对象)
Reentrant Lock,可重入锁,即对于同一线程而言,是可重入锁,而对于其他线程而言,和上面的Lock没有区别。
和Lock的区别在于:在某一线程acquire()一个lock时,发现lock已经被本线程之前acquire()过但还未release(),此时可以继续acquire(),只是要进行同样次数的release()后才会将lock最终unlocked。
所以可重入锁就是基于Lock增加了lock的owner机制,lock的owner是自身,可以继续acquire(),并将计数器加一。
threading module的Condition类(对象)
Condition Variable-条件变量,和Lock总是有着关联,底层机制应当还是使用Lock来实现的,Condition Variable的一个很好的应用场景就是“生产者-消费者”模型:
# Consume an item
with cv:
cv.wait_for(an_item_is_available)
get_an_available_item()
# Produce one item
with cv:
make_an_item_available()
cv.notify()
类与方法 | 详细说明 |
---|---|
threading.Condition | class threading.Condition(lock=None),用于实现条件变量对象,允许多个线程wait一个条件变量,直到被一个线程notify。如果lock参数非None,必须是从外部传入的Lock或RLock。如果lock参数是None,会新建一个RLock |
acquire(*args) | acquire上面提到的传入的或新建的lock |
release() | release上面提到的传入的或新建的lock |
wait(timeout=None) | 等待notify或到timeout发生,其实就是相当于acquire()一个lock,然后等待有人将其release() |
wait_for(predicate, timeout=None) | 等待直到condition为True,predicate是可调用的且其结果是boolean值 |
notify(n=1) | 默认唤醒一个等待condition的线程。这个方法可以唤醒最多n个等待condition的线程 |
notify_all() | 唤醒所有等待condition的线程 |
condition机制如下图所示: | |
即Consumer等待条件满足,而Producer则触发条件满足,这样来做线程间的同步和通信。 |
threading module的Semaphore类(对象)
Semaphore内部维护一个计数器,每次acquire()计数器减一,每次release()计数器加一,计数器不能为负数。
threading module的Event类(对象)
线程通信最为简单的机制,一个线程抛出一个信号,另外线程等待这个信号。
类与方法 | 详细说明 |
---|---|
Event | class threading.Event。管理一个内部flag(True or False) |
is_set() | 判断内部flag是否True |
set() | 将内部flag设置为True |
clear() | 将内部flag设置为False |
wait(timeout=None) | 阻塞直到内部flag为True,或者timeout时间到 |
threading module的Timer类(对象)
Timer用于某个动作需要等待一定时间后才开始执行。Timer是Thread的子类,除了start()方法,Timer还有cancel()方法来停止timer。
代码示例:
def hello():
print("hello, world")
t = Timer(30.0, hello)
t.start() # after 30 seconds, "hello, world" will be printed
类与方法 | 详细说明 |
---|---|
Timer | class threading.Timer(interval, function, args=None, kwargs=None)。timer将在interval的时间后运行function函数,args是function函数的参数,kwargs是function函数的关键字参数 |
cancel() | 停止timer,取消timer后续函数的执行(仅在等待状态有用) |
threading module的Barrier类(对象)
Barrier类用于多个线程间互相等待的场景,每个线程通过调用wait()尝试传输barrier,并会block直到所有线程都传输了barrier。线程最终会同步release。
举例:
b = Barrier(2, timeout=5)
def server():
start_server()
b.wait()
while True:
connection = accept_connection()
process_server_connection(connection)
def client():
b.wait()
while True:
connection = make_connection()
process_client_connection(connection)
类和方法 | 详细说明 |
---|---|
Barrier | class threading.Barrier(parties, action=None, timeout=None)。parties是线程数目,action是在线程release时可调用的,timeout是没有任何线程调用wait()的超时时间 |
wait(timeout=None) | Pass the barrier,返回值是0到parties-1的值 |
reset() | 将Barrier恢复为默认状态-空 |
abort() | 将Barrier设置为broken状态 |
parties | 需要pass barrier的线程个数 |
n_waiting | 当前等待barrier的线程个数 |
broken | 布尔值,表明barrier是否broken |
Python多线程编程-queue模块
queue是同步的队列类,multi-producer, multi-consumer队列,主要用于threading编程中,多线程之间的安全通信。queue实现了所有需要的锁语义。且实现了3类queue:
queue类别 | 详细说明 |
---|---|
FIFO | 先进先出,一般queue |
LIFO | 后进先出,像stack |
Priority queue | 入口进行了分类,最低的入口最先出 |
queue module的类和异常
类或异常 | 详细说明 |
---|---|
Queue | class queue.Queue(maxsize=0),FIFO queue构造,maxsize是queue最大空间,如果queue满,入队就会block直到有出队发生 |
LifoQueue | class queue.LifoQueue(maxsize=0),LIFO queue构造,maxsize是queue最大空间,如果queue满,入队就会block直到有出队发生 |
PriorityQueue | class queue.PriorityQueue(maxsize=0),Priority queue构造,maxsize是queue最大空间,如果queue满,入队就会block直到有出队发生 |
Empty | exception queue.Empty,队空 |
Full | exception queue.Full,队满 |
queue module的Queue类(对象)
方法 | 详细描述 |
---|---|
Queue.qsize() | 返回queue中大约的元素个数 |
Queue.empty() | 判断queue是否空 |
Queue.full() | 判断queue是否满 |
Queue.put(item, block=True, timeout=None) | 入队 |
Queue.put_nowait(item) | 和put(item,False)一样 |
Queue.get(block=True, timeout=None) | 出队 |
Queue.get_nowait() | 和get(False)一样 |
另外还有两个方法,用于支持入队的任务是否被daemon消费者线程处理的跟踪: | |
Queue.task_done()和Queue.join()。 |
以上是关于Python的多线程的主要内容,如果未能解决你的问题,请参考以下文章