一文搞明白Python协程编程:asyncio库
Posted 思源湖的鱼
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一文搞明白Python协程编程:asyncio库相关的知识,希望对你有一定的参考价值。
目录
前言
本文试图搞明白Python协程编程
通常我们认为线程是轻量级的进程,因此我们也把协程理解为轻量级的线程即微线程
这是几个姊妹篇:
一、基础知识
1、并行和并发
在学习的时候,发现并行和并发在好些地方搞混了,这是两个概念,得先明确下
(1)定义
Erlang 之父 Joe Armstrong 画了一张很可爱的图来解释这两个概念:
- 并发是两个队列交替使用一台咖啡机
- 并行是两个队列同时使用两台咖啡机
两个词很好的说明了并发和并行的区别:
- Parallel Computing:并行计算
- Concurrent programming:并发编程
(2)联系
那么并发并行和多进程多线程的关系呢?
- 多核cpu,多个进程可以并行在多个cpu中计算,当然也会存在进程切换;单核cpu,多个进程在这个单核cpu中是并发运行,根据时间片读取上下文+执行程序+保存上下文。同一个进程同一时间段只能在一个cpu中运行,如果进程数小于cpu数,那么未使用的cpu将会空闲
- 多核cpu,进程中的多线程并行执行;单核cpu,多线程在单核cpu中并发执行,根据时间片切换线程。同一个线程同一时间段只能在一个cpu内核中运行,如果线程数小于cpu内核数,那么将有多余的内核空闲
场景:
- 多核CPU——计算密集型任务:尽量使用并行计算,可以提高任务执行效率。计算密集型任务会持续地将CPU占满,此时有越多CPU来分担任务,计算速度就会越快,这是并行的用武之地
- 单核CPU——计算密集型任务:此时的任务已经把CPU资源100%消耗了,就没必要使用并行计算,毕竟硬件障碍摆在那里
- 单核CPU——I/O密集型任务:I/O密集型任务在任务执行时需要经常调用磁盘、屏幕、键盘等外设,由于调用外设时CPU会空闲,所以CPU的利用率并不高,此时使用多线程程序,只是便于人机交互。计算效率提升不大。
- 多核CPU——I/O密集型任务:同单核CPU——I/O密集型任务
总结下:
- 并行从代码层次上强依赖于多进程/多线程代码,从硬件角度上则依赖于多核CPU
- 并发是一种现象:同时运行多个程序或多个任务需要被处理的现象,这些任务可能是并行执行的,也可能是串行执行的,和CPU核心数无关,是操作系统进程调度和CPU上下文切换达到的结果
2、进程、线程和协程
(1)定义
1、进程
- 进程是程序的一次执行过程,是一个动态概念,是程序在执行过程中分配和管理资源的基本单位
- 在面向线程设计的系统(如当代多数操作系统、Linux 2.6及更新的版本)中,进程本身不是基本运行单位,而是线程的容器
- 进程拥有自己独立的内存空间,所属线程可以访问进程的空间
- 程序本身只是指令、数据及其组织形式的描述,进程才是程序的真正运行实例
2、线程
- 线程是CPU调度和分派的基本单位,它可与同属一个进程的其他的线程共享进程所拥有的全部资源
- 当前的操作系统是面向线程的,即以线程为基本运行单位,并按线程分配CPU
3、协程
-
又称微线程,纤程,英文名Coroutine。协程的作用是在执行函数A时可以随时中断去执行函数B,然后中断函数B继续执行函数A(可以自由切换)。但这一过程并不是函数调用,是线程里的并发
-
拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方(非CPU),在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,CPU感觉不到协程的存在,协程是用户自己控制的
-
优点:
无需线程上下文切换的开销
无需数据操作锁定及同步的开销
方便切换控制流,简化编程模型
高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理 -
缺点:
无法利用多核资源:协程的本质是个单线程,它不能同时将单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上,协程如果要使用多核CPU的话,那么就需要先启多个进程,在每个进程下启一个线程,然后在线程下在启协程。
日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用
(2)联系
线程是进程的一部分,一个线程只能属于一个进程,而一个进程可以有多个线程,且至少有一个线程。而协程则包含在线程中
可以看个图
区别:理解它们的差别,从资源使用的角度出发。(所谓的资源就是计算机里的中央处理器,内存,文件,网络等等)
-
根本区别:进程是操作系统资源分配的基本单位,而线程是任务调度和执行的基本单位
-
在开销方面:每个进程都有独立的代码和数据空间(程序上下文),程序之间的切换会有较大的开销;线程共享代码和数据空间,每个线程都有自己独立的运行栈和程序计数器(PC),线程之间切换的开销小
-
所处环境:在操作系统中能同时运行多个进程(程序);而在同一个进程(程序)中有多个线程同时执行(通过CPU调度,在每个时间片中只有一个线程执行)
-
内存分配方面:系统在运行的时候会为每个进程分配不同的内存空间;而对线程而言,除了CPU外,系统不会为线程分配内存(线程所使用的资源来自其所属进程的资源),线程组之间只能共享资源
包含关系:
- 没有线程的进程可以看做是单线程的,如果一个进程内有多个线程,则执行过程不是一条线的,而是多条线(线程)共同完成的
- 线程是进程的一部分,所以线程也被称为轻量级进程
- 协程是线程的一部分,被称为微型线程
3、生成器
一切还是得从生成器说起,因为asyncio或者大多数协程库内部也是通过生成器实现的
生成器是一次生成一个值的特殊类型函数,可以将其视为可恢复函数,这里就不探究其内部实现原理了
(1)yield
简单例子如下
def gen_func():
yield 1
yield 2
yield 3
if __name__ == '__main__':
gen = gen_func()
for i in gen:
print(i)
output:
1
2
3
上面的例子没有什么稀奇的不是吗?yield像一个特殊的关键字,将函数变成了一个类似于迭代器的对象,可以使用for循环取值。
(2)send, next
协程自然不会这么简单,python协程的目标是星辰大海,从上面的例之所以get不到它的野心,是因为你没有试过send, next两个函数。
首先说next
def gen_func():
yield 1
yield 2
yield 3
if __name__ == '__main__':
gen = gen_func()
print(next(gen))
print(next(gen))
print(next(gen))
output:
1
2
3
next的操作有点像for循环,每调用一次next,就会从中取出一个yield出来的值,其实还是没啥特别的,感觉还没有for循环好用。
不过,不知道你有没有想过,如果你只需要一个值,你next一次就可以了,然后你可以去做其他事情,等到需要的时候才回来再次next取值。
就这一部分而言,你也许知道为啥说生成器是可以暂停的了,不过,这似乎也没什么用,那是因为你不知到时,生成器除了可以抛出值,还能将值传递进去。
接下来我们看send的例子。
def gen_func():
a = yield 1
print("a: ", a)
b = yield 2
print("b: ", b)
c = yield 3
print("c: ", c)
return "finish"
if __name__ == '__main__':
gen = gen_func()
for i in range(4):
if i == 0:
print(gen.send(None))
else:
# 因为gen生成器里面只有三个yield,那么只能循环三次。
# 第四次循环的时候,生成器会抛出StopIteration异常,并且return语句里面内容放在StopIteration异常里面
try:
print(gen.send(i))
except StopIteration as e:
print("e: ", e)
output:
1
a: 1
2
b: 2
3
c: 3
e: finish
send有着next差不多的功能,不过send在传递一个值给生成器的同时,还能获取到生成器yield抛出的值,在上面的代码中,send分别将None,1,2,3四个值传递给了生成器,之所以第一需要传递None给生成器,是因为规定,之所以规定,因为第一次传递过去的值没有特定的变量或者说对象能接收,所以规定只能传递None, 如果你传递一个非None的值进去,会抛出一下错误
TypeError: can't send non-None value to a just-started generator
从上面的例子我们也发现,生成器里面的变量a,b,c获得了send函数发送来的1, 2, 3.
如果你有事件循环或者说多路复用的经验,你也许能够隐隐察觉到微妙的感觉。
这个微妙的感觉是,是否可以将IO操作yield出来?由事件循环调度, 如果你能get到这个微妙的感觉,那么你已经知道协程高并发的秘密了.
(3)yield from
下面是yield from的例子
def gen_func():
a = yield 1
print("a: ", a)
b = yield 2
print("b: ", b)
c = yield 3
print("c: ", c)
return 4
def middle():
gen = gen_func()
ret = yield from gen
print("ret: ", ret)
return "middle Exception"
def main():
mid = middle()
for i in range(4):
if i == 0:
print(mid.send(None))
else:
try:
print(mid.send(i))
except StopIteration as e:
print("e: ", e)
if __name__ == '__main__':
main()
output:
1
a: 1
2
b: 2
3
c: 3
ret: 4
e: middle Exception
从上面的代码我们发现,main函数调用的middle函数的send,但是gen_func函数却能接收到main函数传递的值.有一种透传的感觉,这就是yield from的作用, 这很关键。
而yield from最终传递出来的值是StopIteration异常,异常里面的内容是最终接收生成器(本示例是gen_func)return出来的值,所以ret获得了gen_func函数return的4.但是ret将异常里面的值取出之后会继续将接收到的异常往上抛,所以main函数里面需要使用try语句捕获异常。而gen_func抛出的异常里面的值已经被middle函数接收,所以middle函数会将抛出的异常里面的值设为自身return的值
4、IO模型
linux有5种IO模型
(1)同步IO
同步模型自然是效率最低的模型了,每次只能处理完一个连接才能处理下一个,如果只有一个线程的话, 如果有一个连接一直占用,那么后来者只能傻傻的等了。所以不适合高并发,不过最简单,符合惯性思维
(2)非阻塞式IO
不会阻塞后面的代码,但是需要不停的显式询问内核数据是否准备好,一般通过while循环,而while循环会耗费大量的CPU。所以也不适合高并发。
(3)多路复用IO
当前最流行,使用最广泛的高并发方案。
而多路复用又有三种实现方式, 分别是select, poll, epoll:
-
select,poll由于设计的问题,当处理连接过多会造成性能线性下降,而epoll是在前人的经验上做过改进的解决方案。不会有此问题。
-
不过select, poll并不是一无是处,假设场景是连接数不多,并且每个连接非常活跃,select,poll是要性能高于epoll的。
可参考:select、poll、epoll之间的区别总结[整理]
(4)信号驱动式IO
没见过
(5)异步非阻塞IO
理论上比多路复用更快,因为少了一次调用,但是实际使用并没有比多路复用快非常多
5、事件循环
IO模型能够解决IO的效率问题,但是实际使用起来需要一个事件循环驱动协程去处理IO。
简单实现
下面引用官方的一个简单例子:
import selectors
import socket
# 创建一个selctor对象
# 在不同的平台会使用不同的IO模型,比如Linux使用epoll, windows使用select(不确定)
# 使用select调度IO
sel = selectors.DefaultSelector()
# 回调函数,用于接收新连接
def accept(sock, mask):
conn, addr = sock.accept() # Should be ready
print('accepted', conn, 'from', addr)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read)
# 回调函数,用户读取client用户数据
def read(conn, mask):
data = conn.recv(1000) # Should be ready
if data:
print('echoing', repr(data), 'to', conn)
conn.send(data) # Hope it won't block
else:
print('closing', conn)
sel.unregister(conn)
conn.close()
# 创建一个非堵塞的socket
sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)
# 一个事件循环,用于IO调度
# 当IO可读或者可写的时候, 执行事件所对应的回调函数
def loop():
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
if __name__ == '__main__':
loop()
上面代码中loop函数对应事件循环,它要做的就是一遍一遍的等待IO,然后调用事件的回调函数.
二、asyncio库
1、asyncio库介绍
asyncio在python3.4被引入标准库,Python 3.5添加了async
和await
这两个关键字,分别用来替换asyncio.coroutine
和yield from
,协程成为新的语法,而不再是一种生成器类型了
需要明确一点,asyncio使用单线程、单个进程的方式切换;现存的一些库其实并不能原生的支持asyncio(因为会发生阻塞或者功能不可用),比如requests,如果要写爬虫,配合asyncio的应该用aiohttp,其他的如数据库驱动等各种Python对应的库也都得使用对应的aioXXX版本了
几个概念:
- event_loop 事件循环:相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足条件时,就会调用对应的处理方法。
- coroutine 协程:协程对象,只一个使用async关键字定义的函数,他的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环中,由事件循环调用。
- task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程的进一步封装,其中包含任务的各种状态。
- future:代表将来执行或没有执行的任务结果。它与task没有本质的区别。
-async/await 关键字:python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。
运行的序列图
import asyncio
async def compute(x, y):
print('Compute {} + {} ...'.format(x, y))
await asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print('{} + {} = {}'.format(x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
2、asyncio的使用
(1)定义一个协程
通过async定义一个协程,协程是一个对象,不能直接运行,需要把协程加入到事件循环(loop)中,由loop在适当的时候调用协程
asyncio.get_event_loop()
方法可以创建一个事件循环,然后由run_until_complete(协程对象)
将协程注册到事件循环中,并启动事件循环
run_until_complete
根据传递的参数的不同,返回的结果也有所不同:
run_until_complete()
传递的是一个协程对象或task对象,则返回他们finished的返回结果(前提是他们得有return的结果,否则返回None)run_until_complete(asyncio.wait(多个协程对象或任务))
,函数会返回一个元组包括(done, pending),通过访问done里的task对象,获取返回值run_until_complete(asyncio.gather(多个协程对象或任务))
,函数会返回一个列表,列表里面包括各个任务的返回结果,按顺序排列
python 3.7 以前的版本调用异步函数的步骤:
- 调用
asyncio.get_event_loop()
函数获取事件循环loop对象 - 通过不同的策略调用
loop.run_forever()
方法或者loop.run_until_complete()
方法执行异步函数
python3.7 以后的版本使用asyncio.run
即可。此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。
import asyncio
async def work(x): # 通过async关键字定义一个协程
for _ in range(3):
print('Work {} is running ..'.format(x))
coroutine_1 = work(1) # 协程是一个对象,不能直接运行
# 3.5<version<3.7:
loop = asyncio.get_event_loop() # 创建一个事件循环
result = loop.run_until_complete(coroutine_1) # 将协程对象加入到事件循环中,并执行
print(result) # 协程对象并没有返回结果,打印None
# version≥3.7:
# asyncio.run(coroutine_1) #创建一个新的事件循环,并以coroutine_1为程序的主入口,执行完毕后关闭事件循环
(2)创建一个task
协程对象不能直接运行,在注册到事件循环的时候,其实是run_until_complete
方法将协程包装成一个task对象,所谓的task对象就是Future类的子类,它保存了协程运行后的状态,用于未来获取协程的结果。
创建task后,task在加入事件循环之前是pending状态,因为下例中没有耗时操作,task很快会完成,后面打印finished状态。
import asyncio
async def work(x): # 通过async关键字定义一个协程
for _ in range(3):
print('Work {} is running ..'.format(x))
coroutine_1 = work(1) # 协程是一个对象,不能直接运行
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine_1)
# task = asyncio.ensure_future(coroutine_1) # 这样也能创建一个task
print(task)
loop.run_until_complete(task) # run_until_complete接受的参数是一个future对象,当传人一个协程时,其内部自动封装成task
print(task)
(3)绑定回调
在task执行完毕的时候可以获取执行的结果,回调的最后一个参数是future对象,通过这个对象可以获取协程的返回值,如果回调函数需要多个参数,可以通过偏函数导入。
从下例可以看出,coroutine执行结束时候会调用回调函数,并通过future获取协程返回(return)的结果。我们创建的task和回调里面的future对象,实际上是同一个对象。
import asyncio
async def work(x):
for _ in range(3):
print('Work {} is running ..'.format(x))
return "Work {} is finished".format(x)
def call_back(future):
print("Callback: {}".format(future.result()))
coroutine = work(1)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(call_back)
loop.run_until_complete(task) # 返回任务的结果
当回调函数需要传递多个参数时,可以使用functools里的partial方法(偏函数导入这些参数)
functools.partial(func, * args, * * keywords)
,函数装饰器,返回一个新的partial对象。调用partial对象和调用被修饰的函数func相同,只不过调用partial对象时传入的参数个数通常要少于调用func时传入的参数个数。当一个函数func可以接收很多参数,而某一次使用只需要更改其中的一部分参数,其他的参数都保持不变时,partial对象就可以将这些不变的对象冻结起来,这样调用partial对象时传入未冻结的参数,partial对象调用func时连同已经被冻结的参数一同传给func函数,从而可以简化调用过程
import asyncio
import functools
async def work(x):
for _ in range(3):
print('Work {} is running ..'.format(x))
return "Work {} is finished".format(x)
def call_back_2(num, future):
print("Callback_2: {}, the num is {}".format(future.result(), num))
coroutine = work(1)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(functools.partial(call_back_2, 100))
loop.run_until_complete(task)
(4)阻塞和await
使用async可以定义协程对象,使用await可以正对耗时操作进行挂起,就像生成器里的yield一样,函数让出控制权。
协程遇到await,事件循环就会挂起这个协程,执行别的协程,直到其他协程也挂起或执行完毕,在进行下一个协程的执行。
如果一个对象可以在 await 语句中使用,那么它就是 可等待 对象。
import asyncio
import time
async def work(delay, msg):
print(f"Task receives the message :'{msg}' ")
await asyncio.sleep(delay)
print(msg)
async def main():
print(f"Started at {time.strftime('%X')}")
await work(1, "hello") # 启动一个协程,但是这是同步执行的
await work(2, "world")
print(f"Finished at time {time.strftime('%X')}")
asyncio.run(main())
# 运行结果:
# 先打印print(f"Task receives the message :'{msg}' ")然后等待1秒后打印“hello”,
# 然后再次打印print(f"Task receives the message :'{msg}' ")等待2秒后打印“world”
简单的说,await就是挂起当前任务,去执行其他任务,此时是堵塞的,必须要等其他任务执行完毕才能返回到当前任务继续往下执行,这样的说的前提是,在一个时间循环中有多个task或future,当await右面等待的对象是协程对象时,就没有了并发的作用,就是堵塞等待这个协程对象完成
(5)协程嵌套
使用async可以定义协程,协程用于耗时的IO操作。我们也可以封装更多的IO操作过程,在一个协程中await另外一个协程,实现协程的嵌套。
import asyncio, time
async def work(x):
for _ in range(3):
print("Work {} is running ..".format(x))
await asyncio.sleep(1) # 当执行某个协程时,在任务阻塞的时候用await挂起
return "Work {} is finished!".format(x)
async def main_work():
coroutine_1 = work(1)
coroutine_2 = work(2)
coroutine_3 = work(3)
tasks = [
asyncio.ensure_future(coroutine_1),
asyncio.ensure_future(coroutine_2),
asyncio.ensure_future(coroutine_3),
]
dones, pendings = await asyncio.wait(tasks)
for task in dones:
print("The task's result is : {}".format(task.result()))
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main_work())
运行结果
Work 1 is running ..
Work 2 is running ..
Work 3 is running 一文搞明白Python并发编程和并行编程