十并发编程
Posted lisenlin
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了十并发编程相关的知识,希望对你有一定的参考价值。
一、进程
1.什么是进程
#一个正在进行的过程,或者说是一个程序的运行过程 #其实进程是对正在运行的程序的一种抽象/概括的说法 #进程的概念起源操作系统,进程是操作系统最核心的概念之一 #操作系统其它所有的概念都是围绕进程展开的
2.操作系统的作用
#1.隐藏丑陋复杂的硬件接口,提供良好的抽象接口 #2.管理、调度进程,并且将多个进程对硬件的竞争变得有序
3.操作系统的多道技术
#1.产生背景:针对单核,实现并发 ps:现在的主机一般都是多核,那么每个核都会利用多道技术,有4个cpu, 运行于cpu1的某个程序遇到io阻塞,会等到io结束再重新调度,会被调度 到4个cpu中的任意一个,具体由操作系统调度算法决定 #2.优点 ##1.空间上的复用:如内存中同时有多道程序,互相隔离 ##2.时间上的复用:复用一个cpu的时间片 强调:遇到io切换,占用cpu时间过长也切,核心在于切之前将进程的 状态保存下来,这样才能保证下次切换回来时,能基于上次切 走的位置继续运行
4.并行与并发
#1.并发:是伪并发,即看起来是同时运行。单个cpu+多道技术就可以并发 #2.并行:真正意义上的同时运行,只有具备多个cpu才能实现并行
5.开启进程的俩种方式
##1.直接调用Process import time from multiprocessing import Process def task(name): print(‘%s is running‘ % name) time.sleep(3) print(‘%s is done‘ % name) if __name__ == "__main__": # Process(target=task,kwargs={‘name‘:‘任务1‘}) #kwargs为传递关键字参数 # Process在windows下必须在main下运行,否则会无限创建子进程, # 因为在windows下子进程会执行父进程所有语句,获得所有名称空间。传参args里必须加逗号,才能表示args是元组,否则报错 p=Process(target=task, args=(‘任务1‘,)) #args为传递位置参数 p.start() # 只是给操作系统发送开启子进程的信号 print(‘主‘) ##2.自定义类,继承Process from multiprocessing import Process import time class MyProcess(Process): #自定义进程类,继承Process def __init__(self, name): super().__init__() self.name = name def run(self): # 必写,p.start()调用的是p.run() print(‘%s is running‘ % self.name) time.sleep(3) print(‘%s is done‘ % self.name) if __name__ == "__main__": p = MyProcess(‘lisl‘) p.start() # p.start()=p.run(),如果直接使用p.run(),就不会创建子进程并发执行 print(‘主‘)
6.进程的pid与ppid(父类pid)
##通过查看pid来判断是否是开启了另一个进程 import time import os from multiprocessing import Process def task(): print(‘子进程%s is running‘%os.getpid()) print(‘父进程是%s‘%os.getppid()) time.sleep(3) if __name__=="__main__": p=Process(target=task,) p.start() print(‘主进程%s‘%os.getpid(),) print(‘主进程的父进程%s‘%os.getppid(),) #在Pycharm运行就是Pycharm;在cmd运行父进程就是cmd.exe
7.进程对象的join方法
##单个子进程 import time from multiprocessing import Process def task(x): print(‘%s is running‘%x) time.sleep(3) print(‘%s is done‘%x) if __name__=="__main__": p=Process(target=task,args=(‘lisl‘,)) p.start() p.join() #主进程等待子进程运行完在往下执行 print(‘主‘)
##多个子进程 import time, os from multiprocessing import Process def task(n): print(‘%s is running‘ % os.getpid()) time.sleep(n) print(‘%s is done‘ % os.getpid()) if __name__ == "__main__": p1 = Process(target=task, args=(1,)) p2 = Process(target=task, args=(2,)) p3 = Process(target=task, args=(3,)) start_time = time.time() p1.start() p2.start() p3.start() p1.join() # 等待1s p2.join() # 等待1s p3.join() # 等待1s stop_time = time.time() print(‘主‘, stop_time - start_time) ##多个子进程(升级版) from multiprocessing import Process import os import time def task(n): print(‘%s is running‘%os.getpid()) time.sleep(n) print (‘%s is done‘%os.getpid()) if __name__ == ‘__main__‘: p_l=[] start_time=time.time() for i in range(1,4): p=Process(target=task,args=(i,)) p_l.append(p) p.start() for p in p_l: p.join() end_time=time.time() print(‘主进程执行时间%s‘%(end_time-start_time))
8.进程对象的其他相关属性和方法
import time from multiprocessing import Process def task(x): print(‘%s is running‘%os.getpid) time.sleep(x) if __name__ == ‘__main__‘: p=Process(target=task,args=(2,),name=‘子进程1‘) p.start() print(p.name) #打印子进程1,默认是打印Process-1 print(p.pid) #进程号 print(p.is_alive()) #子进程是否存活 p.terminate() #让操作系统去干掉子进程,需要时间
9.进程之间内存空间隔离
from multiprocessing import Process x = 100 def task(): global x x = 0 if __name__ == ‘__main__‘: p = Process(target=task, name=‘子进程1‘) p.start() p.join() # 等待子进程执行完 print(‘主‘, x) # x=100,子进程不影响主进程变量x
10.僵尸进程与孤儿进程
#僵尸进程(有害) 一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。 总之,子进程结束而父进程还未结束 #孤儿进程(无害) 一个父进程退出,而它的一个或多个子进程还在运行,那么子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对他们完成状态收集工作 总之,父进程比子进程早结束
10.关于while True,input,与子进程的问题
from multiprocessing import Process import time def task(name): print(‘%s is running‘%name) time.sleep(5) print(‘%s is done‘%name) if __name__ == ‘__main__‘: while True: choice=input(‘>>:‘).strip() #卡在input时子进程执行完也无法将结果输出,需等待主进程过了input阶段,有充足的时间才可输出 if not choice: continue p=Process(target=task,args=(‘唐三‘,)) p.start() print(‘主‘) #主进程会等待子进程执行完才结束。如果主进程执行完,想要子进程也随之结束,可将子进程设置成守护进程。
11.守护进程
#当子进程执行的任务在父进程代码运行完毕后就没有存在的必要了,该子进程就应该被设置为守护进程 from multiprocessing import Process import time def task(name): print(‘%s is running‘%name) time.sleep(5) print (‘%s is done‘%name) if __name__ == ‘__main__‘: p=Process(target=task,args=(‘lisl‘,)) p.daemon = True #设置为守护进程,默认False p.start() print(‘主‘)
from multiprocessing import Process import time def task1(name): print(‘%s is running‘%name) time.sleep(5) print (‘%s is done‘%name) def task2(name): print(‘%s is running‘%name) time.sleep(5) print (‘%s is done‘%name) if __name__ == ‘__main__‘: p1=Process(target=task1,args=(‘lisl‘,)) p2=Process(target=task2,args=(‘唐三‘,)) p1.daemon = True #设置为守护进程,默认False p1.start() p2.start() print(‘主‘) #因为p2没有设置为守护进程,因此主进程执行完,还会等待p2子进程,而p1一定随着主进程结束而结束,无关p2子进程
12 互斥锁----牺牲了效率,保证了数据安全
#由以下现象引出互斥锁 #多人抢夺多一张票,却显示多人抢票成功,现实只能有一人获得票 from multiprocessing import Process import json import time import random import os def search(): ‘‘‘查票‘‘‘ time.sleep(random.randint(1,3)) dic=json.load(open(‘db.txt‘,‘r‘,encoding=‘utf-8‘)) print(‘%s 查看到剩余票数%s‘%(os.getpid(),dic[‘count‘])) def get(): ‘‘‘得票‘‘‘ dic=json.load(open(‘db.txt‘,‘r‘,encoding=‘utf-8‘)) if dic[‘count‘]>0: dic[‘count‘]-=1 time.sleep(random.randint(1,3)) json.dump(dic,open(‘db.txt‘,‘w‘,encoding=‘utf-8‘)) print(‘%s 购票成功‘%os.getpid()) else: print(‘%s 购票失败‘%os.getpid()) def task(): search() get() if __name__ == ‘__main__‘: for i in range(10): p=Process(target=task) p.start() #使用p.join()可以解决买错票问题,但是子程序串行执行,效率低。最佳解决办法是只把修改共享数据这一部分变成串行,而非整体,即加入互斥锁
#使用互斥锁解决一票多人获得情况 from multiprocessing import Process,Lock import json import time import random import os def search(): ‘‘‘查票‘‘‘ time.sleep(random.randint(1,3)) dic=json.load(open(‘db.txt‘,‘r‘,encoding=‘utf-8‘)) print(‘%s 查看到剩余票数%s‘%(os.getpid(),dic[‘count‘])) def get(): ‘‘‘得票‘‘‘ dic=json.load(open(‘db.txt‘,‘r‘,encoding=‘utf-8‘)) if dic[‘count‘]>0: dic[‘count‘]-=1 time.sleep(random.randint(1,3)) json.dump(dic,open(‘db.txt‘,‘w‘,encoding=‘utf-8‘)) print(‘%s 购票成功‘%os.getpid()) else: print(‘%s 购票失败‘%os.getpid()) def task(mutex): search() mutex.acquire() #获得一把锁,其他子进程需要等待锁释放,才可以访问get()函数 get() mutex.release() #释放一把锁 if __name__ == ‘__main__‘: mutex=Lock() #锁对象化,所有子进程共享一把锁,可锁住一份数据 for i in range(10): p=Process(target=task,args=(mutex,)) p.start()
from multiprocessing import Process,Lock import time import os def printer(mutex): mutex.acquire() #加锁 print(‘%s 打印1‘%os.getpid()) time.sleep(1) print(‘%s 打印2‘%os.getpid()) mutex.release() #释放锁 if __name__ == ‘__main__‘: mutex=Lock() p1=Process(target=printer,args=(mutex,)) p2=Process(target=printer,args=(mutex,)) p3=Process(target=printer,args=(mutex,)) p1.start() p2.start() p3.start()
13 实现进程之间的通信
#进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocess模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
#IPC全称:Inter-Process Communication,进程间通信
13.1 队列
from multiprocessing import Queue q= Queue(3) #允许队列里有3个元素 q.put(‘hello‘) q.put(1000) q.put({"count":3}) # q.put(‘fourth‘) #第4个会被阻塞住,只能有第一个元素hello被取走,才能进入队列,将参数block值更改为False,则不阻塞而报错 #q.put_nowait(‘fourth‘) #队列满就报错,等于q.put(‘fourth‘,block=False) print(q.get()) #取走第一个元素hello print(q.get()) print(q.get()) print(q.get()) #上面已取完所有元素,阻塞住,当有元素被存放队列就读取 # print(q.get(block=False)) #队列为空就报异常,等于q.get_nowait() # print(q.get(block=True,timeout=3)) #等待3秒还是为空报异常
13.2 生产者消费者模型
from multiprocessing import Process,Queue import time import random def prodecer(name,food,q): for i in range(1,4): res=‘%s%s个‘%(food,i) time.sleep(random.randint(1,3)) q.put(res) print(‘厨师[%s]生产了<%s>‘%(name,res)) def consumer(name,q): while True: res=q.get() if res is None:break #消费者收到None,结束 time.sleep(random.randint(1,3)) print(‘吃货[%s]吃了<%s>‘%(name,res)) if __name__ == ‘__main__‘: q=Queue() #队列 p1=Process(target=prodecer,args=(‘lisl‘,‘包子‘,q)) #生产者负责造 c1=Process(target=consumer,args=(‘唐三‘,q)) #消费者负责吃 p1.start() c1.start() p1.join() #生产者生产完后, q.put(None) #生产者生产完后,主动放入一个结束标志,当消费者受到None,即结束
#多个生产者多个消费者,如果保证生产者执行完毕后,消费者消费完要结束? #通过队列的JoinableQueue模块里的q.join()和q.task_done()与守护进程配合使用 from multiprocessing import Process,JoinableQueue import time import random def prodecer(name,food,q): for i in range(1,4): res=‘%s%s个‘%(food,i) time.sleep(random.randint(1,3)) q.put(res) print(‘厨师[%s]生产了<%s>‘%(name,res)) def consumer(name,q): while True: res=q.get() if res is None:break #消费者收到None,结束 time.sleep(random.randint(1,3)) print(‘吃货[%s]吃了<%s>‘%(name,res)) q.task_done() #每取走一个数据(q.get()),就向q.join()发送一个信号 if __name__ == ‘__main__‘: q=JoinableQueue() #队列 p1=Process(target=prodecer,args=(‘lisl‘,‘包子‘,q)) #生产者负责造 p2=Process(target=prodecer,args=(‘lxq‘,‘馒头‘,q)) #生产者负责造 c1=Process(target=consumer,args=(‘唐三‘,q)) #消费者负责吃 c2=Process(target=consumer,args=(‘小舞‘,q)) #消费者负责吃 c3=Process(target=consumer,args=(‘沐白‘,q)) #消费者负责吃 # 将消费者设置成守护进程,随主进程消失而消失,配合队列q.join()执行完毕,可保证消费者子进程已取完队列所有元素,可随主进程一起消失 c1.daemon=True c2.daemon=True c3.daemon=True p1.start() p2.start() c1.start() c2.start() c3.start() p1.join() #生产者生产完后, p2.join() #生产者生产完后, q.join() #等待队列无元素 print(‘主‘) #注:JoinableQueue具备Queue的基本功能
二、线程
1 什么是线程
线程是一条流水线的工作过程,一个进程内至少有一个线程
进程是一个资源单位,而进程内的线程才是执行单位
2 为什么要用线程(线程vs进程)
#1.同一进程下的多个线程共享该进程内的数据 #2.线程的创建开销要远远小于进程的
3.开启线程的俩种方式
##1.直接调用Thread import time from threading import Thread def task(name): print(‘%s is running‘%name) time.sleep(3) print(‘%s is done‘%name) if __name__=="__main__": #Thread(target=task,args={‘name‘:‘任务1‘}) # Thread在windows下必须在main下运行,否则会报错,因为在windows下子进程会执行父进程所有语句, # 获得所有名称空间。传参args里必须加逗号,才能表示args是元组,否则报错 t=Thread(target=task,args=(‘任务1‘,)) t.start() #只是给操作系统发送开启子线程的信号 print(‘主‘) ###2.自定义类,继承Thread from threading import Thread import time class MyThread(Thread): def __init__(self, name): super().__init__() self.name = name def run(self): # 必写 print(‘%s is running‘ % self.name) time.sleep(3) print(‘%s is done‘ % self.name) if __name__ == "__main__": t = MyThread(‘lisl‘) t.start() # p.start()=p.run() print(‘主‘)
4.线程的pid与ppid
同一进程下的多个线程属于同一个pid,线程之间不存在父线程p与儿子线程的说法,线程与线程之间是平等的,因此也不会存在所谓的僵尸线程
5.线程下的join方法
##单个子线程 import time from threading import Thread def task(x): print(‘%s is running‘%x) time.sleep(3) print(‘%s is done‘%x) if __name__=="__main__": t=Thread(target=task,args=(‘lisl‘,)) t.start() t.join() #主线程等待子线程运行完在往下执行 print(‘主‘)
##多个子进程 import time, os from threading import Thread def task(n): print(‘%s is running‘ % os.getpid()) time.sleep(n) print(‘%s is done‘ % os.getpid()) if __name__ == "__main__": t1 = Thread(target=task, args=(1,)) t2 = Thread(target=task, args=(2,)) t3 = Thread(target=task, args=(3,)) start_time = time.time() t1.start() t2.start() t3.start() t1.join() # 等待1s t2.join() # 等待1s t3.join() # 等待1s stop_time = time.time() print(‘主‘, stop_time - start_time) ##多个子进程(升级版) from threading import Thread import os import time def task(n): print(‘%s is running‘%os.getpid()) time.sleep(n) print (‘%s is done‘%os.getpid()) if __name__ == ‘__main__‘: t_l=[] start_time=time.time() for i in range(1,4): t=Thread(target=task,args=(i,)) t_l.append(t) t.start() for t in t_l: t.join() end_time=time.time() print(‘主进程执行时间%s‘%(end_time-start_time))
6.线程实例化对象的方法
import time import os from threading import Thread def task(x): print(‘%s is running‘%os.getpid()) time.sleep(x) if __name__ == ‘__main__‘: t=Thread(target=task,args=(2,),name=‘子线程1‘) t.start() print(t.name) #打印子线程1,默认是打印Thread-1,或者用t.getName() print(t.getName()) print(t.is_alive()) #子线程是否存活 t.setName(‘超级线程‘) #设置子线程名 print(t.getName())
print (t.getpid()) #获取线程的PID
#在线程里查看当前线程名 import time from threading import Thread,current_thread def task(x): print(‘当前线程名:%s is running‘%current_thread().getName()) #查看当前线程名 time.sleep(x) if __name__ == ‘__main__‘: t=Thread(target=task,args=(2,),name=‘子线程1‘) t.start() print(current_thread().getName()) #当前线程名为主线程MainThread
6.守护线程
from threading import Thread import time def talk(name): print(‘===>‘) time.sleep(2) print(‘%s say hello‘%name) if __name__ == ‘__main__‘: t=Thread(target=talk,args=(‘lisl‘,)) t.daemon=True #设置为守护线程,随着所有非守护线程的结束而结束,关下一下例子《易迷糊例子》 t.start() print(‘主线程‘) #主线程结束
from threading import Thread import time def talk1(name): print(‘talk1===>‘) time.sleep(1) print(‘%s say hello‘%name) def talk2(name): print(‘talk2===>‘) time.sleep(3) print(‘%s say hello‘%name) if __name__ == ‘__main__‘: t1=Thread(target=talk1,args=(‘lisl‘,)) t2=Thread(target=talk2,args=(‘唐三‘,)) t1.daemon=True #设置为守护线程 t1.start() t2.start() print(‘主线程‘) #主线程结束
守护线程与守护进程的区别
守护进程是随主进程结束而结束
守护线程是随所有非守护线程结束而结束
7.线程的互斥锁
from threading import Thread,Lock import time n=100 def change(): global n with mutex: #与mutex.acquire()和mutex.release()效果一样 temp=n time.sleep(0.1) n=temp-1 if __name__ == ‘__main__‘: start_time=time.time() t_l=[] mutex=Lock() #进程互斥锁,因为进程共享数据,所以无需以参数传入子线程 for i in range(100): t=Thread(target=change) t_l.append(t) t.start() for t in t_l: t.join() end_time=time.time() print(‘主‘,n) print(‘run time is %s‘%(end_time-start_time))
8.GIL--Global Interpreter Lock
GIL本质就是一把互斥锁,跟线程和进程的互斥锁一样,都是将并发运行变成串行,以此控制统一时间内共享数据只能被一个任务所修改,进而保证数据安全。 不同的是,GIL是解释器级的锁,而进程或线程互斥锁是程序级别的锁。 所有线程的任务,都需要将任务的代码当做参数传给解释器的代码去执行,即所有的线程要想运行自己的任务,首先需要解决的是能够访问到解释器的代码。 解释器的代码是所有线程共享的,所以垃圾回收线程也可能访问到解释器的代码而去执行,这就导致了一个问题,对于同一个数据100,可能线程1执行x=100的同时,
而垃圾回收执行的是回收100的操作,解决这种问题只能加锁处理,这就是所谓解释器级的锁,即GIL,保证python解释器统同一时间只能执行一个任务的代码
8.1 计算密集型,多进程效率高
from multiprocessing import Process from threading import Thread import os,time def work(): res=0 for i in range(100000000): res*=i if __name__ == ‘__main__‘: l=[] start=time.time() for i in range(4): # p=Process(target=work) #耗时13s多 p=Thread(target=work) #耗时23s多 l.append(p) p.start() for p in l: p.join() stop=time.time() print(‘run time is %s‘ %(stop-start))
8.2 I/O密集型,多线程效率高
from multiprocessing import Process from threading import Thread import time def work(): time.sleep(2) if __name__ == ‘__main__‘: l=[] start=time.time() for i in range(400): # p=Process(target=work) #耗时23s多,大部分时间耗费在创建进程上 p=Thread(target=work) #耗时2s多 l.append(p) p.start() for p in l: p.join() stop=time.time() print(‘run time is %s‘ %(stop-start))
8.3 应用:
多线程用于IO密集型,如socket,爬虫,web
多进程用于计算密集型,如金融分析
9.死锁
#俩个锁分别调用,容易引发死锁,如下 #死锁现象与递归锁 from threading import Thread,Lock import time mutexA=Lock() mutexB=Lock() class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print(‘%s 拿到A锁‘%self.name) mutexB.acquire() print(‘%s 拿到B锁‘%self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print(‘%s 拿到B锁‘ % self.name) time.sleep(0.1) mutexA.acquire() print(‘%s 拿到A锁‘ % self.name) mutexA.release() mutexB.release() if __name__ == ‘__main__‘: for i in range(10): t=MyThread() t.start()
#解决上面死锁办法: #使用RLock,即加一个锁计数加1,递归锁在加1,每释放锁就减1,当锁的技术为0,才可供下一个其他线程调用 from threading import Thread,RLock import time mutexA=mutexB=RLock() class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print(‘%s 拿到A锁‘%self.name) mutexB.acquire() print(‘%s 拿到B锁‘%self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print(‘%s 拿到B锁‘ % self.name) time.sleep(0.1) mutexA.acquire() print(‘%s 拿到A锁‘ % self.name) mutexA.release() mutexB.release() if __name__ == ‘__main__‘: for i in range(10): t=MyThread() t.start()
10.信号量
#信息量:理解为是钥匙,钥匙数量固定,使用钥匙的人结束交给没钥匙的人 from threading import Thread,Semaphore,current_thread import time,random sm=Semaphore(5) def task(): with sm: #等价于sm.acquire()与sm.release()用法 print(‘%s is comming‘%current_thread().getName()) time.sleep(random.randint(1,3)) if __name__ == ‘__main__‘: for i in range(20): t=Thread(target=task) t.start()
11 Event事件
#用于一个进程下多个线程,一个线程需要上一个线程工作到某个特定阶段才可开始执行 event.isSet():返回event的状态值; event.wait():如果 event.isSet()==False将阻塞线程; event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度; event.clear():恢复event的状态值为False
from threading import Thread,Event,current_thread import time event=Event() #事件对象化 def check(): print(‘%s checking mysql‘%current_thread().getName()) time.sleep(3) event.set() #设置为True,event.wait()有阻塞状态进入非阻塞状态 def conn(): print(‘waitting to connect MySQL‘) event.wait() #默认阻塞状态,当event.set()后进入非阻塞状态 print(‘%s connect MySQL‘%current_thread().getName()) if __name__ == ‘__main__‘: t1=Thread(target=check) t2=Thread(target=conn) t3=Thread(target=conn) t4=Thread(target=conn) t1.start() t2.start() t3.start() t4.start()
#Event事件:用于一个进程下多个线程,一个线程需要上一个线程工作到某个特定阶段才可开始执行 from threading import Thread,Event,current_thread import time event=Event() def check(): print(‘%s checking MySQL‘%current_thread().getName()) time.sleep(5) event.set() def conn(): count=1 while not event.is_set(): if count >3: raise TimeoutError(‘超时‘) print(‘%s try to connect MySQL %s times‘%(current_thread().getName(),count)) event.wait(2) #2秒为超时时间,即等不到event.set(),直接进入非阻塞状态 count +=1 print(‘%s connect MySQL‘%current_thread().getName()) if __name__ == ‘__main__‘: t1=Thread(target=check) t2=Thread(target=conn) t3=Thread(target=conn) t4=Thread(target=conn) t1.start() t2.start() t3.start() t4.start()
12 定时器
#定时器,此方式无法做到定点定时执行,比如想12点执行,不可以 from threading import Timer def hello(name): print(‘%s say hello to you‘%name) t=Timer(3,hello,args=(‘lisl‘,)) #3秒后开始运行 t.start()
13线程queue
import queue q=queue.Queue(3) #队列先进先出 q.put(1) q.put(2) q.put(3) # q.put(4) #队列满,阻塞 # q.put_nowait(4) #队列满,直接报错 # q.put(4,block=False) #队列满,直接报错 # q.put(4,block=True,timeout=3) #队列满,3秒后超时报错 print(q.get()) print(q.get()) print(q.get()) # print(q.get_nowait()) # print(q.get(block=False)) # print(q.get(block=True,timeout=3)) ###################################################### q=queue.LifoQueue(3) #堆栈,后进先出 q.put(1) q.put(2) q.put(3) print (q.get()) #3 print (q.get()) #2 print (q.get()) #1 ###################################################### q=queue.PriorityQueue(3) #优先级队列 q.put((10,‘a‘)) #10为优先级,数字越小,优先级越高,‘a‘为数据,可为任意数据类型 q.put((2,‘b‘)) q.put((-5,‘c‘)) print(q.get()) #结果为(-5,‘b‘)
三、进程池与线程池
1.什么是池的概念?
池指的是一个容器,该容器用来存放进程或线程,存放的数目是一定的
2.为什么要用池?
是为了将并发的进程或线程数目控制在计算机可承受的范围内
3.进程池与线程池的选用规则?
进程池:当任务是计算密集型的情况下应该用进程来利用多核优势
线程池:当任务是IO密集型的情况下应该用线程减少开销
4.同步调用VS异步调用
所谓同步调用与异步调用指的是池提交任务的两种方式 #同步调用: 提交完任务后,就在原地等待任务执行完毕,拿到运行结果/返回值后再执行下一行代码,同步调用下任务的执行是串行执行 #异步调用: 提交完任务后,不会在原地等待任务执行完毕,直接执行下一行代码,任务执行结果可单独返回,或者配合回调函数进行下一步处理。异步调用下任务的执行是并发执行
5.进程池异步调用示例
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os import time import random def task(x): print(‘%s is running‘ %os.getpid()) time.sleep(random.randint(1,3)) return x**2 if __name__ == ‘__main__‘: #异步调用 p=ProcessPoolExecutor() #不指定参数默认池的大写等于cpu的核数,只允许4个进程同时运行,一个运行才可用同PID接着运行下一个任务 futrues=[] for i in range(10): futrue=p.submit(task,i) #提交4个进程,task为任务函数,i为位置参数 futrues.append(futrue) #将提交任务的对象放入列表 # 与老版本的pool.close()和 pool.join()连用效果一样 p.shutdown(wait=True) # 俩作用:1.shutdown为禁止新的任务进入进程池,2.wait=Ture等待进程池任务都执行完毕 。 for futrue in futrues: print(futrue.result()) #打印任务返回结果 print(‘主‘)
6.进程池同步调用示例
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os import time import random def task(x): print(‘%s is running‘ %os.getpid()) time.sleep(random.randint(1,3)) return x**2 if __name__ == ‘__main__‘: #同步调用 p=ProcessPoolExecutor() #不指定参数默认池的大写等于cpu的核数 for i in range(10): res=p.submit(task,i).result() #提交任务后,直接返回结果,串行效果,使用意义不大 print(res) print(‘主‘)
7.线程池异步调用示例
from concurrent.futures import ThreadPoolExecutor from threading import current_thread import os import time import requests def get(url): print(‘%s GET %s‘ %(current_thread().getName(),url)) time.sleep(2) response=requests.get(url) if response.status_code == 200: res=response.text return res def parse(res): print(‘%s 解析[url]结果是 %s‘ % (os.getpid(), len(res))) if __name__ == ‘__main__‘: t=ThreadPoolExecutor(10) urls=[ ‘https://www.baidu.com‘, ‘https://www.openstack.org‘, ‘https://www.taobao.com‘, ‘https://www.jd.com‘, ] obj_l=[] for url in urls: obj=t.submit(get,url) obj_l.append(obj) t.shutdown(wait=True) for obj in obj_l: res=obj.result() print(‘%s 解析[url]结果是 %s‘ %(os.getpid(),len(res))) print(‘主‘)
8.线程池同步调用示例
from concurrent.futures import ThreadPoolExecutor from threading import current_thread import os import time import random def task(x): print(‘%s is running‘ %current_thread().getName()) time.sleep(random.randint(1,3)) return x**2 if __name__ == ‘__main__‘: #同步调用 t=ThreadPoolExecutor() #不指定参数默认池的个数等于cpu的核数*5 for i in range(10): res=t.submit(task,i).result() #提交任务后,直接返回结果,串行效果,使用意义不大 print(res) print(‘主‘)
9.进程池配合回调函数
# 回调函数 from concurrent.futures import ProcessPoolExecutor import os import time import requests def get(url): print(‘%s GET %s‘ %(os.getpid(),url)) time.sleep(2) response=requests.get(url) if response.status_code == 200: res=response.text return res def parse(obj): res=obj.result() print(‘%s 解析[url]结果是 %s‘ % (os.getpid(), len(res))) if __name__ == ‘__main__‘: p=ProcessPoolExecutor(10) urls=[ ‘https://www.baidu.com‘, ‘https://www.python.org‘, ‘https://www.openstack.org‘, ‘https://www.taobao.com‘, ‘https://www.jd.com‘, ] for url in urls: # 回调函数会在任务运行完毕后自动触发,并且接收该任务对象作为parser函数的位置参数 p.submit(get,url).add_done_callback(parse) #进程池提交任务触发回调函数,运行回调函数的是主进程 print(‘主‘,os.getpid())
10.线程池配合回调函数
from concurrent.futures import ThreadPoolExecutor from threading import current_thread import time import random def task(x): print(‘%s is running‘ %current_thread().getName()) time.sleep(random.randint(1,3)) return x**2 def parse(obj): res=obj.result() print(‘%s 解析的结果为%s‘ %(current_thread().getName(),res)) if __name__ == ‘__main__‘: t=ThreadPoolExecutor(3) #线程池默认不写参数是cpu核数*5 for i in range(10): t.submit(task,i).add_done_callback(parse) #线程池提交任务触发回调函数,运行回调函数的是各个线程
以上是关于十并发编程的主要内容,如果未能解决你的问题,请参考以下文章
六万字Java高并发编程入门第十九篇:并发编程入门总览包教包会值得收藏