从 concurrent.futures 到 asyncio

Posted

技术标签:

【中文标题】从 concurrent.futures 到 asyncio【英文标题】:From concurrent.futures to asyncio 【发布时间】:2016-12-03 19:36:25 【问题描述】:

concurrent.futures 有两个问题:

如何在 python concurrent.futures 中打破 time.sleep()?

结论:time.sleep() 不能被中断。一种解决方案是:您可以围绕它编写一个循环并进行短暂的睡眠。

见How to break time.sleep() in a python concurrent.futures

concurrent.futures 的个别超时?

结论:个别超时需要用户自己实现。例如:对于每次超时,您可以致电wait()。

见Individual timeouts for concurrent.futures

问题

asyncio 能解决这些问题吗?

【问题讨论】:

为了自我封闭,你能在这里总结一下其他两个问题吗? 【参考方案1】:

在异步模型中,执行由事件循环调度和协调。要取消当前暂停任务的执行,您基本上只需要不恢复它。虽然这在实践中有点不同,但很明显这使得取消暂停的任务在理论上很简单。

单独的超时当然是可能的:每当您暂停协程以等待结果时,您都想提供一个超时值。事件循环将确保在达到超时且任务尚未完成时取消等待任务。

一些具体示例:

>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> task = asyncio.ensure_future(asyncio.sleep(5))
>>> task.cancel()
>>> loop.run_until_complete(task)
Traceback (most recent call last):
   ...
concurrent.futures._base.CancelledError

在实践中,这可能会使用以下方式实现:

class Foo:
    task = None

    async def sleeper(self):
        self.task = asyncio.sleep(60)
        try:
            await self.task
        except concurrent.futures.CancelledError:
            raise NotImplementedError

当这个方法处于休眠状态时,其他人可以调用foo.task.cancel() 来唤醒协程并让它处理取消。或者,任何调用sleeper() 的人都可以直接取消,而不给它清理的机会。

设置超时同样简单:

>>> loop.run_until_complete(asyncio.wait_for(asyncio.sleep(60), 5))
[ ... 5 seconds later ... ]
Traceback (most recent call last):
   ...
concurrent.futures._base.TimeoutError

特别是在 HTTP 请求超时的情况下,请参阅aiohttp:

async def fetch_page(session, url):
    with aiohttp.Timeout(10):
        async with session.get(url) as response:
            assert response.status == 200
            return await response.read()

with aiohttp.ClientSession(loop=loop) as session:
    content = loop.run_until_complete(fetch_page(session, 'http://python.org'))

显然,对fetch_page 的每次调用都可以决定自己的aiohttp.Timeout 值,并且每个单独的实例都会在达到该超时时抛出自己的异常。

【讨论】:

哇,很好的答案。谢谢你。想象一下,我使用 subprocess 模块(我读到它受支持),然后我想以某种方式终止子进程。你有提示如何做到这一点? 不是,不是。您应该为此提出一个新问题。 @guettli 看看asyncio.subprocess 模块和那些examples。【参考方案2】:

您可以在其异常中立即引发(使用asyncio.CancelledError)。

我用这个方法来解决这个问题:

import asyncio

async def worker():
    try:
        # await for some coroutine process
    except asyncio.CancelledError:
        # Do stuff
        raise asyncio.CancelledError()
    except Exception as exc:
        # Do stuff
        print(exc)
    finally:
        await asyncio.sleep(2)

【讨论】:

以上是关于从 concurrent.futures 到 asyncio的主要内容,如果未能解决你的问题,请参考以下文章

从 concurrent.futures 到 asyncio

python concurrent.futures.ProcessPoolExecutor的示例。使用“as_completed”获得无序结果的关键思路之一。如果是序列顺序

python concurrent.futures.ProcessPoolExecutor的示例。使用“as_completed”获得无序结果的关键思路之一。如果是序列顺序

python concurrent.futures.ProcessPoolExecutor的示例。使用“as_completed”获得无序结果的关键思路之一。如果是序列顺序

从 concurrent.futures 使用 ThreadPoolExecutor 时的 max_workers 数?

python concurrent.futures.ProcessPoolExecutor:.submit() 与 .map() 的性能