concurrent.futures.Future 可以转换为 asyncio.Future 吗?

Posted

技术标签:

【中文标题】concurrent.futures.Future 可以转换为 asyncio.Future 吗?【英文标题】:Can concurrent.futures.Future be converted to asyncio.Future? 【发布时间】:2019-06-03 09:52:23 【问题描述】:

在编写多线程代码多年后,我正在练习asyncio

注意到一些我觉得很奇怪的东西。在asyncioconcurrent 中都有一个Future 对象。

from asyncio import Future
from concurrent.futures import Future

猜猜每个人都有自己的角色..

我的问题是我是否可以将concurrent.future.Future 转移到asyncio.Future(或相反)?

【问题讨论】:

不确定是否完全重复,但肯定相关:***.com/questions/29902908/… 谢谢。阅读它,那里有一个比较,我还没有答案.. mkrieger1 的帖子很有帮助。我不知道你说的转移到另一个是什么意思 【参考方案1】:

我的问题是我是否可以将concurrent.future.Future 转移到asyncio.Future(或相反)?

如果“转移”是指将一个转换为另一个,是的,这是可能的,尽管桥接阻抗不匹配可能需要一些工作。

要将concurrent.futures.Future 转换为asyncio.Future,您可以调用asyncio.wrap_future。返回的 asyncio 未来在 asyncio 事件循环中是可等待的,并且将在底层线程未来完成时完成。这就是run_in_executor 是implemented 的有效方式。

没有将 asyncio 未来直接转换为concurrent.futures 未来的公共功能,但有asyncio.run_coroutine_threadsafe 函数,它接受一个协程,将其提交给事件循环,然后返回一个并发的未来,它在 asyncio 未来完成时完成。这可用于有效地将任何 asyncio-awaitable 未来转换为并发未来,如下所示:

def to_concurrent(fut, loop):
    async def wait():
        await fut
    return asyncio.run_coroutine_threadsafe(wait(), loop)

返回的未来会表现得像你对并发未来的期望,例如它的result() 方法将阻塞,等等。您可能需要注意的一件事是使用add_done_callback 添加到并发未来的回调在标记未来完成的线程中运行,在这种情况下是事件循环线程.这意味着如果你添加了一些 done 回调,你需要注意不要在它们的实现中调用阻塞调用,以免阻塞事件循环。

请注意,调用run_coroutine_threadsafe 需要事件循环在其他线程中实际运行。 (例如,您可以启动一个后台线程并让它执行loop.run_forever。)

【讨论】:

【参考方案2】:

对于“concurrent future to asyncio future”部分,这是我使用的一个实用程序。

from typing import List, Any
from concurrent.futures.thread import ThreadPoolExecutor
import asyncio


class AsyncThreadPool(ThreadPoolExecutor):
    _futures: List[asyncio.Future]
    _loop: asyncio.AbstractEventLoop

    def __init__(self, max_workers=None):
        super().__init__(max_workers)
        self._futures = []

    def queue(self, fn):
        self._loop = asyncio.get_event_loop()
        fut = self._loop.create_future()
        self._futures.append(fut)
        self.submit(self._entry, fn, fut)

    def queueAsync(self, coroutine):
        def newLoop():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            return loop.run_until_complete(coroutine)
        self.queue(newLoop)

    def _entry(self, fn, fut: asyncio.Future):
        try:
            result = fn()
            self._loop.call_soon_threadsafe(fut.set_result, result)
        except Exception as e:
            self._loop.call_soon_threadsafe(fut.set_exception, e)

    async def gather(self) -> List[Any]:
        return await asyncio.gather(*self._futures)

你可以这样使用它:

with AsyncThreadPool() as pool:
    # Queue some sync function (will be executed on another thread)
    pool.queue(someHeavySyncFunction)
    # Queue a coroutine that will be executed on a new event loop running on another thread
    pool.queue(otherAsyncFunction())

    # Gather results (non blocking for your current loop)
    res: List[Any] = await pool.gather()

【讨论】:

既然AsyncPool 已经在事件循环中运行,那么queueAsync 只调用self._futures.append(asyncio.create_task(coroutine)) 不是更有效吗?在 asyncio 中创建多个事件循环在某种程度上是一种反模式。【参考方案3】:

asyncio 中有一个名为wrap_future 的函数。

将 concurrent.futures.Future 对象包装在 asyncio.Future 对象中。

见https://docs.python.org/3/library/asyncio-future.html#asyncio.wrap_future

【讨论】:

以上是关于concurrent.futures.Future 可以转换为 asyncio.Future 吗?的主要内容,如果未能解决你的问题,请参考以下文章

asyncio yield from concurrent.futures.Future of an Executor

python 期物

流畅的python第十七章使用期物处理并发

3-4,协程&asyncio&异步编程补充