从 concurrent.futures 到 asyncio
Posted
技术标签:
【中文标题】从 concurrent.futures 到 asyncio【英文标题】:From concurrent.futures to asyncio 【发布时间】:2016-12-03 19:36:25 【问题描述】:concurrent.futures 有两个问题:
如何在 python concurrent.futures 中打破 time.sleep()?
结论:time.sleep() 不能被中断。一种解决方案是:您可以围绕它编写一个循环并进行短暂的睡眠。
见How to break time.sleep() in a python concurrent.futures
concurrent.futures 的个别超时?
结论:个别超时需要用户自己实现。例如:对于每次超时,您可以致电wait()。
见Individual timeouts for concurrent.futures
问题
asyncio 能解决这些问题吗?
【问题讨论】:
为了自我封闭,你能在这里总结一下其他两个问题吗? 【参考方案1】:在异步模型中,执行由事件循环调度和协调。要取消当前暂停任务的执行,您基本上只需要不恢复它。虽然这在实践中有点不同,但很明显这使得取消暂停的任务在理论上很简单。
单独的超时当然是可能的:每当您暂停协程以等待结果时,您都想提供一个超时值。事件循环将确保在达到超时且任务尚未完成时取消等待任务。
一些具体示例:
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> task = asyncio.ensure_future(asyncio.sleep(5))
>>> task.cancel()
>>> loop.run_until_complete(task)
Traceback (most recent call last):
...
concurrent.futures._base.CancelledError
在实践中,这可能会使用以下方式实现:
class Foo:
task = None
async def sleeper(self):
self.task = asyncio.sleep(60)
try:
await self.task
except concurrent.futures.CancelledError:
raise NotImplementedError
当这个方法处于休眠状态时,其他人可以调用foo.task.cancel()
来唤醒协程并让它处理取消。或者,任何调用sleeper()
的人都可以直接取消它,而不给它清理的机会。
设置超时同样简单:
>>> loop.run_until_complete(asyncio.wait_for(asyncio.sleep(60), 5))
[ ... 5 seconds later ... ]
Traceback (most recent call last):
...
concurrent.futures._base.TimeoutError
特别是在 HTTP 请求超时的情况下,请参阅aiohttp:
async def fetch_page(session, url):
with aiohttp.Timeout(10):
async with session.get(url) as response:
assert response.status == 200
return await response.read()
with aiohttp.ClientSession(loop=loop) as session:
content = loop.run_until_complete(fetch_page(session, 'http://python.org'))
显然,对fetch_page
的每次调用都可以决定自己的aiohttp.Timeout
值,并且每个单独的实例都会在达到该超时时抛出自己的异常。
【讨论】:
哇,很好的答案。谢谢你。想象一下,我使用 subprocess 模块(我读到它受支持),然后我想以某种方式终止子进程。你有提示如何做到这一点? 不是,不是。您应该为此提出一个新问题。 @guettli 看看asyncio.subprocess 模块和那些examples。【参考方案2】:您可以在其异常中立即引发(使用asyncio.CancelledError
)。
我用这个方法来解决这个问题:
import asyncio
async def worker():
try:
# await for some coroutine process
except asyncio.CancelledError:
# Do stuff
raise asyncio.CancelledError()
except Exception as exc:
# Do stuff
print(exc)
finally:
await asyncio.sleep(2)
【讨论】:
以上是关于从 concurrent.futures 到 asyncio的主要内容,如果未能解决你的问题,请参考以下文章
从 concurrent.futures 到 asyncio
python concurrent.futures.ProcessPoolExecutor的示例。使用“as_completed”获得无序结果的关键思路之一。如果是序列顺序
python concurrent.futures.ProcessPoolExecutor的示例。使用“as_completed”获得无序结果的关键思路之一。如果是序列顺序
python concurrent.futures.ProcessPoolExecutor的示例。使用“as_completed”获得无序结果的关键思路之一。如果是序列顺序
从 concurrent.futures 使用 ThreadPoolExecutor 时的 max_workers 数?
python concurrent.futures.ProcessPoolExecutor:.submit() 与 .map() 的性能