python-asyncio

Posted 贝壳里的星海

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python-asyncio相关的知识,希望对你有一定的参考价值。

python -asyncio

协程是在用户态实现的上下文切换技术

相比线程切换,协程上下文切换代价更小。
协程是单线程的,不存在多线程互斥访问共享变量,不用锁、信号量等机制
协程非常灵活,当出现I/O阻塞时,就去切换任务,I/O完成再唤醒,这就是所谓的 异步I/O,

实现化协程对象的方法有:

  • yield关键字
  • yield from关键字
  • asyncawait 关键字

yield

def func1():
    yield 1
    yield from func2()
    yield 2


def func2():
    yield 3
    yield 4


f1 = func1()
for item in f1:
    print(item)

asyncio即Asynchronous I/O是python一个用来处理并发(concurrent)事件的包,是很多python异步架构的基础,多用于处理高并发网络请求方面的问题。
此处使用的是Python 3.5之后出现的async/await来实现协程

通过 async/await 语法来声明协程是编写 asyncio 应用的推荐方式。

asyncio中几个重要概念

1.事件循环

管理所有的事件,在整个程序运行过程中不断循环执行并追踪事件发生的顺序将它们放在队列中,空闲时调用相应的事件处理者来处理这些事件。

2.Future

Future对象表示尚未完成的计算,还未完成的结果

3.Task

是Future的子类,作用是在运行某个任务的同时可以并发的运行多个任务。

asyncio.Task用于实现协作式多任务的库,且Task对象不能用户手动实例化,通过下面2个函数创建:

asyncio 完整流程

1、定义/创建协程对象
2、将协程转为task任务
3、定义事件循环对象容器
4、将task任务扔进事件循环对象中触发
asyncio.async()
loop.create_task() 或 asyncio.ensure_future()

异步代码

import time
import asyncio

# 定义异步函数
async def hello():
    await asyncio.sleep(1)
    print(f\'Hello World: time.time()\')

def addtack(loop):
    for i in range(5):
        loop.create_task(hello())

if __name__ == \'__main__\':
    t1 = time.time()
    loop = asyncio.get_event_loop()      # 初始化时间
    addtack(loop)                        # 创建任务
    loop.run_until_complete(hello())     # 等待任务执行结束
    t2 = time.time()
    print(f"all time t2 - t1")

# -------------------------------------------------------
Hello World: 1684489947.7962332
Hello World: 1684489947.7962332
Hello World: 1684489947.7962332
Hello World: 1684489947.7962332
Hello World: 1684489947.7962332
Hello World: 1684489947.7962332
all time 1.0133047103881836

定义和运行

使用async def语句定义一个协程函数,但这个函数不可直接运行

# 普通函数
def function():
    return 1
   
# 异步函数
async def asynchronous():
    return 1
# 异步函数不同于普通函数不可能被直接调用
async def aaa():
    print(\'hello\')

print(aaa())

# 输出----------------------------------
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

如何运行一个协程呢,有三种方式:

asyncio.run()

1、使用函数,可直接运行

import asyncio

async def aaa():
    print(\'hello\')

asyncio.run(aaa())
# 输出-------------------------
hello

await()

2、使用await()进行异步等待

  • await语法等待另一个协程,这将挂起当前协程,直到另一个协程返回结果。

asyncio.sleep 也是一个协程,所以 await asyncio.sleep(x) 就是等待另一个协程

import asyncio
async def aaa():
    print(\'hello\')

async def main():
    await aaa()
    
asyncio.run(main())

asyncio.create_task()

使用asyncio.create_task() 函数来创建一个任务,放入事件循环中

import asyncio

async def aaa():
    print(\'hello\')

async def main():
    asyncio.create_task(aaa())
asyncio.run(main())
import asyncio

async def asfunc():
    await asyncio.sleep(1)
    print(\'hello\')

aaa=asfunc()
loop = asyncio.get_event_loop()
task = loop.create_task(aaa)
loop.run_until_complete(task)

创建Task

loop.create_task()

import asyncio

async def helperfunc():
    await asyncio.sleep(1)
    print(\'beike tian\')

coro = helperfunc()

loop = asyncio.get_event_loop()
print("loop:",loop)
task = loop.create_task(coro)
print(\'task:\', task)

loop.run_until_complete(task)
print(\'task:\', task)
loop.close()

# 输出 ----------------------------------------
loop: <ProactorEventLoop running=False closed=False debug=False>
task: <Task pending name=\'Task-1\' coro=<helperfunc() running at E:\\TzxNote\\Note\\lcodeNoteCards\\edge_det.py:3>>
beike tian
task: <Task finished name=\'Task-1\' coro=<helperfunc() done, defined at E:\\TzxNote\\Note\\lcodeNoteCards\\edge_det.py:3> result=None>

run_until_complete 是一个阻塞(blocking)调用,直到协程运行结束,它才返回

import asyncio

async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)

loop = asyncio.get_event_loop()
loop.run_until_complete(do_some_work(3))

获取协程返回

有2种方案可以获取返回值。

方法一:通过task.result()

可通过调用 task.result() 方法来获取协程的返回值,但是只有运行完毕后才能获取,若没有运行完毕,result()方法不会阻塞去等待结果,而是抛出 asyncio.InvalidStateError 错误

import asyncio

async def helperfunc():
    await asyncio.sleep(1)
    print(\'beike tian\')
    return  "小贝壳"

coro = helperfunc()

loop = asyncio.get_event_loop()
task = loop.create_task(coro)
print(\'task:\', task)
try:
    print(\'task.result:\', task.result())
except asyncio.InvalidStateError:
    print(\'task状态未完成,捕获了 InvalidStateError 异常\')

loop.run_until_complete(task)
print(\'task:\', task)
print(\'task.result:\', task.result())
loop.close()

# ----------------------------
task: <Task pending name=\'Task-1\' coro=<helperfunc() running at E:\\TzxNote\\Note\\lcodeNoteCards\\edge_det.py:3>>
task状态未完成,捕获了 InvalidStateError 异常
beike tian
task: <Task finished name=\'Task-1\' coro=<helperfunc() done, defined at E:\\TzxNote\\Note\\lcodeNoteCards\\edge_det.py:3> result=\'小贝壳\'>
task.result: 小贝壳

方法二、asyncio.gather

import asyncio

async def func1(i):
    print(f"协程函数i马上开始执行。")
    await asyncio.sleep(2)
    return i

async def main():
    tasks = []
    for i in range(1, 5):
        tasks.append(func1(i))

    results = await asyncio.gather(*tasks)
    for result in results:
        print(f"执行结果: result")

if __name__ == \'__main__\':
    asyncio.run(main())

取消任务

import asyncio
import time

async def helperfunc(nums):
    await asyncio.sleep(1)
    print(f\'beike tian nums\')
    return  "小贝壳"


if __name__ == "__main__":
    task1 = helperfunc(2)
    task2 = helperfunc(3)
    task3 = helperfunc(3)

    tasks = [task1, task2, task3]
    loop = asyncio.get_event_loop()

    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        all_tasks = asyncio.Task.all_tasks()
        for task in all_tasks:
            print("cancel task")
            print(task.cancel())
        loop.stop()
        # stop 调用之后,需要调用 run_forever,不然会报错
        loop.run_forever()
    finally:
        loop.close()

方法三:通过add_done_callback()回调

通过 Future 的 add_done_callback() 方法来添加回调函数,当任务完成后,程序会自动触发该回调函数,并将对应的 Future 对象作为参数传给该回调函数。

import asyncio

async def helperfunc():
    await asyncio.sleep(1)
    print(\'beike tian\')
    return  "小贝壳"

def my_callback(future):
    print(\'返回值:\', future.result())

coro = helperfunc()

loop = asyncio.get_event_loop()
task = loop.create_task(coro)
task.add_done_callback(my_callback)

loop.run_until_complete(task)
loop.close()

# -----------------------------------------
beike tian
返回值: 小贝壳

多任务控制

通过asyncio.wait()可以控制多任务

asyncio.wait()是一个协程,不会阻塞,立即返回,返回的是协程对象。传入的参数是future或协程构成的可迭代对象。最后将返回值传给run_until_complete()加入事件循环

asyncio.gatherasyncio.wait 功能相似。

方法1、asyncio.wait

import asyncio

async def coroutine_example(name):
    print(\'正在执行name:\', name)
    await asyncio.sleep(1)
    print(\'执行完毕name:\', name)

loop = asyncio.get_event_loop()

tasks = [coroutine_example(\'Zarten_\' + str(i)) for i in range(3)]
wait_coro = asyncio.wait(tasks)
loop.run_until_complete(wait_coro)
loop.close()

方法2、asyncio.gather

import asyncio
async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)
    
loop.run_until_complete(asyncio.gather(do_some_work(1), do_some_work(3)))

# 或者借助列表
coros = [do_some_work(1), do_some_work(3)]
loop.run_until_complete(asyncio.gather(*coros))

# -------------------------------------
Waiting 3
Waiting 1
<等待三秒钟>
Done

多任务返回

方法1、需要通过loop.create_task()创建task对象

import asyncio

async def coroutine_example(name):
    print(\'正在执行name:\', name)
    await asyncio.sleep(1)
    print(\'执行完毕name:\', name)
    return \'返回值:\' + name

loop = asyncio.get_event_loop()

tasks = [loop.create_task(coroutine_example(\'Zarten_\' + str(i))) for i in range(3)]
wait_coro = asyncio.wait(tasks)
loop.run_until_complete(wait_coro)

for task in tasks:
    print(task.result())

loop.close()

方法2、回调add_done_callback()

import asyncio

def my_callback(future):
    print(\'返回值:\', future.result())

async def coroutine_example(name):
    print(\'正在执行name:\', name)
    await asyncio.sleep(1)
    print(\'执行完毕name:\', name)
    return \'返回值:\' + name

loop = asyncio.get_event_loop()

tasks = []
for i in range(3):
    task = loop.create_task(coroutine_example(\'Zarten_\' + str(i)))
    task.add_done_callback(my_callback)
    tasks.append(task)

wait_coro = asyncio.wait(tasks)
loop.run_until_complete(wait_coro)

loop.close()

其他知识点

run_until_complete 实现的原理

run_forever 会一直运行,直到 stop 被调用

loop.run_forever() 会让线程一直运行。loop.run_until_complete() 借助了 run_forever 方法。
在 run_until_complete 的实现中,调用了 future.add_done_callback(_run_until_complete_cb)。
async def do_some_work(loop, x):
    print(\'Waiting \' + str(x))
    await asyncio.sleep(x)
    print(\'Done\')
    loop.stop()

wait 与 gather 中的区别

gather 比 wait 更加高层。gather 可以将任务分组,一般优先使用 gather。在某些定制化任务需求的时候,会使用 wait。

from functools import partial
import asyncio
import time


async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)

if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = [do_some_work(i) for i in range(10)]

    group1 = [do_some_work(i)  for i in range(2)]
    group2 = [do_some_work(i)  for i in range(2)]

    loop.run_until_complete(asyncio.gather(*group1, *group2))
    print(time.time() - start_time)

参考文献

https://docs.python.org/zh-cn/3/library/asyncio-task.html#running-an-asyncio-program

https://www.cnblogs.com/Red-Sun/p/16934843.html

https://zhuanlan.zhihu.com/p/137698989

https://www.cnblogs.com/MrReboot/p/16413332.html

https://zhuanlan.zhihu.com/p/59621713 动态添加协程

https://zhuanlan.zhihu.com/p/137698989 gather 和 wait 区别

Python-asyncio

1、asyncio 

  3.4版本加入标准库

  asyncio 底层基于selectors 实现,看似库,其实就是一个框架,包含异步IO,事件循环,协程,任务等待等内容。   

2、问题引出

def a():
    for x in range(3):
        print(x)

def b():
    for x in \'abc\':
        print(x)
a()
b()

# 运行结果
0
1
2
a
b
c

 

这是一个串行的程序。

def a():
    for x in range(3):
        print(x)
        yield

def b():
    for x in \'abc\':
        print(x)
        yield

x = a()
y = b()
for i in range(3):
    next(x)
    next(y)

 

3、事件循环:

  事件循环是asyncio 提供的核心运行机制

  

4、协程

  • 协程不是进程,也不是线程,它是用户空间调度的完成并发处理的方式
  • 进程,线程由操作系统完成调度,而协程是线程内完成调度。它不需要更多的线程,自然也没有多线程切换带来的开销
  • 协程是非抢占式调度,只有一个协程主动让出控制权,另一个协程才会被调度
  • 协程不需要使用锁机制,因为在同一个线程中执行。
  • 多CPU下,可以使用多进程和协程配合,既能进程并发,又能发挥协程在单线程中的 优势
  • Python中协程是基于生成器的。

5、协程的使用

  3.4引入asyncio ,使用装饰器

  asyncio.sleep(0.001):也是一个coroutine,是一个生成器函数,yield值
 1 import asyncio
 2 
 3 @asyncio.coroutine
 4 def sleep(x): #  协程函数
 5     for i in range(3):
 6         print(\'sleep {}\'.format(i))
 7         yield from asyncio.sleep(x)
 8 
 9 loop = asyncio.get_event_loop()
10 loop.run_until_complete(sleep(3)) # 将sleep(3) 封装成Task对象执行
11 loop.close()
12 print(\'===================\')

    结果:每一秒打印一个,最终打印 ========

1 sleep 0
2 sleep 1
3 sleep 2
4 ===================

 

  将生成器函数,转换为协程函数,就可以在时间循环中执行了。

  测试:

 1 import asyncio
 2 
 3 @asyncio.coroutine
 4 def sleep(x):
 5     for i in range(3):
 6         print(\'sleeP {}\'.format(i))
 7         yield from asyncio.sleep(x)
 8 
 9 loop = asyncio.get_event_loop()
10 
11 #自己封装 task 对象
12 task = loop.create_task(sleep(3))
13 print(1, task)
14 loop.run_until_complete(task)
15 print(2, task)
16 loop.close()
17 print(\'======== end =======\')

  结果:

1 1 <Task pending coro=<sleep() running at E:/code_pycharm/tt10.py:23>>
2 sleeP 0
3 sleeP 1
4 sleeP 2
5 2 <Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:23> result=None>
6 ======== end =======

 

  测试:添加回调函数,知道运行完,返回结果(异步非阻塞)

 1 import asyncio
 2 
 3 @asyncio.coroutine
 4 def sleep(x):
 5     for i in range(3):
 6         print(\'sleeP {}\'.format(i))
 7         yield from asyncio.sleep(0.001)
 8     # 给一个result
 9     return 2000
10 
11 def cb(future): # 回调函数
12     print(4, future,\'===\')
13     print(5, future.result())
14 
15 loop = asyncio.get_event_loop()
16 
17 #自己封装 task 对象
18 task = loop.create_task(sleep(3))
19 task.add_done_callback(cb)# 注册了一个回调函数
20 print(1, task)
21 loop.run_until_complete(task)
22 print(2, task)
23 print(3, task.result()) # 获取结果
24 loop.close()
25 print(\'======== end =======\')

 

  结果:打印2 之前,先执行了回调函数,且得到最终结果之前,一直在运行

1 1 <Task pending coro=<sleep() running at E:/code_pycharm/tt10.py:42> cb=[cb() at E:/code_pycharm/tt10.py:50]>
2 sleeP 0
3 sleeP 1
4 sleeP 2
5 4 <Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:42> result=2000> ===
6 5 2000
7 2 <Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:42> result=2000>
8 3 2000
9 ======== end =======

 

 

  测试:多任务:

 1 import asyncio
 2 
 3 @asyncio.coroutine
 4 def sleep(x):
 5     for i in range(3):
 6         print(\'sleeP {}\'.format(i))
 7         yield from asyncio.sleep(0.001)
 8     # 给一个result
 9     return 2000
10 
11 @asyncio.coroutine
12 def b():
13     for x in \'abc\':
14         print(x)
15         yield from asyncio.sleep(0.001)
16 
17 
18 def cb(future): # 回调函数
19     print(4, future,\'===\')
20     print(5, future.result())
21 
22 loop = asyncio.get_event_loop()
23 
24 #自己封装 task 对象
25 task = loop.create_task(sleep(3))
26 task.add_done_callback(cb)# 注册了一个回调函数
27 print(1, task)
28 # 固定套路,多任务
29 tasks = [task, b()]
30 ret = loop.run_until_complete(asyncio.wait(tasks))
31 
32 print(2, task)
33 print(3, task.result()) # 获取结果
34 print(6, ret)
35 loop.close()
36 print(\'======== end =======\')

  结果:

 1 1 <Task pending coro=<sleep() running at E:/code_pycharm/tt10.py:42> cb=[cb() at E:/code_pycharm/tt10.py:57]>
 2 sleeP 0
 3 a
 4 sleeP 1
 5 b
 6 sleeP 2
 7 c
 8 4 <Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:42> result=2000> ===
 9 5 2000
10 2 <Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:42> result=2000>
11 3 2000
12 6 ({<Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:42> result=2000>, <Task finished coro=<b() done, defined at E:/code_pycharm/tt10.py:50> result=None>}, set())
13 ======== end =======

 

  可以看出,返回一个元组,把之前的任务都会放在里边

    所以获取每个任务的result的方式:

      1、将任务封装为task,通过回调函数,或者,直接调用result()

      2、通过任务列表返回的结果,遍历获取        

print(6, ret[0])
for i in ret[0]:
    print(i.result())

 

  3.5版本之后,Python提供关键字async,await,在语言上原生支持协程

 1 import asyncio
 2 
 3 async def sleep(x):
 4     for i in range(3):
 5         print(\'sleeP {}\'.format(i))
 6         await asyncio.sleep(0.001)
 7     # 给一个result
 8     return 2000
 9 
10 async def b():
11     for x in \'abc\':
12         print(x)
13         await asyncio.sleep(0.001)
14 
15 
16 def cb(future): # 回调函数
17     print(4, future,\'===\')
18     print(5, future.result())
19 
20 loop = asyncio.get_event_loop()
21 
22 #自己封装 task 对象
23 task = loop.create_task(sleep(3))
24 task.add_done_callback(cb)# 注册了一个回调函数
25 print(1, task)
26 
27 tasks = [task, b()]
28 ret = loop.run_until_complete(asyncio.wait(tasks))
29 
30 print(2, task)
31 print(3, task.result()) # 获取结果
32 print(6, ret[0])
33 for i in ret[0]:
34     print(i.result())
35 loop.close()
36 print(\'======== end =======\')

 

  async def 用来定义协程函数,iscoroutinefunction() 返回True,协程函数中可以不包含await,async关键字,但不能使用yield 关键字

  如同生成器函数调用返生成器对象一样,协程函数调用 也会返回一个对象称为协程对象,iscoroutine()返回True。

  await语句之后是awaitable对象,可以是协程或者实现了__await__()方法的对象,await会暂停当前协程执行,使用loop调度其他协程。

 

  tcp  ECho server:

 1 import asyncio
 2 
 3 async def handle(reader:asyncio.StreamReader, writer:asyncio.StreamWriter):
 4     while True:
 5         data = await reader.read(1024)
 6         print(dir(reader))
 7         print(dir(writer))
 8         client = writer.get_extra_info(\'peername\')
 9         message = \'{} your msg {}\'.format(client, data.decode()).encode()
10         writer.write(message)
11         await writer.drain() # 注意不是flush 方法
12 loop = asyncio.get_event_loop()
13 ip = \'127.0.0.1\'
14 port = 9999
15 crt = asyncio.start_server(handle, ip, port, loop=loop)
16 
17 server = loop.run_until_complete(crt)
18 print(server)
19 try:
20     print(\'=========\')
21     loop.run_forever()
22 except KeyboardInterrupt:
23     pass
24 finally:
25     server.close()
26     loop.run_until_complete(server.wait_closed())
27     loop.close()
View Code

 

 

6、aiohttp库(异步的)

pip install aiohttp

文档:https://aiohttp.readthedocs.io/en/stable/

http server

http client

 

以上是关于python-asyncio的主要内容,如果未能解决你的问题,请参考以下文章

python-asyncio TypeError:对象字典不能用于“等待”表达式