Python_并发编程(线程 进程 协程)
Posted liutianyuan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python_并发编程(线程 进程 协程)相关的知识,希望对你有一定的参考价值。
一、进程和线程
进程
进程就是一个程序在一个数据集上的一次动态执行过程。
进程一般由程序、数据集、进程控制块三部分组成。我们编写的程序用来描述进程要完成哪些功能以及如何完成;
数据集则是程序在执行过程中所需要使用的资源;
进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系 统感知进程存在的唯一标志。
进程的本质:本质上就是一段程序的运行过程(抽象的概念)
意义:为了能够同时运行多个任务的并发而不是一次只能干一件事情
线程
线程的出现是为了降低上下文切换的消耗,提高系统的并发性,并突破一个进程只能干一样事的缺陷, 使到进程内并发成为可能。
线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序 计数器、寄存器集合和堆栈共同组成。
线程的引入减小了程序并发执行时的开销,提高了操作系统的并发 性能。线程没有自己的系统资源。
线程的本质:提高切换的效率,提高系统的并发性,并突破一个进程只能干一件事的缺陷
意义:使得进程内并发成为可能
二者之间的关系
1、一个程序至少有一个进程,一个进程至少有一个线程(进程可以理解为线程的容器)
2、进程是最小的资源单位,线程是最小的执行单位
3、进程的作用是一个资源管理
4、进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序运行效率
5、一个线程可以创建和撤销另一个线程,同一个进程中的多个线程之间可以并发执行
并发&并行
并发: 是指系统具有处理多个任务(动作)的能力 可通过CPU的切换实现
并行: 是指系统具有 同时 处理多个任务(动作)的能力
二者之间的关系:并行是并发的一个子集,并行的一定是并发,并发的不一定是并行
Python的线程与threading模块
线程的调用方式
threading 模块建立在thread 模块之上。thread模块以低级、原始的方式来处理和控制线程,而threading 模块通过对thread进行二次封装,
提供了更方便的api来处理线程。
一、直接调用
import threading import time def add():#定义每个线程要运行的函数 sum=0 for i in range(10000000): sum+=i time.sleep(1) print("sum",sum) def sayhi(num): # 定义每个线程要运行的函数 print("running on number:%s" % num) time.sleep(3) if __name__ ==‘__main__‘: start=time.time() t1=threading.Thread(target=add)#生成一个线程实例 t2=threading.Thread(target=sayhi,args=(1,))#生成另一个线程实例,args是参数 t1.start() #启动线程 t2.start() print(t1.getName()) #获取线程名 print(t2.getName()) #获取线程名
二、继承式调用
import threading import time class Mythread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) self.num = num def run(self):#定义每个线程要运行的函数 print("running on number:%s" % self.num) time.sleep(3) if __name__ == ‘__main__‘: t1 = Mythread(1) t2 = Mythread(2) t1.start() t2.start() print("ending......")
三、threading.Thread的实例方法
join&aemon方法
import threading from time import ctime,sleep import time def ListenMusic(name): print("Begin listening to %s. %s" % (name, ctime())) sleep(2) # sleep等同于IO操作 print("end listening %s" % ctime()) def RecordBlog(title): print("Begin recording the %s! %s" % (title, ctime())) sleep(5) print(‘end recording %s‘ % ctime()) l = [] t1 = threading.Thread(target=ListenMusic,args=("A",)) t2 = threading.Thread(target=RecordBlog,args=("B",)) l.append(t1) l.append(t2) if __name__ == ‘__main__‘: for i in l : # i.setDaemon(True) # 两个都只执行第一句然后结束 i.start() # i.join() #线程一个一个执行 print ("all over %s" %ctime())
join():在子线程完成运行之前,这个子线程的父线程将一直被阻塞。
setDaemon(True):
将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。
当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成
想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是 只要主线程
完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法啦
其他方法
# run(): 线程被cpu调度后自动执行线程对象的run方法 # start():启动线程活动。 # isAlive(): 返回线程是否活动的。 # getName(): 返回线程名。 # setName(): 设置线程名。 threading模块提供的一些方法: # threading.currentThread(): 返回当前的线程变量。 # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
四、同步锁(Lock)
import time import threading def addNum(): global num #在每个线程中都获取这个全局变量 #num-=1 temp=num #print(‘--get num:‘,num ) time.sleep(0.001) num =temp-1 #对此公共变量进行-1操作 num = 100 #设定一个共享变量 thread_list = [] for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有线程执行完毕 t.join() print(‘final num:‘, num ) #执行结果不固定,不为0
多个线程都在同时操作同一个共享资源,所以造成了资源破坏,怎么办呢?(join会造成串行,失去所线程的意义)
我们可以通过同步锁来解决这种问题
import time import threading R = threading.Lock() #### def sub(): global num R.acquire() #同步锁开始 temp = num - 1 time.sleep(0.01) num = temp R.release() #同步锁结束 num = 100 #设定一个共享变量 thread_list = [] for i in range(100): t = threading.Thread(target=sub) t.start() thread_list.append(t) for t in thread_list: #等待所有线程执行完毕 t.join() print(‘final num:‘, num ) #运行结果为0
五、线程死锁和递归锁
在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所有这两个线程在无外力作用下将一直等待下去。下面是一个死锁的例子:
import threading,time class myThread(threading.Thread): def doA(self): lockA.acquire() print(self.name,"gotlockA",time.ctime()) time.sleep(3) lockB.acquire() print(self.name,"gotlockB",time.ctime()) lockB.release() lockA.release() def doB(self): lockB.acquire() print(self.name,"gotlockB",time.ctime()) time.sleep(2) lockA.acquire() print(self.name,"gotlockA",time.ctime()) lockA.release() lockB.release() def run(self): self.doA() self.doB() if __name__=="__main__": lockA=threading.Lock() lockB=threading.Lock() threads=[] for i in range(5): threads.append(myThread()) for t in threads: t.start() for t in threads: t.join()
解决办法:使用递归锁,将
lockA=threading.Lock() lockB=threading.Lock()<br>#--------------<br>lock=threading.RLock()
import threading import time class MyThread(threading.Thread): def actionA(self): r_lcok.acquire() #count=1 print(self.name,"gotA",time.ctime()) time.sleep(2) r_lcok.acquire() #count=2 print(self.name, "gotB", time.ctime()) time.sleep(1) r_lcok.release() #count=1 r_lcok.release() #count=0 def actionB(self): r_lcok.acquire() print(self.name, "gotB", time.ctime()) time.sleep(2) r_lcok.acquire() print(self.name, "gotA", time.ctime()) time.sleep(1) r_lcok.release() r_lcok.release() def run(self): self.actionA() self.actionB() if __name__ == ‘__main__‘: # A=threading.Lock() 同步锁会造成死锁 # B=threading.Lock() r_lcok=threading.RLock() #使用递归锁解决这个问题 L=[] for i in range(5): t=MyThread() t.start() L.append(t) for i in L: i.join() print("ending....")
多线程利器------队列
队列Queue是python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者线程之间的信息传递
queue队列的方法:
创建一个“队列”对象 import Queue q = Queue.Queue(maxsize = 10) Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。 将一个值放入队列中 q.put(10) 调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为 1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。 将一个值从队列中取出 q.get() 调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True, get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。 Python Queue模块有三种队列及构造函数: 1、Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize) 2、LIFO类似于堆,即先进后出。 class queue.LifoQueue(maxsize) 3、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize) 此包中的常用方法(q = Queue.Queue()): q.qsize() 返回队列的大小 q.empty() 如果队列为空,返回True,反之False q.full() 如果队列满了,返回True,反之False q.full 与 maxsize 大小对应 q.get([block[, timeout]]) 获取队列,timeout等待时间 q.get_nowait() 相当q.get(False) 非阻塞 q.put(item) 写入队列,timeout等待时间 q.put_nowait(item) 相当q.put(item, False) q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号 q.join() 实际上意味着等到队列为空,再执行别的操作
其他模式:
import queue #先进后出 q=queue.LifoQueue() q.put(34) q.put(56) q.put(12) #优先级 # q=queue.PriorityQueue() # q.put([5,100]) # q.put([7,200]) # q.put([3,"hello"]) # q.put([4,{"name":"alex"}]) while 1: data=q.get() print(data)
生产者消费者模型
为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这就像,在餐厅,厨师做好菜,不需要直接和客户交流,而是交给前台,而客户去饭菜也不需要不找厨师,直接去前台领取即可,这也是一个结耦的过程。
import time,random import queue,threading q = queue.Queue() def Producer(name): count = 0 while count <10: #做十个包子 print("making........") time.sleep(5) #正在做包子 q.put(count) #做好包子放入笼屉(队列)里 print(‘Producer %s has produced %s baozi..‘ %(name, count)) count +=1 q.join() #等到队列为空,再执行别的操作 print("ok......") def Consumer(name): count = 0 while count <10: #吃包子 time.sleep(random.randrange(4)) #先等待包子 data = q.get() #拿包子 如果没有包子等待包子的到来 print("eating....") time.sleep(3) #吃包子ing.... q.task_done()#在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号 print(‘