多线程编程
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程编程相关的知识,希望对你有一定的参考价值。
多线程编程
多线程编程对于具有如下特点的编程任务而言是非常理想的:本质上是异步;需要所个并发活动;每个活动处理顺序可能是不确定的,或者说是随机的、不可预测的。这种编程任务可以被组织或划分成多个执行流,其中每个执行流都有一个指定要完成的任务。根据应用的不同,这些子任务可能需要计算出中间结果,然后合并为最终的输出结果。
本节目录
1、线程与进程
2、线程与Python
3、thread模块
4、单线程与多线程执行对比
5、多线程实践
6、生产者消费者模型
7、线程的替代方案
1、线程与进程
程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。
在多道编程中,我们允许多个程序同时加载到内存中,在操作系统的调度下,可以实现并发地执行。这是这样的设计,大大提高了CPU的利用率。进程的出现让每个用户感觉到自己独享CPU,因此,进程就是为了在CPU上实现多道编程而提出的。
进程:计算机程序只是存储在磁盘上的可执行二进制(或其他类型)文件。只有把它加载到内存种1并被操作系统调用,才有其生命周期。进程(有时称为重量级进程)则是一个执行中的程序。
为什么有了进程还要线程
进程有很多优点,它提供了多道编程,让我们感觉我们每个人都拥有自己的CPU和其他资源,可以提高计算机的利用率。很多人就不理解了,既然进程这么优秀,为什么还要线程呢?其实,仔细观察就会发现进程还是有很多缺陷的,主要体现在两点上:
-
进程只能在一个时间干一件事,如果想同时干两件事或多件事,进程就无能为力了。
-
进程在执行的过程中如果阻塞,例如等待输入,整个进程就会挂起,即使进程中有些工作不依赖于输入的数据,也将无法执行。
线程是什么?
线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务
简单来说,线程是操作系统最小的调度单位,是一串指令的集合
一个进程中的各个线程与主线程共享一片数据空间,因此相比于独立的进程而言,线程间的信息共享和通信更加容易,线程一般以并发的方式执行的,正是由于这种并行和数据共享机制,使得多任务间的协作称为可能。当然,在单核的CPU系统中,因为真正的并发是不可能的,所以线程的执行是这样规划的:每个线程执行一小会,然后让步给其他线程(再次排队等待更多的CPU时间)。在整个进程的执行过程中,每个线程执行它自己特定的任务,在必要时和其他线程进行结果通信。
当然,这种共享并不是没有风险的,如果两个或多个线程访问同一片数据,由于1数据访问顺序不同,可能导致结果不一致,这种情况通常称为静态条件(race condition)。幸运的是大多数线程库都有一些同步原语,以允许线程管理器控制执行和访问。
进程与线程的区别:
-
进程要操作CPU必须先创建一个线程,进程本身并不可以执行,进程操作CPU是通过线程来实现的
-
线程共享内存,进程的内存是相互独立的。同一个进程之间的线程之间可以直接交流,两个进程想要通信必须通过一个中间代理来实现
- 同一个进程之间的线程可以直接交流,两个进程想要通信必须通过一个中间代理来实现。
- 创建一个新线程很简单,创建一个新进程需要对其父进程进行一次克隆。
- 一个线程可以控制与操作同一进程里的其他线程,但是进程只能操作子进程。
- 对于主线程的修改有可能会影响其他线程的运行,但是对于父进程的修改不会影响子进程。
2.线程与Python
线程的使用场景
#Python的所线程实际上是假多线程,因为Python的GIL(全局解释器锁)的存在,Python是不能过实现真正的多线程并发的,我们所看到的多线程并行的效果只是因为CPU在不断的进行上下切换,实际上还是串行,并不会同一时间多个线程分布在多个CPU上运行 #IO操作不占用CPU #计算占用CPU #故Python多线程不适合CPU密集型的任务,适合IO密集型的任务
全局解释器锁
Python代码的执行是由Python虚拟机(又名解释器主循环)进行控制的。Python在设计时是这样考虑的,在主循环中同时只能有一个控制线程在执行,就像单核CPU系统中的多进程一样。内存中可以有许多程序,但是在任意给定时刻只能有一个程序在运行。同理,尽管Python解释器中可以运行多个线程,但是在任意给定时刻只有一个线程会被解释器执行
对于Python虚拟机的访问是由全局解释器锁(GIL)控制。这个锁就是用来保证同时只能有一个线程运行的。在多线程环境中,Python虚拟机将按下面所述的方式执行。
(1)设置GIL.
(2)切换进一个线程去运行。
(3)执行下面操作之一
a.指定数量的字节码指令
b.线程主动让出控制权(可以调用time.sleep(0)来完成)。
(4)把线程设置回睡眠状态(切换出线程)。
(5)解锁GIL
(6)重复上述步骤
线程的两种调用方式:
#直接调用 import time import threading def run(n): #定义每个线程要运行的函数 print(‘task‘,n) time.sleep(1) # r1=run(‘t1‘) # r2=run(‘t2‘) t1=threading.Thread(target=run,args=(‘t1‘,))#生成一个线程实例 t2=threading.Thread(target=run,args=(‘t2‘,)) t1.start() #启动线程 t2.start() #继承调用 class MyThread(threading.Thread): def __init__(self,name): super(MyThread,self).__init__() self.name=name def run(self): #定义每个线程要运行的函数 print("Hello %s"%self.name) time.sleep(1) h1=MyThread(‘h1‘) h2=MyThread(‘h2‘) h1.start() h2.start()
import time import threading def run(n): print(‘task‘,n) time.sleep(1) #主线程和它启动的子线程是并行的 start_time=time.time() for i in range(50): t=threading.Thread(target=run,args=(‘t%s‘%i,)) #子线程 而整个程序下来本身是一个主线程 t.start() print(‘cost:‘,time.time()-start_time)# 无法测出整个程序运行的时间 因为主线程和它启动的子线程是并行的 主线程启动后不会等它的子线程执行完毕再往下走 #想要测出整个程序的运行时间 start_time=time.time() t_objs=[]#存线程实例 for i in range(50): t=threading.Thread(target=run,args=(‘t%s‘%i,)) #子线程 而整个程序下来本身是一个主线程 t.start() t_objs.append(t) for i in t_objs: #循环线程实例列表,等待所有线程执行完毕 i.join()#t1等待t1线程执行完后才往下走 #t1.join(5)#等待子线程执行5s后再执行主线程,主线程结束后继续执行子线程 print(‘all the threads has finish!‘,threading.current_thread()) print(‘cost:‘,time.time()-start_time)
join与Daemon(守护线程)
- join: 等待该线程终止 。当在主线程当中执行到t1.join()方法时,就认为主线程应该把执行权让给t1,直到t1执行结束,主线程才能在执行。
2.Daemon: threading模块支持守护线程,其工作方式是:守护线程一般是一个等待客户端请求服务的服务器。如果没有客户端请求,守护线程就是空的。如果把一个线程设置为守护线程,就表示这个线程不重要,进程退出时不需要等待这个线程之执行完成。
如果主线程准备退出时,不需要等待某些子线程完成,就可以为这些子线程设置为守护线程标记。该标记为真时,表示该线程不重要,或者说该线程只是用来等待客户端请求而不做任何其他事器情。
import threading def run(n): print(‘task‘,n) time.sleep(1) t1=threading.Thread(target=run,args=(‘t1‘,)) t2=threading.Thread(target=run,args=(‘t2‘,)) t1.start() #t1.join() #t1等待t1线程执行完后才往下走 #t1.join(5)#等待子线程执行5s后再执行主线程,主线程结束后继续执行子线程 t2.start() print(‘done‘) join
import time import threading def run(n): print(‘task‘,n) time.sleep(1) #主线程和它启动的子线程是并行的 start_time=time.time() for i in range(50): t=threading.Thread(target=run,args=(‘t%s‘%i,)) #子线程 而整个程序下来本身是一个主线程 t.setDaemon(True)#将当前线程设置为Daemon线程 在线程启动前设置 t.start() print(‘all the threads has finish!‘,threading.current_thread())
event事件
# python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法wait、clear、set #使用threading.Event可以使一个线程等待其他线程的通知,我们把这个Event传递到线程对象中,Event默认内置了一个标志,初始值为False。一旦该线程通过wait()方法进入等待状态,直到另一个线程调用该Event的set()方法将内置标志设置为True时,该Event会通知所有等待状态的线程恢复运行。 #event.set() 设置标志 #event.clear() 清除标志 #event.wait() 如果标志被设置, wait方法不做任何事情。如果标志被清除,等待将阻塞,直到它再次被设置。任何数量的线程都可以等待相同的事件 #event.isset()判断标志位是否被设定
import threading import time event=threading.Event()#创建一个事件 def lighter(): count=0 event.set()#设置标志位 while True: if count>=5 and count<10:#改成红灯 event.clear()#把标志位清空 print("\\033[41;1m red light is on...\\033[0m") elif count>10: #变绿灯 event.set() count=0 else: print("\\033[42;1mgreen light is on...\\033[0m") count+=1 time.sleep(1) def car(name): while True: if event.is_set(): print(‘%s is running...‘ %name) event.wait() else: print(‘%s is waiting...‘%name) event.wait() time.sleep(1) light=threading.Thread(target=lighter,) car1=threading.Thread(target=car,args=(‘car1‘,)) light.start() car1.start()
线程锁(互斥锁Mutex)
即使GIL阻止多个Python线程同时运行,对于实现整体的线程安全性仍然不够,如当可能有多个线程试图同时去修改同一份数据时可能会出现脏数据,这时候线程一般通过锁持有该对象的唯一访问权,同一时刻允许一个线程执行操作,否则,将可能出现更新任务执行到一半时被切换走的情况。
import time import threading def addNum(): global num # 在每个线程中都获取这个全局变量 print(‘--get num:‘, num) time.sleep(1) num += 1 # 对此公共变量进行-1操作 num =0 # 设定一个共享变量 thread_list = [] for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: # 等待所有线程执行完毕 t.join() print(‘final num:‘, num) #按正常来说,输出应该是100,但事实上会出现少于100的可能(一般出现在Python2,Python3并没有出错,有可能是Python3自动加了锁),假设你有A,B两个线程,此时都 要对num 进行加1操作, 由于2个线程是并发同时运行的,所以2个线程很有可能同时拿走了num=0这个初始变量交给cpu去运算,当A线程去处完的结果是1,但此时B线程运算完的结果也是1,两个线程同时CPU运算的结果再赋值给num变量后,结果就都是1。那应该怎样避免这种情况呢?其实很简单,每个线程在要修改公共数据时,为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。
import time import threading def addNum(): global num #在每个线程中都获取这个全局变量 print(‘--get num:‘,num ) time.sleep(1) lock.acquire() #修改数据前加锁 num +=1 #对此公共变量进行-1操作 lock.release() #修改后释放 num = 0 #设定一个共享变量 thread_list = [] lock = threading.Lock() #生成全局锁 for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有线程执行完毕 t.join() print(‘final num:‘, num )
递归锁(Rlock)#解决死锁问题
Mutex可以分为递归锁(recursive mutex)和非递归锁(non-recursive mutex)。二者唯一的区别是,同一个线程可以多次获取同一个递归锁,不会产生死锁。而如果一个线程多次获取同一个非递归锁,则会产生死锁。
在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所有这两个线程在无外力作用下将一直等待下去
import threading,time def run1(): print("grab the first part data") lock.acquire() global num num +=1 lock.release() return num def run2(): print("grab the second part data") lock.acquire() global num2 num2+=1 lock.release() return num2 def run3(): lock.acquire() res = run1() print(‘--------between run1 and run2-----‘) res2 = run2() lock.release() print(res,res2) if __name__ == ‘__main__‘: num,num2 = 0,0 lock = threading.RLock() for i in range(10): t = threading.Thread(target=run3) t.start() while threading.active_count() != 1: print(threading.active_count()) else: print(‘----all threads done---‘) print(num,num2)
Semaphore(信号量)
互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据
import threading,time def run(n): semaphore.acquire() time.sleep(1) print("run the thread: %s\\n" %n) semaphore.release() if __name__ == ‘__main__‘: num= 0 semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行 for i in range(20): t = threading.Thread(target=run,args=(i,)) t.start() while threading.active_count() != 1: pass #print threading.active_count() else: print(‘----all threads done---‘) print(num)
定时器(Timer)
这个类表示一个动作,应该只有在经过一定时间后才会运行。定时器被启动,如线程,通过调用它们的start()方法。可以通过调用thecancel()方法停止计时器(在动作开始之前)。定时器在执行其操作之前将等待的间隔可能与用户指定的间隔不完全相同。
import threading def hello(): print("hello, world") t =threading.Timer(5.0, hello) t.start() # after 5.0 seconds, "hello, world" will be printed
queue队列
队列在线程编程中尤其有用,因为信息必须在多个线程之间安全地交换。
队列——线程安全,先进先出
属性 | 描述 |
Queue(maxsize=0) | 创建一个先进先出对列,如果给定最大值,则在对列没有空间时阻塞;否则(没有给定最大值),为无限队列 |
LifoQueue(maxsize=0) | 创建一个后进先出队列,如果给定最大值,则在对列没有空间时阻塞;否则(没有给定最大值),为无限队列 |
PriorityQueue(maxsize=0) | 创建一个优先级队列,如果给定最大值,则在对列没有空间时阻塞;否则(没有给定最大值),为无限队列 |
queue异常 | |
Empty | 当对空队列调用get*()方法时抛出异常 |
Full | 当对已满队列调用put*()方法是抛出异常 |
queue对象方法 | |
qsize() | 返回队列大小(由于返回时队列大小可能被其他线程修改,所以改值为近似值) |
empty() |
如果队列为空,则返回Ture,否则,返回False
|
full() | 如果队列已满,则返回True,否则,返回False |
put(item,block=Ture,timeout=None) | 将item放入队列。如果block为Ture(默认)且timeout为None,则在有可用空间之前阻塞;如果timeout为正值,则最多阻塞timeout秒,如果block为False,则抛出Empty异常 |
put_nowait(item) | 和put(item,False)相同 |
get(block=True,timeout=None) |
从对列中取得元素,如果给定了block(非0),则一直阻塞到有可用的元素为止 |
get_nowait() | 和get(False)相同 |
task_done() | 用于表示队列中的某个元素已执行完成,该方法会被下面的join()使用 |
join() | 在队列中所有元素之执行完毕并调用上面的task_done()信号之前,保持阻塞 |
详解:
- class
queue.
Queue
(maxsize=0) #先入先出
- class
queue.
LifoQueue
(maxsize=0) #后进先出 - class
queue.
PriorityQueue
(maxsize=0) #存储数据时可设置优先级的队列
优先级队列的构造函数。 maxsize是一个整数,它设置了可以放入队列中的项目数量的上限。一旦达到此大小,插入就会阻塞,直到队列项被消耗。如果maxsize小于或等于零,则队列大小是无限的。首先检索最低值的条目(最低值条目是由sorted(list(entries))[0]返回的条目。条目的典型模式是形式为(priority_number,data)的元组。
exception queue.Empty 当对空的队列对象调用非阻塞get()(或get_nowait())时引发的异常。
exception queue.Full 当非阻塞put()(或put_nowait())在一个已满的Queue对象上被调用时抛出异常。
Queue.qsize()
Queue.empty() #return True if empty
Queue.full()# return True if full
Queue.put(item,block = True,timeout = None)将项目放入队列。如果可选的args块为true并且超时为None(默认值),则在必要时阻塞,直到空闲插槽可用。如果timeout是一个正数,它会阻塞最多超时秒数,并且如果在该时间内没有可用空闲时间,则会引发完全异常。否则(块为假),如果空闲时隙立即可用,则将一个项目放在队列上,否则提高完全异常(在这种情况下忽略超时)。
Queue.put_nowait(item)相当于put(item,False)。
Queue.get(block = True,timeout = None)从队列中删除并返回一个项目。如果可选的args块为true并且超时为None(默认值),则在必要时阻止,直到项目可用。如果timeout是一个正数,它会阻塞最多超时秒数,并且如果在该时间内没有可用的项目,则引发Empty异常。否则(block is false),返回一个项,如果一个立即可用,否则提出空异常(在这种情况下超时被忽略)。
Queue.get_nowait()等同于get(False)。提供了两种方法来支持跟踪入队任务是否已由守护程序消费者线程完全处理。
Queue.task_done()指示先前已入队的任务已完成。由队列消费者线程使用。对于用于获取任务的每个get(),对task_done()的后续调用会告诉队列任务上的处理已完成。如果join()当前正在阻塞,它将在所有项目都被处理时恢复(意味着对于已经被put()到队列中的每个项目都收到了一个task_done()调用)。如果调用的次数比在队列中放置的项目多,则引发ValueError。
Queue.
join
() #阻塞(block)直到queue被消费完毕
from threading import Thread from queue import Queue class Producer(Thread): def run(self): Thread.run(self) class Comsumer(Thread): def run(self): Thread.run(self) que = Queue(maxsize=100) for i in range(100): que.put(i) print(‘empty:‘,que.empty()) print(que.qsize()) print(‘get:‘,que.get()) print(‘get:‘,que.get()) print(‘empty:‘,que.empty())
生产者消费者模型
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
from threading import Thread from queue import Queue import time class Producer(Thread): def __init__(self,name,queue): ‘‘‘ :param name:生产者的名称 :param queue: 容器 ‘‘‘ self.__Name=name self.__Queue=queue super(Producer,self).__init__()#继承,执行一下父类的构造函数 def run(self): while True: if self.__Queue.full(): time.sleep(1) else: self.__Queue.put(‘baozi‘) time.sleep(1) print(‘%s 生产了一个包子‘ %(self.__Name,)) #hread.run(self) class Comsumer(Thread): def __init__(self,name,queue): ‘‘‘ :param name:生产者的名称 :param queue: 容器 ‘‘‘ self.__Name=name self.__Queue=queue super(Comsumer,self).__init__()#继承,执行一下父类的构造函数 def run(self): while True: if self.__Queue.empty(): time.sleep(1) else: self.__Queue.get(‘baozi‘) time.sleep(1) print(‘%s 消费了一个包子‘ %(self.__Name,)) que=Queue(maxsize=100) baogou1=Producer(‘cooker1‘,que) baogou1.start() baogou2=Producer(‘cooker2‘,que) baogou2.start() baogou3=Producer(‘cooker3‘,que) baogou3.start() for item in range(20): name = ‘customer%d‘ %(item,) temp = Comsumer(name,que) temp.start()
以上是关于多线程编程的主要内容,如果未能解决你的问题,请参考以下文章