yield from 未与 future 一起使用
Posted
技术标签:
【中文标题】yield from 未与 future 一起使用【英文标题】:yield from wasn't used with future 【发布时间】:2019-09-06 11:10:35 【问题描述】:我正在运行以下命令,使用 requests-threads:
def perform_requests():
prepared_requests = [...]
session = AsyncSession(n=100)
results = []
async def _perform_requests():
for request in prepared_requests:
results.append(session.request(**request))
for i, result in enumerate(results):
results[i] = await asyncio.ensure_future(results[i])
session.run(_perform_requests)
return results
但是,当我运行它时,会发生一些奇怪的事情,首先我会收到大量消息,例如:
(WARNING) Connection pool is full, discarding connection:
其次,我得到了这个错误:
results[i] = await asyncio.ensure_future(results[i])
builtins.AssertionError: yield from wasn't used with future
我正在使用ensure_future()
,这是怎么回事?
【问题讨论】:
【参考方案1】:session.request()
方法返回一个扭曲的Deferred
对象(requests-threads
代码calls twisted.internet.threads.deferToThread()
)。您通常不希望将此视为异步任务,而不是在 Twisted reactor 下运行。
相反,您可以使用twisted.internet.deferred.gatherResults()
并发执行请求并收集响应。
接下来,session.run()
调用 twisted.internet.task.react()
,这将总是退出 Python:
[...] 这个函数还将:
[...]
完成后退出应用程序,退出代码 0 表示成功,1 表示失败。
(我的粗体强调)。
这意味着即使您的代码有效,也永远不会到达return results
行。
如果您将 session.run()
调用移出作为应用程序的***入口点,那么一切正常:
from requests_threads import AsyncSession
from twisted.internet import defer
session = AsyncSession(n=100)
async def perform_requests():
prepared_requests = [...]
requests = [session.request(**request) for request in prepared_requests]
responses = await defer.gatherResults(requests)
print(responses)
session.run(perform_requests)
但在打印responses
列表后会立即退出。
否则,您将不得不直接管理扭曲的反应器(使用reactor.run()
和在响应完成后调用reactor.stop()
的回调);例如:
from requests_threads import AsyncSession
from twisted.internet import defer, error, reactor
def perform_requests():
prepared_requests = [...]
session = AsyncSession(n=100)
results = []
async def gather_responses():
requests = [session.request(**request) for request in prepared_requests]
results[:] = await defer.gatherResults(requests)
try:
reactor.stop()
except error.ReactorNotRunning:
pass
deferred = defer.ensureDeferred(gather_responses())
reactor.run()
return results
print(perform_requests())
如果您需要在扭曲的反应器上运行多个任务,您可以使用单个***函数并依靠回调在响应完成时通知您。
就个人而言,我认为您最好使用aiohttp.client
module 在 Python asyncio
事件循环下运行异步请求:
import asyncio
import aiohttp
async def perform_requests():
prepared_requests = [...]
conn = aiohttp.TCPConnector(limit=100)
with aiohttp.ClientSession(connector=conn) as session:
requests = [session.request(**request) for request in prepared_requests]
responses = await asyncio.gather(*requests)
print(responses)
if __name__ == '__main__':
asyncio.run(perform_requests())
请注意,asyncio.run()
需要 Python 3.7 或更高版本;您的错误消息表明您仍在使用 3.5 或 3.6。一个work-around would be to use loop.run_until_complete()
:
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(perform_requests())
【讨论】:
以上是关于yield from 未与 future 一起使用的主要内容,如果未能解决你的问题,请参考以下文章