Python的多线程

Posted 苏导

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python的多线程相关的知识,希望对你有一定的参考价值。

简述

多线程设计在系统中是比较关键的部分,对于系统性能的提高以及一个较为复杂框架的构建都是很重要的。

进程与线程差别

这部分许多资料可供参考,比如一些动画介绍,还有CSAPP书中也详细讲到。在嵌入式系统中一般是没有进程和线程区分概念的,因为嵌入式系统就跑一个程序(一个main入口),通过RTOS管理其中的各个线程(一般称为task),其实总的就一个进程,可以独享嵌入式系统的Flash, RAM等资源,也不会有IPC等技术和概念。
在多用户操作系统,一般非RTOS就是吧,Linux, Windows, android, MacOS, ios,上面是会有许许多多的程序可以同时被运行,最后被终止的。一个程序其实就是一段二进制代码,放在这些计算机的存储器上,只有这个程序被加载并执行,此时就有针对这个程序的生命周期和进程概念了,因此其实进程指的是程序的一次执行,进程有其独立的内存、栈、地址空间等,进程间通信需要用IPC进行进程间通信。

Python中的线程

Python设计之初的考虑是在Python解释器主循环中同时只有一个线程在执行,即Python解释器可以运行多个线程,但是任意时刻只有一个线程在解释器中运行。而做到这一点是通过Python的Global Interpreter Lock-GIL来实现的:

Python解释器使用GIL控制线程执行
首先设置GIL,并切换到一个线程去执行
然后运行指定数量的字节码指令,或者线程主动让出控制
把线程设置为睡眠状态,并解锁GIL

Python多线程编程-threading模块

目前需要在Python代码中使用多线程设计,都是import threading moduel。目前Python的线程没有优先级区分,也没有线程组,线程不可销毁、停止、挂起、恢复或中断。

threading module的方法

方法名详细说明
threading.active_count()返回当前处于激活状态的Thread对象个数,返回的个数等于threading.enumerate()返回的list的长度
threading.current_thread()返回当前的Thread对象,相当于调用者的线程
threading.get_ident()返回当前线程的线程identifier,这是一个非零值
threading.enumerate()返回当前处于激活状态的Thread对象,以list形式返回,包含daemonic线程,dummy线程和main线程
threading.main_thread()返回main线程对象,一般是Python解释器开始运行的线程
threading.settrace(func)为所有从threading module创建的线程设置一个trace函数,该函数会在每个线程的run()方法被调用前,被传给sys.settrace()
threading.setprofile(func)为所有从threading module创建的线程设置一个profile函数,该函数会在每个线程的run()方法被调用前,被传给sys.setprofile()
threading.stack_size([size])当创建一个新的线程时返回该线程使用的stack size,size参数可选,用来指定所创建线程的stack size,必须是0或是一个至少为32768的正整数。size未指定会默认使用0值,RuntimeError表示改变stack size不支持,ValueError表示stack size非法值。32K的stack size是目前支持最小的值,足够的stack空间主要是解释器本身需要占用。一般memory是4096 Bytes一个page,Stack size建议设置为4K Bytes的倍数

threading module的常数

常数详细说明
threading.TIMEOUT_MAX阻塞函数(Lock.acquire(), RLock.acquire(), Condition.wait()等)的允许最大的超时值,如果使用的timeout参数大于这个最大值,就会发生OverflowError

threading module的Thread类(对象)

Thread类是可以单独控制和运行的线程,可以通过传递一个可调用对象给构造器,或者重写子类的run()方法来指定线程的运作。线程对象被创建,就必须通过start()方法来开始运作,此时会调用其run()方法,线程就激活了。直到run()方法终止,线程就停止。
其他线程可以调用本线程的join()方法,此时调用本线程join()方法的线程会阻塞,等待本线程执行结束退出,再继续调用线程的后续执行。
线程也可以标识为daemon线程,在Python程序退出时,daemon线程仍然保留,知道关机时才会退出。
还有一种main线程,即开始运行Python程序的线程,为非daemon线程。
还有dummy线程,这种线程是在threading module之外开启,比如通过调用的C代码创建的线程。

类及方法详细说明
Thread类class threading.Thread(group=None,target=None,name=None,args=(),kwargs=,*,daemon=None),类构造器,group表示线程组,目前不支持该功能,用于后续扩展。target是可以被run()方法调用的对象。name是线程名称。args是参数tuple,用于被target调用。kwargs是关键字参数dictionary,用于被target调用。daemon表明线程是否为daemon类别。在Thread子类重写构造器时,必须首先调用Thread.init()
start()开始线程的生命周期,该方法会安排run()方法的调用,如果调用start()多次,会发生RuntimeError
run()线程的生命周期-activity
join(timeout=None)等待直到线程终止。如果timeout非零,其值为浮点数,要判断join()是否超时,需要使用is_alive()方法看,如果在调用了join()方法后再调用同一线程的is_alive()方法,如果还是alive的,说明join()超时了。如果timeout为None,则调用者会一直阻塞直到被调用join()方法的线程终止。一个线程的join()可被多次调用
name
getName()
setName()
ident线程idnetifier
is_alive()判断线程是否alive,主要是看run()是否结束
daemondaemon线程标志
isDaemon判断是否daemon线程
setDaemon设置为daemon线程

threading module的Lock类(对象)

Lock即primitive lock,原始锁,只会处于两个状态中的一个,“locked”(初始态)或“unlocked”。并有acquire()和release()两个方法。acquire()有阻塞和非阻塞两种方式,阻塞是会一直等待直到锁释放,而非阻塞是如果获取不到锁就立即返回false。相关图示如下:

threading module的RLock类(对象)

Reentrant Lock,可重入锁,即对于同一线程而言,是可重入锁,而对于其他线程而言,和上面的Lock没有区别。
和Lock的区别在于:在某一线程acquire()一个lock时,发现lock已经被本线程之前acquire()过但还未release(),此时可以继续acquire(),只是要进行同样次数的release()后才会将lock最终unlocked。
所以可重入锁就是基于Lock增加了lock的owner机制,lock的owner是自身,可以继续acquire(),并将计数器加一。

threading module的Condition类(对象)

Condition Variable-条件变量,和Lock总是有着关联,底层机制应当还是使用Lock来实现的,Condition Variable的一个很好的应用场景就是“生产者-消费者”模型:

# Consume an item
with cv:
    cv.wait_for(an_item_is_available)
    get_an_available_item()
    
# Produce one item
with cv:
    make_an_item_available()
    cv.notify()
类与方法详细说明
threading.Conditionclass threading.Condition(lock=None),用于实现条件变量对象,允许多个线程wait一个条件变量,直到被一个线程notify。如果lock参数非None,必须是从外部传入的Lock或RLock。如果lock参数是None,会新建一个RLock
acquire(*args)acquire上面提到的传入的或新建的lock
release()release上面提到的传入的或新建的lock
wait(timeout=None)等待notify或到timeout发生,其实就是相当于acquire()一个lock,然后等待有人将其release()
wait_for(predicate, timeout=None)等待直到condition为True,predicate是可调用的且其结果是boolean值
notify(n=1)默认唤醒一个等待condition的线程。这个方法可以唤醒最多n个等待condition的线程
notify_all()唤醒所有等待condition的线程
condition机制如下图所示:
即Consumer等待条件满足,而Producer则触发条件满足,这样来做线程间的同步和通信。

threading module的Semaphore类(对象)

Semaphore内部维护一个计数器,每次acquire()计数器减一,每次release()计数器加一,计数器不能为负数。

threading module的Event类(对象)

线程通信最为简单的机制,一个线程抛出一个信号,另外线程等待这个信号。

类与方法详细说明
Eventclass threading.Event。管理一个内部flag(True or False)
is_set()判断内部flag是否True
set()将内部flag设置为True
clear()将内部flag设置为False
wait(timeout=None)阻塞直到内部flag为True,或者timeout时间到

threading module的Timer类(对象)

Timer用于某个动作需要等待一定时间后才开始执行。Timer是Thread的子类,除了start()方法,Timer还有cancel()方法来停止timer。
代码示例:

def hello():
    print("hello, world")

t = Timer(30.0, hello)
t.start() # after 30 seconds, "hello, world" will be printed
类与方法详细说明
Timerclass threading.Timer(interval, function, args=None, kwargs=None)。timer将在interval的时间后运行function函数,args是function函数的参数,kwargs是function函数的关键字参数
cancel()停止timer,取消timer后续函数的执行(仅在等待状态有用)

threading module的Barrier类(对象)

Barrier类用于多个线程间互相等待的场景,每个线程通过调用wait()尝试传输barrier,并会block直到所有线程都传输了barrier。线程最终会同步release。
举例:

b = Barrier(2, timeout=5)

def server():
    start_server()
    b.wait()
    while True:
        connection = accept_connection()
        process_server_connection(connection)

def client():
    b.wait()
    while True:
        connection = make_connection()
        process_client_connection(connection)
类和方法详细说明
Barrierclass threading.Barrier(parties, action=None, timeout=None)。parties是线程数目,action是在线程release时可调用的,timeout是没有任何线程调用wait()的超时时间
wait(timeout=None)Pass the barrier,返回值是0到parties-1的值
reset()将Barrier恢复为默认状态-空
abort()将Barrier设置为broken状态
parties需要pass barrier的线程个数
n_waiting当前等待barrier的线程个数
broken布尔值,表明barrier是否broken

Python多线程编程-queue模块

queue是同步的队列类,multi-producer, multi-consumer队列,主要用于threading编程中,多线程之间的安全通信。queue实现了所有需要的锁语义。且实现了3类queue:

queue类别详细说明
FIFO先进先出,一般queue
LIFO后进先出,像stack
Priority queue入口进行了分类,最低的入口最先出

queue module的类和异常

类或异常详细说明
Queueclass queue.Queue(maxsize=0),FIFO queue构造,maxsize是queue最大空间,如果queue满,入队就会block直到有出队发生
LifoQueueclass queue.LifoQueue(maxsize=0),LIFO queue构造,maxsize是queue最大空间,如果queue满,入队就会block直到有出队发生
PriorityQueueclass queue.PriorityQueue(maxsize=0),Priority queue构造,maxsize是queue最大空间,如果queue满,入队就会block直到有出队发生
Emptyexception queue.Empty,队空
Fullexception queue.Full,队满

queue module的Queue类(对象)

方法详细描述
Queue.qsize()返回queue中大约的元素个数
Queue.empty()判断queue是否空
Queue.full()判断queue是否满
Queue.put(item, block=True, timeout=None)入队
Queue.put_nowait(item)和put(item,False)一样
Queue.get(block=True, timeout=None)出队
Queue.get_nowait()和get(False)一样
另外还有两个方法,用于支持入队的任务是否被daemon消费者线程处理的跟踪:
Queue.task_done()和Queue.join()。

以上是关于Python的多线程的主要内容,如果未能解决你的问题,请参考以下文章

java的多线程:线程安全问题

python中的线程安全和非线程安全的区别

python 进程 线程 协程

python中的多线程

Python的多线程GIL浅谈

Python 中的多线程还是串行处理?