(并发编程)进程池线程池--提交任务的2种方式协程--yield greenlet,gevent模块
Posted 3sss-ss-s
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了(并发编程)进程池线程池--提交任务的2种方式协程--yield greenlet,gevent模块相关的知识,希望对你有一定的参考价值。
一:进程池与线程池(同步,异步+回调函数)
先造个池子,然后放任务
为什么要用“池”:池子使用来限制并发的任务数目,限制我们的计算机在一个自己可承受的范围内去并发地执行任务
池子内什么时候装进程:并发的任务属于计算密集型
池子内什么时候装线程:并发的任务属于IO密集型
先造个池子,然后放任务
为什么要用“池”:池子使用来限制并发的任务数目,限制我们的计算机在一个自己可承受的范围内去并发地执行任务
池子内什么时候装进程:并发的任务属于计算密集型
池子内什么时候装线程:并发的任务属于IO密集型
#提交任务的两种方式:
# 同步调用:提交完一个任务之后,就在原地等待,等待任务完完整整地运行完毕拿到结果后,再执行下一行代码,会导致任务是串行执行的
# 异步调用:提交完一个任务之后,不在原地等待,结果???,而是直接执行下一行代码,会导致任务是并发执行的
p=ProcessPoolExecutor(4)
obj=p.submit(函数名,参1,参2)
obj.add_done_callback(函数名2)
# 同步调用:提交完一个任务之后,就在原地等待,等待任务完完整整地运行完毕拿到结果后,再执行下一行代码,会导致任务是串行执行的
# 异步调用:提交完一个任务之后,不在原地等待,结果???,而是直接执行下一行代码,会导致任务是并发执行的
p=ProcessPoolExecutor(4)
obj=p.submit(函数名,参1,参2)
obj.add_done_callback(函数名2)
#后续回调是obj会将自身传给函数名2,所以函数名2必须有且仅有一个参数。(多进程,回调主进程干)(多线程回调,子线程们干除开主线程)
p.shutdown(wait=True)#(等同于p.close()(不允许向池中放新任务) + p.join())关闭进程池的入口,并且在原地等待进程池内所有任务运行完毕
obj.result()
p.shutdown(wait=True)#(等同于p.close()(不允许向池中放新任务) + p.join())关闭进程池的入口,并且在原地等待进程池内所有任务运行完毕
obj.result()
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time,random,os
import time,random,os
def task(name,n):
print(‘%s%s is running‘ %(name,os.getpid()))
time.sleep(random.randint(1,3))
return n**2
print(‘%s%s is running‘ %(name,os.getpid()))
time.sleep(random.randint(1,3))
return n**2
if __name__ == ‘__main__‘:
# print(os.cpu_count())
p=ProcessPoolExecutor(4)
#提交任务的两种方式:
# 同步调用:提交完一个任务之后,就在原地等待,等待任务完完整整地运行完毕拿到结果后,再执行下一行代码,会导致任务是串行执行的
# 异步调用:提交完一个任务之后,不在原地等待,结果???,而是直接执行下一行代码,会导致任务是并发执行的
# print(os.cpu_count())
p=ProcessPoolExecutor(4)
#提交任务的两种方式:
# 同步调用:提交完一个任务之后,就在原地等待,等待任务完完整整地运行完毕拿到结果后,再执行下一行代码,会导致任务是串行执行的
# 异步调用:提交完一个任务之后,不在原地等待,结果???,而是直接执行下一行代码,会导致任务是并发执行的
l=[]
for i in range(10):
# 同步提交
# res=p.submit(task,‘进程pid: ‘,i).result()
# print(res)
for i in range(10):
# 同步提交
# res=p.submit(task,‘进程pid: ‘,i).result()
# print(res)
# 异步提交
future=p.submit(task,‘进程pid: ‘,i)
l.append(future)
future=p.submit(task,‘进程pid: ‘,i)
l.append(future)
p.shutdown(wait=True) #关闭进程池的入口,并且在原地等待进程池内所有任务运行完毕
for future in l:
print(future.result())
print(‘主‘)
print(future.result())
print(‘主‘)
在单线程下(i/o密集型任务)实现多个任务间遇到IO就切换就可以降低单线程的IO时间,从而最大限度地提升单线程的效率
在单线程下(计算密集型任务)切反而降低效率。
2、实现并发的三种手段:
a单线程下的并发;由程序自己控制,相对速度快
b多线程下的并发;由操作系统控制,相对速度较慢
c多进程下的并发;由操作系统控制,相对速度慢
在单线程下(计算密集型任务)切反而降低效率。
2、实现并发的三种手段:
a单线程下的并发;由程序自己控制,相对速度快
b多线程下的并发;由操作系统控制,相对速度较慢
c多进程下的并发;由操作系统控制,相对速度慢
3、基于yield保存状态,实现两个任务直接来回切换,即并发的效果 (但yield不会遇到阻塞自动切程序)
PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
import time
def consumer():
‘‘‘任务1:接收数据,处理数据‘‘‘
while True:
x=yield
def consumer():
‘‘‘任务1:接收数据,处理数据‘‘‘
while True:
x=yield
def producer():
‘‘‘任务2:生产数据‘‘‘
g=consumer()
next(g)
for i in range(10000000):
g.send(i)
‘‘‘任务2:生产数据‘‘‘
g=consumer()
next(g)
for i in range(10000000):
g.send(i)
start=time.time()
producer() #1.0202116966247559
stop=time.time()
print(stop-start)
producer() #1.0202116966247559
stop=time.time()
print(stop-start)
# 纯计算的任务并发执行
import time
def task1():
res=1
for i in range(1000000):
res+=i
yield
time.sleep(10000) #yield不会自动跳过阻塞
print(‘task1‘)
import time
def task1():
res=1
for i in range(1000000):
res+=i
yield
time.sleep(10000) #yield不会自动跳过阻塞
print(‘task1‘)
def task2():
g=task1()
res=1
for i in range(1000000):
res*=i
next(g)
print(‘task2‘)
g=task1()
res=1
for i in range(1000000):
res*=i
next(g)
print(‘task2‘)
start=time.time()
task2()
stop=time.time()
print(stop-start)
task2()
stop=time.time()
print(stop-start)
4、单线程下实现遇到IO切换
1、用greenlet(封装yield,遇到IO不自动切)
from greenlet import greenlet
import time
1、用greenlet(封装yield,遇到IO不自动切)
from greenlet import greenlet
import time
def eat(name):
print(‘%s eat 1‘ %name)
time.sleep(30)
g2.switch(‘alex‘) #只在第一次切换时传值
print(‘%s eat 2‘ %name)
g2.switch()
def play(name):
print(‘%s play 1‘ %name)
g1.switch()
print(‘%s play 2‘ %name)
print(‘%s eat 1‘ %name)
time.sleep(30)
g2.switch(‘alex‘) #只在第一次切换时传值
print(‘%s eat 2‘ %name)
g2.switch()
def play(name):
print(‘%s play 1‘ %name)
g1.switch()
print(‘%s play 2‘ %name)
g1=greenlet(eat)
g2=greenlet(play)
g1.switch(‘egon‘)
g2=greenlet(play)
g1.switch(‘egon‘)
2、用gevent模块(封装greenlet,不处理的话,遇到自己的IO才主动切)
import gevent
import gevent
def eat(name):
print(‘%s eat 1‘ %name)
gevent.sleep(5) #换成time.sleep(5),不会自动切
print(‘%s eat 2‘ %name)
def play(name):
print(‘%s play 1‘ %name)
gevent.sleep(3)
print(‘%s play 2‘ %name)
print(‘%s eat 1‘ %name)
gevent.sleep(5) #换成time.sleep(5),不会自动切
print(‘%s eat 2‘ %name)
def play(name):
print(‘%s play 1‘ %name)
gevent.sleep(3)
print(‘%s play 2‘ %name)
g1=gevent.spawn(eat,‘egon‘)
g2=gevent.spawn(play,‘alex‘)
g2=gevent.spawn(play,‘alex‘)
# gevent.sleep(100)
# g1.join()
# g2.join()
gevent.joinall([g1,g2])
# g1.join()
# g2.join()
gevent.joinall([g1,g2])
5、用gevent模块(封装greenlet,处理的话,遇到其他IO也主动切)
from gevent import monkey;monkey.patch_all()
from threading import current_thread
from gevent import spawn,joinall #pip3 install gevent
import time
from gevent import monkey;monkey.patch_all()
from threading import current_thread
from gevent import spawn,joinall #pip3 install gevent
import time
def play(name):
print(‘%s play 1‘ %name)
time.sleep(5)
print(‘%s play 2‘ %name)
print(‘%s play 1‘ %name)
time.sleep(5)
print(‘%s play 2‘ %name)
def eat(name):
print(‘%s eat 1‘ %name)
time.sleep(3)
print(‘%s eat 2‘ %name)
print(‘%s eat 1‘ %name)
time.sleep(3)
print(‘%s eat 2‘ %name)
g1=spawn(play,‘刘清正‘)
g2=spawn(eat,‘刘清正‘)
# g1.join()
# g2.join()
joinall([g1,g2])
# g2.join()
joinall([g1,g2])
以上是关于(并发编程)进程池线程池--提交任务的2种方式协程--yield greenlet,gevent模块的主要内容,如果未能解决你的问题,请参考以下文章
python全栈脱产第37天------进程池与线程池协程gevent模块单线程下实现并发的套接字通信