asyncio 并发编程
Posted midworld
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了asyncio 并发编程相关的知识,希望对你有一定的参考价值。
Future 对象
future
表示还没有完成的工作结果。事件循环可以通过监视一个future
对象的状态来指示它已经完成。future
对象有几个状态:
Pending
:循环Running
:运行Done
:完成Cancelled
:取消
获取 Future 中的结果
创建future
的时候,task
为pending
,事件循环调用执行的时候是running
,调用完毕是done
,如果需要停止事件循环,就需要先把task
取消,状态为cancel
。
import asyncio
def callback(future, result):
print('future 的状态', future)
print('设置 future 的结果', result)
future.set_result(result) # 设置返回值
print('此时 future 的状态', future)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
all_done = asyncio.Future() # 创建一个 Future 对象
loop.call_soon(callback, all_done, 'Future is done!')
print('进入事件循环')
result = loop.run_until_complete(all_done)
print('返回值:', result)
finally:
print('关闭事件循环')
loop.close()
print('获取future 的返回值', all_done.result()) # 获取返回结果
运行结果:
进入事件循环
future 的状态 <Future pending cb=[_run_until_complete_cb() at C:Python35Libasyncioase_events.py:124]>
设置 future 的结果 Future is done!
此时 future 的状态 <Future finished result='Future is done!'>
返回值: Future is done!
关闭事件循环
获取future 的返回值 Future is done!
总结:
all_done = asyncio.Future() # 创建一个 Future 对象
all_done.result() # 获取返回结果
all_done.set_result(result) # 设置返回值
await 关键字获取 Future 中的结果
import asyncio
def callback(future, result):
print('设置 future 返回值', result)
future.set_result(result)
async def main(loop):
all_done = asyncio.Future()
loop.call_soon(callback, all_done, 'Future is done!')
# await 获取 future 结果
result = await all_done
print('future 中结果', result)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
print('进入事件循环')
loop.run_until_complete(main(loop))
finally:
loop.close()
运行结果:
进入事件循环
设置 future 返回值 Future is done!
future 中结果 Future is done!
Future 回调
Future
在完成的时候可以执行一些回调函数,回调函数按注册时的顺序进行调用:
def add_done_callback(self, fn):
"""Add a callback to be run when the future becomes done.
The callback is called with a single argument - the future object. If
the future is already done when this is called, the callback is
scheduled with call_soon.
"""
if self._state != _PENDING:
self._loop.call_soon(fn, self)
else:
self._callbacks.append(fn)
add_done_callback(self, fn)
,除了 self
外,只接收一个参数,且回调函数 不支持关键字参数,因此需要用到 functools.partial
包装:
import asyncio
import functools
def callback(future, n):
print('callback 执行, future is done', n, future.result())
async def register_callbacks(all_done):
print('注册 callback 对 future 对象中')
all_done.add_done_callback(functools.partial(callback, n=1))
all_done.add_done_callback(functools.partial(callback, n=2))
async def main(all_done):
await register_callbacks(all_done)
print('设置 future 的结果')
all_done.set_result('Future is done!')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
all_done = asyncio.Future()
loop.run_until_complete(main(all_done))
finally:
loop.close()
运行结果:
注册 callback 对 future 对象中
设置 future 的结果
callback 执行, future is done 1 Future is done!
callback 执行, future is done 2 Future is done!
Task 任务
并发执行任务
任务 Task 是与实践循环交互的主要途径之一,任务可以包装、跟踪协程。同时 Task 也是 Future 的子类,其使用方式与 Future 无异。
协程可以等待任务,每个任务都有一个结果,在它完成后可以获取这个结果。
协程是没有状态的,通过 create_task()
可以将协程包装成有状态的任务,也可以在任务运行中取消任务。
import asyncio
async def child():
print('进入子协程')
return '子协程返回值'
async def main(loop):
print('将子协程包装成 Task 任务')
task = loop.create_task(child()) # 将协程包装成有状态的任务
print('通过 cancel 取消 Task 任务')
task.cancel()
try:
result = await task # 使用 await 获取 task 的结果
print('task 结果', result)
except asyncio.CancelledError:
print('取消任务抛出异常')
else:
print('获取任务结果', task.result())
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main(loop))
finally:
loop.close()
运行结果:
将子协程包装成 Task 任务
通过 cancel 取消 Task 任务
取消任务抛出异常
总结:
task = loop.create_task(child()) # 将协程包装成有状态的任务
result = await task # 使用 await 获取 task 的结果
task.cancel() # 取消任务,会报 CancelledError
注释 task.cancel()
,即可正常执行任务:
将子协程包装成 Task 任务
进入子协程
task 结果 子协程返回值
获取任务结果 子协程返回值
可以使用 asyncio.ensure_future(coroutine)
建一个 task
。在 Python3.7 中可以使用 asyncio.create_task
创建任务。
组合协程 -- wait 等待多个协程
一般地通过 await
链式调用的方式可以管理一系列的协程,但是如果要在一个协程钟等待多个协程。如:在一个协程钟等待 1000 个异步网络请求,对于访问次序没有要求时。就可以使用另外的关键字 wait 或 gather
来解决,wait
可以暂停一个协程,直至后台操作完成。
import asyncio
async def num(n):
try:
await asyncio.sleep(n * 0.1)
return n
except asyncio.CancelledError:
print("数字 %s 被取消" % n)
raise
async def main():
tasks = [num(i) for i in range(10)]
complete, pending = await asyncio.wait(tasks, timeout=0.5)
print(complete, pending)
for i in complete:
print("当前数字", i.result())
if pending:
print("取消未完成的任务")
for p in pending:
p.cancel()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
运行结果:
{<Task finished coro=<num() done, defined at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:4> result=2>, <Task finished coro=<num() done, defined at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:4> result=0>, <Task finished coro=<num() done, defined at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:4> result=1>, <Task finished coro=<num() done, defined at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:4> result=3>, <Task finished coro=<num() done, defined at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:4> result=4>} {<Task pending coro=<num() running at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:6> wait_for=<Future pending cb=[Task._wakeup()]>>, <Task pending coro=<num() running at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:6> wait_for=<Future pending cb=[Task._wakeup()]>>, <Task pending coro=<num() running at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:6> wait_for=<Future pending cb=[Task._wakeup()]>>, <Task pending coro=<num() running at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:6> wait_for=<Future pending cb=[Task._wakeup()]>>, <Task pending coro=<num() running at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:6> wait_for=<Future finished result=None>>}
当前数字 2
当前数字 0
当前数字 1
当前数字 3
当前数字 4
取消未完成的任务
数字 5 被取消
数字 6 被取消
数字 9 被取消
数字 8 被取消
数字 7 被取消
总结:
wait
内部采用set
报错创建的Task
实例,因此是无序执行的wait
返回值为一个元组,包含两个集合,分别为已完成和未完成的任务wait
第二个参数为超时时间,超过这个时间,未完成的任务其状态将会变为pending
,可以看到 5 及以上的数字都被取消了,这是因为sleep
时间大于wait
超时时间
gather
gater 任务无法取消,返回值且只有一个,另外可以根据参数的传入顺序,顺序输出
import asyncio
from time import ctime
async def num(n):
try:
await asyncio.sleep(n * 0.1)
return n
except asyncio.CancelledError:
print("数字 %s 被取消" % n)
raise
async def main():
tasks = [num(i) for i in range(10)]
complete = await asyncio.gather(*tasks)
for i in complete:
print("当前数字", i, ctime())
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
运行结果:
当前数字 0 Sat Oct 12 11:48:57 2019
当前数字 1 Sat Oct 12 11:48:57 2019
当前数字 2 Sat Oct 12 11:48:57 2019
当前数字 3 Sat Oct 12 11:48:57 2019
当前数字 4 Sat Oct 12 11:48:57 2019
当前数字 5 Sat Oct 12 11:48:57 2019
当前数字 6 Sat Oct 12 11:48:57 2019
当前数字 7 Sat Oct 12 11:48:57 2019
当前数字 8 Sat Oct 12 11:48:57 2019
当前数字 9 Sat Oct 12 11:48:57 2019
gather 阶段性操作
gather
通常被用来阶段性的一个操作,做完第一步才能做第二步:
import asyncio
from time import ctime, time
async def step1(n, start):
print("第一阶段开始", ctime())
await asyncio.sleep(n)
print("第一阶段完成", ctime())
print("此时用时", time() - start)
return n
async def step2(n, start):
print("第二阶段开始", ctime())
await asyncio.sleep(n)
print("第二阶段完成", ctime())
print("此时用时", time() - start)
return n
async def main():
now = time()
result = await asyncio.gather(step1(5, now), step2(2, now))
for i in result:
print(i)
print("总用时", time() - now)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
运行结果:
第二阶段开始 Sat Oct 12 11:54:25 2019
第一阶段开始 Sat Oct 12 11:54:25 2019
第二阶段完成 Sat Oct 12 11:54:27 2019
此时用时 2.000483512878418
第一阶段完成 Sat Oct 12 11:54:30 2019
此时用时 5.001391649246216
5
2
总用时 5.001391649246216
总结:
step1
和step2
是并行运行的。gather
会等待最耗时的那个完成之后才返回结果,耗时总时间取决于其中任务最长时间的那个。
任务完成时进行处理
as_complete
是一个生成器,它会管理指定的一个任务列表,并生成它们的结果。与 wait
类似,也是无序输出的,不过在执行其他动作之前没有必要等待所有后台操作完成。
import asyncio
from time import time, ctime
async def foo(n):
print('Waiting: ', n, ctime())
await asyncio.sleep(n)
return n
async def main():
coroutine1 = foo(1)
coroutine2 = foo(2)
coroutine3 = foo(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
for task in asyncio.as_completed(tasks):
result = await task
print('Task 执行结果: {}'.format(result), ctime())
now = lambda: time()
start = now()
loop = asyncio.get_event_loop()
done = loop.run_until_complete(main())
print(now() - start)
运行结果:
Waiting: 1 Sat Oct 12 12:00:52 2019
Waiting: 2 Sat Oct 12 12:00:52 2019
Waiting: 4 Sat Oct 12 12:00:52 2019
Task 执行结果: 1 Sat Oct 12 12:00:53 2019
Task 执行结果: 2 Sat Oct 12 12:00:54 2019
Task 执行结果: 4 Sat Oct 12 12:00:56 2019
4.003674030303955
从上面运行结果可以看出来,只要任务完成了就立马返回结果,不等待其他任务。
多任务
import asyncio
import requests
async def request():
url = 'https://www.baidu.com'
status = requests.get(url)
return status
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print('Tasks:', tasks)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
print('Task Result:', task.result())
运行结果:
Tasks: [<Task pending coro=<request() running at D:/pycharm resource/Projects/TestDeploy/协程/异步 http.py:54>>, <Task pending coro=<request() running at D:/pycharm resource/Projects/TestDeploy/协程/异步 http.py:54>>, <Task pending coro=<request() running at D:/pycharm resource/Projects/TestDeploy/协程/异步 http.py:54>>, <Task pending coro=<request() running at D:/pycharm resource/Projects/TestDeploy/协程/异步 http.py:54>>, <Task pending coro=<request() running at D:/pycharm resource/Projects/TestDeploy/协程/异步 http.py:54>>]
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
动态创建协程
Python 协程只能运行在事件循环中,一旦事件循环运行,又会阻塞当前任务。如果想实现动态添加任务,则只能在另开启一个线程,这个线程的主要任务是用来运行事件循环。
import asyncio
import threading
from time import time, ctime
def thread_running_loop(lp):
print('loop running', ctime())
asyncio.set_event_loop(lp) # 在此线程中设置一个新的事件循环,默认情况事件循环是主协程的
lp.run_forever() # 一直运行
async def func(arg):
print('ready to work arg:', arg, ctime())
await asyncio.sleep(1)
print('done', arg, ctime())
if __name__ == '__main__':
# 创建一个新的事件循环给子线程
newlp = asyncio.new_event_loop()
t = threading.Thread(target=thread_running_loop, args=(newlp, ))
t.start()
# 添加 5 个协程,并制定事件循环,第二个参数
for i in range(5):
asyncio.run_coroutine_threadsafe(func(i), newlp)
t.join()
运行结果:
loop running Sat Oct 12 18:40:41 2019
ready to work arg: 0 Sat Oct 12 18:40:41 2019
ready to work arg: 1 Sat Oct 12 18:40:41 2019
ready to work arg: 2 Sat Oct 12 18:40:41 2019
ready to work arg: 3 Sat Oct 12 18:40:41 2019
ready to work arg: 4 Sat Oct 12 18:40:41 2019
done 0 Sat Oct 12 18:40:42 2019
done 2 Sat Oct 12 18:40:42 2019
done 4 Sat Oct 12 18:40:42 2019
done 1 Sat Oct 12 18:40:42 2019
done 3 Sat Oct 12 18:40:42 2019
另一种写法
import asyncio
from threading import Thread
async def production_task():
i = 0
while True:
# 将consumption这个协程每秒注册一个到运行在线程中的循环,thread_loop每秒会获得一个一直打印i的无限循环任务
asyncio.run_coroutine_threadsafe(consumption(i),
thread_loop) # 注意:run_coroutine_threadsafe 这个方法只能用在运行在线程中的循环事件使用
await asyncio.sleep(1) # 必须加await
i += 1
async def consumption(i):
while True:
print("我是第{}任务".format(i))
await asyncio.sleep(1)
def start_loop(loop):
# 运行事件循环, loop以参数的形式传递进来运行
asyncio.set_event_loop(loop)
loop.run_forever()
thread_loop = asyncio.new_event_loop() # 获取一个事件循环
run_loop_thread = Thread(target=start_loop, args=(thread_loop,)) # 将次事件循环运行在一个线程中,防止阻塞当前主线程
run_loop_thread.start() # 运行线程,同时协程事件循环也会运行
advocate_loop = asyncio.get_event_loop() # 将生产任务的协程注册到这个循环中
advocate_loop.run_until_complete(production_task()) # 运行次循环
控制并发量
asyncio
有多重同步机制,如: Semaphore、Lock(锁)、Condition(条件)、Event(事件)、Queue
,通过 Semaphore
可以来同步控制有效并发量,从而减轻服务器压力:
#coding:utf-8
import time,asyncio
a=time.time()
id=1
async def hello(id,semaphore):
async with semaphore:
await asyncio.sleep(1)
print('working id:'+str(id))
async def run():
semaphore = asyncio.Semaphore(5) # 限制并发量为5
to_get = [hello(id,semaphore) for id in range(20)] #总共20任务
await asyncio.wait(to_get)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()
print(time.time()-a)
Tips:Python 提供的测试网站:http://httpbin.org/get?a={}
如何把同步的代码改成异步的
同步
import asyncio
def handle(name):
resp1 = func1(name)
resp2 = func2(name)
resp3 = func3(resp1, resp2)
func4(resp3)
func5(name)
def func1(name):
print('func1 正在运行,你好 {}'.format(name))
return 'func1'
def func2(name):
print('func2 正在运行,你好 {}'.format(name))
return 'fun2'
def func3(arg1, arg2):
print('func3 正在运行,参数:{}、{}'.format(arg1, arg2))
return 'func3'
def func4(arg):
print('func4 正在运行,参数 {}'.format(arg))
return 'func4'
def func5(name):
print('func5 正在运行,你好 {}'.format(name))
if __name__ == '__main__':
loop = asyncio.get_event_loop()
coro = handle('rose')
loop.run_until_complete(coro)
以上代码中各函数从上而下以此执行,整个流程是同步的。
异步
将函数改成异步,几个步骤:
- 在函数前面加上
async
,将函数变为异步函数 - 使用
ensure_future
将调用函数变为future
await
关键字获取其结果
import asyncio
async def handle(name):
resp1 = asyncio.ensure_future(func1(name))
resp2 = asyncio.ensure_future(func2(name))
result1, result2 = await asyncio.gather(resp1, resp2)
print('--->', result1, result2)
resp3 = await func3(result1, result2)
await func4(resp3)
# 协程中调用普通函数
loop.call_soon(func5, name)
async def func1(name):
print('func1 正在运行,你好 {}'.format(name))
return 'func1'
async def func2(name):
print('func2 正在运行,你好 {}'.format(name))
return 'fun2'
async def func3(arg1, arg2):
print('func3 正在运行,参数:{}、{}'.format(arg1, arg2))
return 'func3'
async def func4(arg):
print('func4 正在运行,参数 {}'.format(arg))
return 'func4'
def func5(name):
print('func5 正在运行,你好 {}'.format(name))
if __name__ == '__main__':
loop = asyncio.get_event_loop()
coro = handle('rose')
loop.run_until_complete(coro)
以上是关于asyncio 并发编程的主要内容,如果未能解决你的问题,请参考以下文章
asyncio--python3未来并发编程主流充满野心的模块