一文搞明白Python协程编程:asyncio库

Posted 思源湖的鱼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一文搞明白Python协程编程:asyncio库相关的知识,希望对你有一定的参考价值。

前言

本文试图搞明白Python协程编程
通常我们认为线程是轻量级的进程,因此我们也把协程理解为轻量级的线程即微线程

这是几个姊妹篇:

一、基础知识

1、并行和并发

在学习的时候,发现并行和并发在好些地方搞混了,这是两个概念,得先明确下

(1)定义

Erlang 之父 Joe Armstrong 画了一张很可爱的图来解释这两个概念:

  • 并发是两个队列交替使用一台咖啡机
  • 并行是两个队列同时使用两台咖啡机


两个词很好的说明了并发和并行的区别:

  • Parallel Computing:并行计算
  • Concurrent programming:并发编程

(2)联系

那么并发并行和多进程多线程的关系呢?

  • 多核cpu,多个进程可以并行在多个cpu中计算,当然也会存在进程切换;单核cpu,多个进程在这个单核cpu中是并发运行,根据时间片读取上下文+执行程序+保存上下文。同一个进程同一时间段只能在一个cpu中运行,如果进程数小于cpu数,那么未使用的cpu将会空闲
  • 多核cpu,进程中的多线程并行执行;单核cpu,多线程在单核cpu中并发执行,根据时间片切换线程。同一个线程同一时间段只能在一个cpu内核中运行,如果线程数小于cpu内核数,那么将有多余的内核空闲

场景:

  • 多核CPU——计算密集型任务:尽量使用并行计算,可以提高任务执行效率。计算密集型任务会持续地将CPU占满,此时有越多CPU来分担任务,计算速度就会越快,这是并行的用武之地
  • 单核CPU——计算密集型任务:此时的任务已经把CPU资源100%消耗了,就没必要使用并行计算,毕竟硬件障碍摆在那里
  • 单核CPU——I/O密集型任务:I/O密集型任务在任务执行时需要经常调用磁盘、屏幕、键盘等外设,由于调用外设时CPU会空闲,所以CPU的利用率并不高,此时使用多线程程序,只是便于人机交互。计算效率提升不大。
  • 多核CPU——I/O密集型任务:同单核CPU——I/O密集型任务

总结下:

  • 并行从代码层次上强依赖于多进程/多线程代码,从硬件角度上则依赖于多核CPU
  • 并发是一种现象:同时运行多个程序或多个任务需要被处理的现象,这些任务可能是并行执行的,也可能是串行执行的,和CPU核心数无关,是操作系统进程调度和CPU上下文切换达到的结果

2、进程、线程和协程

(1)定义

1、进程

  • 进程是程序的一次执行过程,是一个动态概念,是程序在执行过程中分配和管理资源的基本单位
  • 在面向线程设计的系统(如当代多数操作系统、Linux 2.6及更新的版本)中,进程本身不是基本运行单位,而是线程的容器
  • 进程拥有自己独立的内存空间,所属线程可以访问进程的空间
  • 程序本身只是指令、数据及其组织形式的描述,进程才是程序的真正运行实例

2、线程

  • 线程是CPU调度和分派的基本单位,它可与同属一个进程的其他的线程共享进程所拥有的全部资源
  • 当前的操作系统是面向线程的,即以线程为基本运行单位,并按线程分配CPU

3、协程

  • 又称微线程,纤程,英文名Coroutine。协程的作用是在执行函数A时可以随时中断去执行函数B,然后中断函数B继续执行函数A(可以自由切换)。但这一过程并不是函数调用,是线程里的并发

  • 拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方(非CPU),在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,CPU感觉不到协程的存在,协程是用户自己控制的

  • 优点:
    无需线程上下文切换的开销
    无需数据操作锁定及同步的开销
    方便切换控制流,简化编程模型
    高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理

  • 缺点:
    无法利用多核资源:协程的本质是个单线程,它不能同时将单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上,协程如果要使用多核CPU的话,那么就需要先启多个进程,在每个进程下启一个线程,然后在线程下在启协程。
    日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用

(2)联系

线程是进程的一部分,一个线程只能属于一个进程,而一个进程可以有多个线程,且至少有一个线程。而协程则包含在线程中

可以看个图

区别:理解它们的差别,从资源使用的角度出发。(所谓的资源就是计算机里的中央处理器,内存,文件,网络等等)

  • 根本区别:进程是操作系统资源分配的基本单位,而线程是任务调度和执行的基本单位

  • 在开销方面:每个进程都有独立的代码和数据空间(程序上下文),程序之间的切换会有较大的开销;线程共享代码和数据空间,每个线程都有自己独立的运行栈和程序计数器(PC),线程之间切换的开销小

  • 所处环境:在操作系统中能同时运行多个进程(程序);而在同一个进程(程序)中有多个线程同时执行(通过CPU调度,在每个时间片中只有一个线程执行)

  • 内存分配方面:系统在运行的时候会为每个进程分配不同的内存空间;而对线程而言,除了CPU外,系统不会为线程分配内存(线程所使用的资源来自其所属进程的资源),线程组之间只能共享资源

包含关系:

  • 没有线程的进程可以看做是单线程的,如果一个进程内有多个线程,则执行过程不是一条线的,而是多条线(线程)共同完成的
  • 线程是进程的一部分,所以线程也被称为轻量级进程
  • 协程是线程的一部分,被称为微型线程

3、生成器

一切还是得从生成器说起,因为asyncio或者大多数协程库内部也是通过生成器实现的
生成器是一次生成一个值的特殊类型函数,可以将其视为可恢复函数,这里就不探究其内部实现原理了

(1)yield

简单例子如下

def gen_func():
    yield 1
    yield 2
    yield 3
 
if __name__ == '__main__':
    gen = gen_func()
    for i in gen:
        print(i)
 
output:
1
2
3

上面的例子没有什么稀奇的不是吗?yield像一个特殊的关键字,将函数变成了一个类似于迭代器的对象,可以使用for循环取值。

(2)send, next

协程自然不会这么简单,python协程的目标是星辰大海,从上面的例之所以get不到它的野心,是因为你没有试过send, next两个函数。

首先说next

def gen_func():
    yield 1
    yield 2
    yield 3
 
if __name__ == '__main__':
    gen = gen_func()
    print(next(gen))
    print(next(gen))
    print(next(gen))
 
output:
1
2
3

next的操作有点像for循环,每调用一次next,就会从中取出一个yield出来的值,其实还是没啥特别的,感觉还没有for循环好用。

不过,不知道你有没有想过,如果你只需要一个值,你next一次就可以了,然后你可以去做其他事情,等到需要的时候才回来再次next取值。

就这一部分而言,你也许知道为啥说生成器是可以暂停的了,不过,这似乎也没什么用,那是因为你不知到时,生成器除了可以抛出值,还能将值传递进去。

接下来我们看send的例子。

def gen_func():
    a = yield 1
    print("a: ", a)
    b = yield 2
    print("b: ", b)
    c = yield 3
    print("c: ", c)
    return "finish"
 
if __name__ == '__main__':
    gen = gen_func()
    for i in range(4):
        if i == 0:
            print(gen.send(None))
        else:
            # 因为gen生成器里面只有三个yield,那么只能循环三次。
            # 第四次循环的时候,生成器会抛出StopIteration异常,并且return语句里面内容放在StopIteration异常里面
            try:
                print(gen.send(i))
            except StopIteration as e:
                print("e: ", e)
 
output:
1
a:  1
2
b:  2
3
c:  3
e:  finish

send有着next差不多的功能,不过send在传递一个值给生成器的同时,还能获取到生成器yield抛出的值,在上面的代码中,send分别将None,1,2,3四个值传递给了生成器,之所以第一需要传递None给生成器,是因为规定,之所以规定,因为第一次传递过去的值没有特定的变量或者说对象能接收,所以规定只能传递None, 如果你传递一个非None的值进去,会抛出一下错误

TypeError: can't send non-None value to a just-started generator

从上面的例子我们也发现,生成器里面的变量a,b,c获得了send函数发送来的1, 2, 3.

如果你有事件循环或者说多路复用的经验,你也许能够隐隐察觉到微妙的感觉。

这个微妙的感觉是,是否可以将IO操作yield出来?由事件循环调度, 如果你能get到这个微妙的感觉,那么你已经知道协程高并发的秘密了.

(3)yield from

下面是yield from的例子

def gen_func():
    a = yield 1
    print("a: ", a)
    b = yield 2
    print("b: ", b)
    c = yield 3
    print("c: ", c)
    return 4
 
def middle():
    gen = gen_func()
    ret = yield from gen
    print("ret: ", ret)
    return "middle Exception"
 
def main():
    mid = middle()
    for i in range(4):
        if i == 0:
            print(mid.send(None))
        else:
            try:
                print(mid.send(i))
            except StopIteration as e:
                print("e: ", e)
 
if __name__ == '__main__':
    main()
 
output: 
1
a:  1
2
b:  2
3
c:  3
ret:  4
e:  middle Exception

从上面的代码我们发现,main函数调用的middle函数的send,但是gen_func函数却能接收到main函数传递的值.有一种透传的感觉,这就是yield from的作用, 这很关键。

而yield from最终传递出来的值是StopIteration异常,异常里面的内容是最终接收生成器(本示例是gen_func)return出来的值,所以ret获得了gen_func函数return的4.但是ret将异常里面的值取出之后会继续将接收到的异常往上抛,所以main函数里面需要使用try语句捕获异常。而gen_func抛出的异常里面的值已经被middle函数接收,所以middle函数会将抛出的异常里面的值设为自身return的值

4、IO模型

linux有5种IO模型

(1)同步IO


同步模型自然是效率最低的模型了,每次只能处理完一个连接才能处理下一个,如果只有一个线程的话, 如果有一个连接一直占用,那么后来者只能傻傻的等了。所以不适合高并发,不过最简单,符合惯性思维

(2)非阻塞式IO


不会阻塞后面的代码,但是需要不停的显式询问内核数据是否准备好,一般通过while循环,而while循环会耗费大量的CPU。所以也不适合高并发。

(3)多路复用IO


当前最流行,使用最广泛的高并发方案。

而多路复用又有三种实现方式, 分别是select, poll, epoll:

  • select,poll由于设计的问题,当处理连接过多会造成性能线性下降,而epoll是在前人的经验上做过改进的解决方案。不会有此问题。

  • 不过select, poll并不是一无是处,假设场景是连接数不多,并且每个连接非常活跃,select,poll是要性能高于epoll的。

可参考:select、poll、epoll之间的区别总结[整理]

(4)信号驱动式IO


没见过

(5)异步非阻塞IO


理论上比多路复用更快,因为少了一次调用,但是实际使用并没有比多路复用快非常多

5、事件循环

IO模型能够解决IO的效率问题,但是实际使用起来需要一个事件循环驱动协程去处理IO。

简单实现
下面引用官方的一个简单例子:

import selectors
import socket
 
# 创建一个selctor对象
# 在不同的平台会使用不同的IO模型,比如Linux使用epoll, windows使用select(不确定)
# 使用select调度IO
sel = selectors.DefaultSelector()
 
# 回调函数,用于接收新连接
def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)
 
# 回调函数,用户读取client用户数据
def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()
 
# 创建一个非堵塞的socket
sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)
 
# 一个事件循环,用于IO调度
# 当IO可读或者可写的时候, 执行事件所对应的回调函数
def loop():
    while True:
        events = sel.select()
        for key, mask in events:
            callback = key.data
            callback(key.fileobj, mask)
 
if __name__ == '__main__':
    loop()

上面代码中loop函数对应事件循环,它要做的就是一遍一遍的等待IO,然后调用事件的回调函数.

二、asyncio库

1、asyncio库介绍

asyncio在python3.4被引入标准库,Python 3.5添加了asyncawait这两个关键字,分别用来替换asyncio.coroutineyield from,协程成为新的语法,而不再是一种生成器类型了

需要明确一点,asyncio使用单线程、单个进程的方式切换;现存的一些库其实并不能原生的支持asyncio(因为会发生阻塞或者功能不可用),比如requests,如果要写爬虫,配合asyncio的应该用aiohttp,其他的如数据库驱动等各种Python对应的库也都得使用对应的aioXXX版本了

几个概念:

  • event_loop 事件循环:相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足条件时,就会调用对应的处理方法。
  • coroutine 协程:协程对象,只一个使用async关键字定义的函数,他的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环中,由事件循环调用。
  • task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程的进一步封装,其中包含任务的各种状态。
  • future:代表将来执行或没有执行的任务结果。它与task没有本质的区别。
    -async/await 关键字:python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。

运行的序列图

import asyncio

async def compute(x, y):
    print('Compute {} + {} ...'.format(x, y))
    await asyncio.sleep(1.0)
    return x + y


async def print_sum(x, y):
    result = await compute(x, y)
    print('{} + {} = {}'.format(x, y, result))


loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

2、asyncio的使用

(1)定义一个协程

通过async定义一个协程,协程是一个对象,不能直接运行,需要把协程加入到事件循环(loop)中,由loop在适当的时候调用协程

asyncio.get_event_loop()方法可以创建一个事件循环,然后由run_until_complete(协程对象)将协程注册到事件循环中,并启动事件循环

run_until_complete根据传递的参数的不同,返回的结果也有所不同:

  • run_until_complete()传递的是一个协程对象或task对象,则返回他们finished的返回结果(前提是他们得有return的结果,否则返回None)
  • run_until_complete(asyncio.wait(多个协程对象或任务)),函数会返回一个元组包括(done, pending),通过访问done里的task对象,获取返回值
  • run_until_complete(asyncio.gather(多个协程对象或任务)),函数会返回一个列表,列表里面包括各个任务的返回结果,按顺序排列

python 3.7 以前的版本调用异步函数的步骤:

  • 调用asyncio.get_event_loop()函数获取事件循环loop对象
  • 通过不同的策略调用loop.run_forever()方法或者loop.run_until_complete()方法执行异步函数

python3.7 以后的版本使用asyncio.run即可。此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。

import asyncio
 
async def work(x):  # 通过async关键字定义一个协程
    for _ in range(3):
        print('Work {} is running ..'.format(x))

coroutine_1 = work(1)  # 协程是一个对象,不能直接运行

# 3.5<version<3.7:
loop = asyncio.get_event_loop()  # 创建一个事件循环
result = loop.run_until_complete(coroutine_1)  # 将协程对象加入到事件循环中,并执行
print(result)  # 协程对象并没有返回结果,打印None
# version≥3.7:
# asyncio.run(coroutine_1)  #创建一个新的事件循环,并以coroutine_1为程序的主入口,执行完毕后关闭事件循环

(2)创建一个task

协程对象不能直接运行,在注册到事件循环的时候,其实是run_until_complete方法将协程包装成一个task对象,所谓的task对象就是Future类的子类,它保存了协程运行后的状态,用于未来获取协程的结果。

创建task后,task在加入事件循环之前是pending状态,因为下例中没有耗时操作,task很快会完成,后面打印finished状态。

import asyncio

async def work(x):  # 通过async关键字定义一个协程
    for _ in range(3):
        print('Work {} is running ..'.format(x))

coroutine_1 = work(1)  # 协程是一个对象,不能直接运行
 
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine_1)
# task = asyncio.ensure_future(coroutine_1)  # 这样也能创建一个task
print(task)
loop.run_until_complete(task)  # run_until_complete接受的参数是一个future对象,当传人一个协程时,其内部自动封装成task
print(task)

(3)绑定回调

在task执行完毕的时候可以获取执行的结果,回调的最后一个参数是future对象,通过这个对象可以获取协程的返回值,如果回调函数需要多个参数,可以通过偏函数导入。

从下例可以看出,coroutine执行结束时候会调用回调函数,并通过future获取协程返回(return)的结果。我们创建的task和回调里面的future对象,实际上是同一个对象。

import asyncio

async def work(x):
    for _ in range(3):
        print('Work {} is running ..'.format(x))
    return "Work {} is finished".format(x)

def call_back(future):
    print("Callback: {}".format(future.result()))
    
coroutine = work(1)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(call_back)
loop.run_until_complete(task)  # 返回任务的结果

当回调函数需要传递多个参数时,可以使用functools里的partial方法(偏函数导入这些参数)

functools.partial(func, * args, * * keywords),函数装饰器,返回一个新的partial对象。调用partial对象和调用被修饰的函数func相同,只不过调用partial对象时传入的参数个数通常要少于调用func时传入的参数个数。当一个函数func可以接收很多参数,而某一次使用只需要更改其中的一部分参数,其他的参数都保持不变时,partial对象就可以将这些不变的对象冻结起来,这样调用partial对象时传入未冻结的参数,partial对象调用func时连同已经被冻结的参数一同传给func函数,从而可以简化调用过程

import asyncio
import functools

async def work(x):
    for _ in range(3):
        print('Work {} is running ..'.format(x))
    return "Work {} is finished".format(x)

    
def call_back_2(num, future):
    print("Callback_2: {}, the num is {}".format(future.result(), num))
    
coroutine = work(1)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(functools.partial(call_back_2, 100))
loop.run_until_complete(task)

(4)阻塞和await

使用async可以定义协程对象,使用await可以正对耗时操作进行挂起,就像生成器里的yield一样,函数让出控制权。

协程遇到await,事件循环就会挂起这个协程,执行别的协程,直到其他协程也挂起或执行完毕,在进行下一个协程的执行。

如果一个对象可以在 await 语句中使用,那么它就是 可等待 对象。

import asyncio
import time

async def work(delay, msg):
    print(f"Task receives the message :'{msg}' ")
    await asyncio.sleep(delay)
    print(msg)

async def main():
    print(f"Started at {time.strftime('%X')}")
    await work(1, "hello")  # 启动一个协程,但是这是同步执行的
    await work(2, "world")
    print(f"Finished at time {time.strftime('%X')}")

asyncio.run(main())
# 运行结果:
# 先打印print(f"Task receives the message :'{msg}' ")然后等待1秒后打印“hello”,
# 然后再次打印print(f"Task receives the message :'{msg}' ")等待2秒后打印“world”

简单的说,await就是挂起当前任务,去执行其他任务,此时是堵塞的,必须要等其他任务执行完毕才能返回到当前任务继续往下执行,这样的说的前提是,在一个时间循环中有多个task或future,当await右面等待的对象是协程对象时,就没有了并发的作用,就是堵塞等待这个协程对象完成

(5)协程嵌套

使用async可以定义协程,协程用于耗时的IO操作。我们也可以封装更多的IO操作过程,在一个协程中await另外一个协程,实现协程的嵌套。

import asyncio, time

async def work(x):
    for _ in range(3):
        print("Work {} is running ..".format(x))
        await asyncio.sleep(1)  # 当执行某个协程时,在任务阻塞的时候用await挂起
    return "Work {} is finished!".format(x)

async def main_work():
    coroutine_1 = work(1)
    coroutine_2 = work(2)
    coroutine_3 = work(3)
    
    tasks = [
        asyncio.ensure_future(coroutine_1),
        asyncio.ensure_future(coroutine_2),
        asyncio.ensure_future(coroutine_3),
    ]
    
    dones, pendings = await asyncio.wait(tasks)
    
    for task in dones:
        print("The task's result is : {}".format(task.result()))
        
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main_work())

运行结果

Work 1 is running ..
Work 2 is running ..
Work 3 is running 一文搞明白Python并发编程和并行编程

一文搞明白Python并发编程和并行编程

一文搞明白协程的挂起和恢复

一文搞明白协程的挂起和恢复

一文搞明白协程的挂起和恢复

一文搞明白Python多进程编程:multiprocessing库