asyncio 并发编程

Posted midworld

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了asyncio 并发编程相关的知识,希望对你有一定的参考价值。

Future 对象

future 表示还没有完成的工作结果。事件循环可以通过监视一个future 对象的状态来指示它已经完成。future 对象有几个状态:

  • Pending:循环
  • Running:运行
  • Done:完成
  • Cancelled:取消

获取 Future 中的结果

创建future的时候,taskpending,事件循环调用执行的时候是running,调用完毕是done,如果需要停止事件循环,就需要先把task取消,状态为cancel

import asyncio


def callback(future, result):
    print('future 的状态', future)
    print('设置 future 的结果', result)
    future.set_result(result)           # 设置返回值
    print('此时 future 的状态', future)
    

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        all_done = asyncio.Future()     # 创建一个 Future 对象
        loop.call_soon(callback, all_done, 'Future is done!')
        print('进入事件循环')
        result = loop.run_until_complete(all_done)
        print('返回值:', result)
    finally:
        print('关闭事件循环')
        loop.close()
    print('获取future 的返回值', all_done.result())       # 获取返回结果

运行结果:

进入事件循环
future 的状态 <Future pending cb=[_run_until_complete_cb() at C:Python35Libasyncioase_events.py:124]>
设置 future 的结果 Future is done!
此时 future 的状态 <Future finished result='Future is done!'>
返回值: Future is done!
关闭事件循环
获取future 的返回值 Future is done!

总结:

all_done = asyncio.Future()     # 创建一个 Future 对象
all_done.result()               # 获取返回结果
all_done.set_result(result)         # 设置返回值

await 关键字获取 Future 中的结果

import asyncio


def callback(future, result):
    print('设置 future 返回值', result)
    future.set_result(result)


async def main(loop):
    all_done = asyncio.Future()
    loop.call_soon(callback, all_done, 'Future is done!')
    # await 获取 future 结果
    result = await all_done
    print('future 中结果', result)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        print('进入事件循环')
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

运行结果:

进入事件循环
设置 future 返回值 Future is done!
future 中结果 Future is done!

Future 回调

Future 在完成的时候可以执行一些回调函数,回调函数按注册时的顺序进行调用:

def add_done_callback(self, fn):
    """Add a callback to be run when the future becomes done.

    The callback is called with a single argument - the future object. If
    the future is already done when this is called, the callback is
    scheduled with call_soon.
    """
    if self._state != _PENDING:
        self._loop.call_soon(fn, self)
    else:
        self._callbacks.append(fn)

add_done_callback(self, fn),除了 self 外,只接收一个参数,且回调函数 不支持关键字参数,因此需要用到 functools.partial 包装:

import asyncio
import functools


def callback(future, n):
    print('callback 执行, future is done', n, future.result())


async def register_callbacks(all_done):
    print('注册 callback 对 future 对象中')
    all_done.add_done_callback(functools.partial(callback, n=1))
    all_done.add_done_callback(functools.partial(callback, n=2))


async def main(all_done):
    await register_callbacks(all_done)
    print('设置 future 的结果')
    all_done.set_result('Future is done!')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        all_done = asyncio.Future()
        loop.run_until_complete(main(all_done))
    finally:
        loop.close()

运行结果:

注册 callback 对 future 对象中
设置 future 的结果
callback 执行, future is done 1 Future is done!
callback 执行, future is done 2 Future is done!

Task 任务

并发执行任务

任务 Task 是与实践循环交互的主要途径之一,任务可以包装、跟踪协程。同时 Task 也是 Future 的子类,其使用方式与 Future 无异

协程可以等待任务,每个任务都有一个结果,在它完成后可以获取这个结果。

协程是没有状态的,通过 create_task() 可以将协程包装成有状态的任务,也可以在任务运行中取消任务。

import asyncio


async def child():
    print('进入子协程')
    return '子协程返回值'


async def main(loop):
    print('将子协程包装成 Task 任务')
    task = loop.create_task(child())        # 将协程包装成有状态的任务
    print('通过 cancel 取消 Task 任务')
    task.cancel()
    try:    
        result = await task         # 使用 await 获取 task 的结果
        print('task 结果', result)
    except asyncio.CancelledError:
        print('取消任务抛出异常')
    else:
        print('获取任务结果', task.result())


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

运行结果:

将子协程包装成 Task 任务
通过 cancel 取消 Task 任务
取消任务抛出异常

总结:

task = loop.create_task(child())        # 将协程包装成有状态的任务
result = await task         # 使用 await 获取 task 的结果
task.cancel()               # 取消任务,会报 CancelledError

注释 task.cancel(),即可正常执行任务:

将子协程包装成 Task 任务
进入子协程
task 结果 子协程返回值
获取任务结果 子协程返回值

可以使用 asyncio.ensure_future(coroutine) 建一个 task。在 Python3.7 中可以使用 asyncio.create_task创建任务。

组合协程 -- wait 等待多个协程

一般地通过 await 链式调用的方式可以管理一系列的协程,但是如果要在一个协程钟等待多个协程。如:在一个协程钟等待 1000 个异步网络请求,对于访问次序没有要求时。就可以使用另外的关键字 wait 或 gather 来解决,wait 可以暂停一个协程,直至后台操作完成。

import asyncio


async def num(n):
    try:
        await asyncio.sleep(n * 0.1)
        return n
    except asyncio.CancelledError:
        print("数字 %s 被取消" % n)
        raise


async def main():
    tasks = [num(i) for i in range(10)]
    complete, pending = await asyncio.wait(tasks, timeout=0.5)
    print(complete, pending)
    for i in complete:
        print("当前数字", i.result())
    if pending:
        print("取消未完成的任务")
        for p in pending:
            p.cancel()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

运行结果:

{<Task finished coro=<num() done, defined at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:4> result=2>, <Task finished coro=<num() done, defined at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:4> result=0>, <Task finished coro=<num() done, defined at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:4> result=1>, <Task finished coro=<num() done, defined at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:4> result=3>, <Task finished coro=<num() done, defined at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:4> result=4>} {<Task pending coro=<num() running at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:6> wait_for=<Future pending cb=[Task._wakeup()]>>, <Task pending coro=<num() running at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:6> wait_for=<Future pending cb=[Task._wakeup()]>>, <Task pending coro=<num() running at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:6> wait_for=<Future pending cb=[Task._wakeup()]>>, <Task pending coro=<num() running at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:6> wait_for=<Future pending cb=[Task._wakeup()]>>, <Task pending coro=<num() running at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:6> wait_for=<Future finished result=None>>}
当前数字 2
当前数字 0
当前数字 1
当前数字 3
当前数字 4
取消未完成的任务
数字 5 被取消
数字 6 被取消
数字 9 被取消
数字 8 被取消
数字 7 被取消

总结:

  • wait 内部采用 set 报错创建的 Task 实例,因此是无序执行的

  • wait 返回值为一个元组,包含两个集合,分别为已完成和未完成的任务

  • wait 第二个参数为超时时间,超过这个时间,未完成的任务其状态将会变为 pending,可以看到 5 及以上的数字都被取消了,这是因为 sleep 时间大于 wait 超时时间


gather

gater 任务无法取消,返回值且只有一个,另外可以根据参数的传入顺序,顺序输出

import asyncio
from time import ctime


async def num(n):
    try:
        await asyncio.sleep(n * 0.1)
        return n
    except asyncio.CancelledError:
        print("数字 %s 被取消" % n)
        raise


async def main():
    tasks = [num(i) for i in range(10)]
    complete = await asyncio.gather(*tasks)
    for i in complete:
        print("当前数字", i, ctime())


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

运行结果:

当前数字 0 Sat Oct 12 11:48:57 2019
当前数字 1 Sat Oct 12 11:48:57 2019
当前数字 2 Sat Oct 12 11:48:57 2019
当前数字 3 Sat Oct 12 11:48:57 2019
当前数字 4 Sat Oct 12 11:48:57 2019
当前数字 5 Sat Oct 12 11:48:57 2019
当前数字 6 Sat Oct 12 11:48:57 2019
当前数字 7 Sat Oct 12 11:48:57 2019
当前数字 8 Sat Oct 12 11:48:57 2019
当前数字 9 Sat Oct 12 11:48:57 2019

gather 阶段性操作

gather 通常被用来阶段性的一个操作,做完第一步才能做第二步:

import asyncio
from time import ctime, time


async def step1(n, start):
    print("第一阶段开始", ctime())
    await asyncio.sleep(n)
    print("第一阶段完成", ctime())
    print("此时用时", time() - start)
    return n


async def step2(n, start):
    print("第二阶段开始", ctime())
    await asyncio.sleep(n)
    print("第二阶段完成", ctime())
    print("此时用时", time() - start)
    return n


async def main():
    now = time()
    result = await asyncio.gather(step1(5, now), step2(2, now))
    for i in result:
        print(i)
    print("总用时", time() - now)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

运行结果:

第二阶段开始 Sat Oct 12 11:54:25 2019
第一阶段开始 Sat Oct 12 11:54:25 2019
第二阶段完成 Sat Oct 12 11:54:27 2019
此时用时 2.000483512878418
第一阶段完成 Sat Oct 12 11:54:30 2019
此时用时 5.001391649246216
5
2
总用时 5.001391649246216

总结:

  • step1step2是并行运行的。
  • gather 会等待最耗时的那个完成之后才返回结果,耗时总时间取决于其中任务最长时间的那个。

任务完成时进行处理

as_complete 是一个生成器,它会管理指定的一个任务列表,并生成它们的结果。与 wait 类似,也是无序输出的,不过在执行其他动作之前没有必要等待所有后台操作完成。

import asyncio
from time import time, ctime


async def foo(n):
    print('Waiting: ', n, ctime())
    await asyncio.sleep(n)
    return n


async def main():
    coroutine1 = foo(1)
    coroutine2 = foo(2)
    coroutine3 = foo(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print('Task 执行结果: {}'.format(result), ctime())


now = lambda: time()
start = now()

loop = asyncio.get_event_loop()
done = loop.run_until_complete(main())
print(now() - start)

运行结果:

Waiting:  1 Sat Oct 12 12:00:52 2019
Waiting:  2 Sat Oct 12 12:00:52 2019
Waiting:  4 Sat Oct 12 12:00:52 2019
Task 执行结果: 1 Sat Oct 12 12:00:53 2019
Task 执行结果: 2 Sat Oct 12 12:00:54 2019
Task 执行结果: 4 Sat Oct 12 12:00:56 2019
4.003674030303955

从上面运行结果可以看出来,只要任务完成了就立马返回结果,不等待其他任务

多任务

import asyncio
import requests


async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print('Tasks:', tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))


for task in tasks:
    print('Task Result:', task.result())

运行结果:

Tasks: [<Task pending coro=<request() running at D:/pycharm resource/Projects/TestDeploy/协程/异步 http.py:54>>, <Task pending coro=<request() running at D:/pycharm resource/Projects/TestDeploy/协程/异步 http.py:54>>, <Task pending coro=<request() running at D:/pycharm resource/Projects/TestDeploy/协程/异步 http.py:54>>, <Task pending coro=<request() running at D:/pycharm resource/Projects/TestDeploy/协程/异步 http.py:54>>, <Task pending coro=<request() running at D:/pycharm resource/Projects/TestDeploy/协程/异步 http.py:54>>]
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>

动态创建协程

Python 协程只能运行在事件循环中,一旦事件循环运行,又会阻塞当前任务。如果想实现动态添加任务,则只能在另开启一个线程,这个线程的主要任务是用来运行事件循环。

import asyncio
import threading
from time import time, ctime


def thread_running_loop(lp):
    print('loop running', ctime())
    asyncio.set_event_loop(lp)      # 在此线程中设置一个新的事件循环,默认情况事件循环是主协程的
    lp.run_forever()                # 一直运行


async def func(arg):
    print('ready to work arg:', arg, ctime())
    await asyncio.sleep(1)
    print('done', arg, ctime())


if __name__ == '__main__':
    # 创建一个新的事件循环给子线程
    newlp = asyncio.new_event_loop()
    t = threading.Thread(target=thread_running_loop, args=(newlp, ))
    t.start()

    # 添加 5 个协程,并制定事件循环,第二个参数
    for i in range(5):
        asyncio.run_coroutine_threadsafe(func(i), newlp)

    t.join()

运行结果:

loop running Sat Oct 12 18:40:41 2019
ready to work arg: 0 Sat Oct 12 18:40:41 2019
ready to work arg: 1 Sat Oct 12 18:40:41 2019
ready to work arg: 2 Sat Oct 12 18:40:41 2019
ready to work arg: 3 Sat Oct 12 18:40:41 2019
ready to work arg: 4 Sat Oct 12 18:40:41 2019
done 0 Sat Oct 12 18:40:42 2019
done 2 Sat Oct 12 18:40:42 2019
done 4 Sat Oct 12 18:40:42 2019
done 1 Sat Oct 12 18:40:42 2019
done 3 Sat Oct 12 18:40:42 2019

另一种写法

import asyncio
from threading import Thread


async def production_task():
    i = 0
    while True:
        # 将consumption这个协程每秒注册一个到运行在线程中的循环,thread_loop每秒会获得一个一直打印i的无限循环任务
        asyncio.run_coroutine_threadsafe(consumption(i),
                                         thread_loop)  # 注意:run_coroutine_threadsafe 这个方法只能用在运行在线程中的循环事件使用
        await asyncio.sleep(1)  # 必须加await
        i += 1


async def consumption(i):
    while True:
        print("我是第{}任务".format(i))
        await asyncio.sleep(1)


def start_loop(loop):
    # 运行事件循环, loop以参数的形式传递进来运行
    asyncio.set_event_loop(loop)
    loop.run_forever()


thread_loop = asyncio.new_event_loop()  # 获取一个事件循环
run_loop_thread = Thread(target=start_loop, args=(thread_loop,))  # 将次事件循环运行在一个线程中,防止阻塞当前主线程
run_loop_thread.start()  # 运行线程,同时协程事件循环也会运行

advocate_loop = asyncio.get_event_loop()  # 将生产任务的协程注册到这个循环中
advocate_loop.run_until_complete(production_task())  # 运行次循环

控制并发量

asyncio 有多重同步机制,如: Semaphore、Lock(锁)、Condition(条件)、Event(事件)、Queue ,通过 Semaphore 可以来同步控制有效并发量,从而减轻服务器压力:

#coding:utf-8
import time,asyncio
 
a=time.time()
 
id=1
async def hello(id,semaphore):
    async with semaphore:
        await asyncio.sleep(1)
        print('working id:'+str(id))
 
async def run():
    semaphore = asyncio.Semaphore(5) # 限制并发量为5
    to_get = [hello(id,semaphore) for id in range(20)] #总共20任务
    await asyncio.wait(to_get)
 
 
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    loop.close()
    print(time.time()-a)

Tips:Python 提供的测试网站:http://httpbin.org/get?a={}

如何把同步的代码改成异步的

同步

import asyncio


def handle(name):
    resp1 = func1(name)
    resp2 = func2(name)
    resp3 = func3(resp1, resp2)
    
    func4(resp3)
    func5(name)


def func1(name):
    print('func1 正在运行,你好 {}'.format(name))

    return 'func1'


def func2(name):
    print('func2 正在运行,你好 {}'.format(name))

    return 'fun2'


def func3(arg1, arg2):
    print('func3 正在运行,参数:{}、{}'.format(arg1, arg2))

    return 'func3'


def func4(arg):
    print('func4 正在运行,参数 {}'.format(arg))

    return 'func4'


def func5(name):
    print('func5 正在运行,你好 {}'.format(name))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    coro = handle('rose')

    loop.run_until_complete(coro)

以上代码中各函数从上而下以此执行,整个流程是同步的。

异步

将函数改成异步,几个步骤:

  • 在函数前面加上 async,将函数变为异步函数
  • 使用 ensure_future 将调用函数变为 future
  • await 关键字获取其结果
import asyncio


async def handle(name):
    resp1 = asyncio.ensure_future(func1(name))
    resp2 = asyncio.ensure_future(func2(name))

    result1, result2 = await asyncio.gather(resp1, resp2)
    print('--->', result1, result2)
    resp3 = await func3(result1, result2)
    await func4(resp3)

    # 协程中调用普通函数
    loop.call_soon(func5, name)

async def func1(name):
    print('func1 正在运行,你好 {}'.format(name))

    return 'func1'


async def func2(name):
    print('func2 正在运行,你好 {}'.format(name))

    return 'fun2'


async def func3(arg1, arg2):
    print('func3 正在运行,参数:{}、{}'.format(arg1, arg2))

    return 'func3'


async def func4(arg):
    print('func4 正在运行,参数 {}'.format(arg))

    return 'func4'


def func5(name):
    print('func5 正在运行,你好 {}'.format(name))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    coro = handle('rose')

    loop.run_until_complete(coro)

以上是关于asyncio 并发编程的主要内容,如果未能解决你的问题,请参考以下文章

python异步编程之asyncio(百万并发)

asyncio:Python异步编程模块

asyncio--python3未来并发编程主流充满野心的模块

python异步编程之asyncio(百万并发)

Python并发编程之初识异步IO框架:asyncio 上篇

asyncio 并发编程