concurrent.futures.Future 可以转换为 asyncio.Future 吗?
Posted
技术标签:
【中文标题】concurrent.futures.Future 可以转换为 asyncio.Future 吗?【英文标题】:Can concurrent.futures.Future be converted to asyncio.Future? 【发布时间】:2019-06-03 09:52:23 【问题描述】:在编写多线程代码多年后,我正在练习asyncio
。
注意到一些我觉得很奇怪的东西。在asyncio
和concurrent
中都有一个Future
对象。
from asyncio import Future
from concurrent.futures import Future
猜猜每个人都有自己的角色..
我的问题是我是否可以将concurrent.future.Future
转移到asyncio.Future
(或相反)?
【问题讨论】:
不确定是否完全重复,但肯定相关:***.com/questions/29902908/… 谢谢。阅读它,那里有一个比较,我还没有答案.. mkrieger1 的帖子很有帮助。我不知道你说的转移到另一个是什么意思 【参考方案1】:我的问题是我是否可以将
concurrent.future.Future
转移到asyncio.Future
(或相反)?
如果“转移”是指将一个转换为另一个,是的,这是可能的,尽管桥接阻抗不匹配可能需要一些工作。
要将concurrent.futures.Future
转换为asyncio.Future
,您可以调用asyncio.wrap_future
。返回的 asyncio 未来在 asyncio 事件循环中是可等待的,并且将在底层线程未来完成时完成。这就是run_in_executor
是implemented 的有效方式。
没有将 asyncio 未来直接转换为concurrent.futures
未来的公共功能,但有asyncio.run_coroutine_threadsafe
函数,它接受一个协程,将其提交给事件循环,然后返回一个并发的未来,它在 asyncio 未来完成时完成。这可用于有效地将任何 asyncio-awaitable 未来转换为并发未来,如下所示:
def to_concurrent(fut, loop):
async def wait():
await fut
return asyncio.run_coroutine_threadsafe(wait(), loop)
返回的未来会表现得像你对并发未来的期望,例如它的result()
方法将阻塞,等等。您可能需要注意的一件事是使用add_done_callback
添加到并发未来的回调在标记未来完成的线程中运行,在这种情况下是事件循环线程.这意味着如果你添加了一些 done 回调,你需要注意不要在它们的实现中调用阻塞调用,以免阻塞事件循环。
请注意,调用run_coroutine_threadsafe
需要事件循环在其他线程中实际运行。 (例如,您可以启动一个后台线程并让它执行loop.run_forever
。)
【讨论】:
【参考方案2】:对于“concurrent future to asyncio future”部分,这是我使用的一个实用程序。
from typing import List, Any
from concurrent.futures.thread import ThreadPoolExecutor
import asyncio
class AsyncThreadPool(ThreadPoolExecutor):
_futures: List[asyncio.Future]
_loop: asyncio.AbstractEventLoop
def __init__(self, max_workers=None):
super().__init__(max_workers)
self._futures = []
def queue(self, fn):
self._loop = asyncio.get_event_loop()
fut = self._loop.create_future()
self._futures.append(fut)
self.submit(self._entry, fn, fut)
def queueAsync(self, coroutine):
def newLoop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(coroutine)
self.queue(newLoop)
def _entry(self, fn, fut: asyncio.Future):
try:
result = fn()
self._loop.call_soon_threadsafe(fut.set_result, result)
except Exception as e:
self._loop.call_soon_threadsafe(fut.set_exception, e)
async def gather(self) -> List[Any]:
return await asyncio.gather(*self._futures)
你可以这样使用它:
with AsyncThreadPool() as pool:
# Queue some sync function (will be executed on another thread)
pool.queue(someHeavySyncFunction)
# Queue a coroutine that will be executed on a new event loop running on another thread
pool.queue(otherAsyncFunction())
# Gather results (non blocking for your current loop)
res: List[Any] = await pool.gather()
【讨论】:
既然AsyncPool
已经在事件循环中运行,那么queueAsync
只调用self._futures.append(asyncio.create_task(coroutine))
不是更有效吗?在 asyncio 中创建多个事件循环在某种程度上是一种反模式。【参考方案3】:
asyncio 中有一个名为wrap_future
的函数。
将 concurrent.futures.Future 对象包装在 asyncio.Future 对象中。
见https://docs.python.org/3/library/asyncio-future.html#asyncio.wrap_future
【讨论】:
以上是关于concurrent.futures.Future 可以转换为 asyncio.Future 吗?的主要内容,如果未能解决你的问题,请参考以下文章