yield from 未与 future 一起使用

Posted

技术标签:

【中文标题】yield from 未与 future 一起使用【英文标题】:yield from wasn't used with future 【发布时间】:2019-09-06 11:10:35 【问题描述】:

我正在运行以下命令,使用 requests-threads:

def perform_requests():
    prepared_requests = [...]
    session = AsyncSession(n=100)
    results = []

    async def _perform_requests():
        for request in prepared_requests:
            results.append(session.request(**request))
        for i, result in enumerate(results):
            results[i] = await asyncio.ensure_future(results[i])

    session.run(_perform_requests)
    return results

但是,当我运行它时,会发生一些奇怪的事情,首先我会收到大量消息,例如:

(WARNING) Connection pool is full, discarding connection:

其次,我得到了这个错误:

results[i] = await asyncio.ensure_future(results[i])
builtins.AssertionError: yield from wasn't used with future

我正在使用ensure_future(),这是怎么回事?

【问题讨论】:

【参考方案1】:

session.request() 方法返回一个扭曲的Deferred 对象(requests-threads 代码calls twisted.internet.threads.deferToThread())。您通常希望将此视为异步任务,而不是在 Twisted reactor 下运行。

相反,您可以使用twisted.internet.deferred.gatherResults() 并发执行请求并收集响应。

接下来,session.run() 调用 twisted.internet.task.react(),这将总是退出 Python

[...] 这个函数还将:

[...]

完成后退出应用程序,退出代码 0 表示成功,1 表示失败。

(我的粗体强调)。

这意味着即使您的代码有效,也永远不会到达return results 行。

如果您将 session.run() 调用移出作为应用程序的***入口点,那么一切正常:

from requests_threads import AsyncSession
from twisted.internet import defer

session = AsyncSession(n=100)

async def perform_requests():
    prepared_requests = [...]
    requests = [session.request(**request) for request in prepared_requests]
    responses = await defer.gatherResults(requests)
    print(responses)        

session.run(perform_requests)

但在打印responses 列表后会立即退出。

否则,您将不得不直接管理扭曲的反应器(使用reactor.run() 和在响应完成后调用reactor.stop() 的回调);例如:

from requests_threads import AsyncSession
from twisted.internet import defer, error, reactor

def perform_requests():
    prepared_requests = [...]
    session = AsyncSession(n=100)
    results = []

    async def gather_responses():
        requests = [session.request(**request) for request in prepared_requests]
        results[:] = await defer.gatherResults(requests)
        try:
            reactor.stop()
        except error.ReactorNotRunning:
            pass

    deferred = defer.ensureDeferred(gather_responses())
    reactor.run()
    return results

print(perform_requests())

如果您需要在扭曲的反应器上运行多个任务,您可以使用单个***函数并依靠回调在响应完成时通知您。

就个人而言,我认为您最好使用aiohttp.client module 在 Python asyncio 事件循环下运行异步请求:

import asyncio
import aiohttp

async def perform_requests():
    prepared_requests = [...]
    conn = aiohttp.TCPConnector(limit=100)

    with aiohttp.ClientSession(connector=conn) as session:
        requests = [session.request(**request) for request in prepared_requests]
        responses = await asyncio.gather(*requests)

        print(responses)

if __name__ == '__main__':
    asyncio.run(perform_requests())

请注意,asyncio.run() 需要 Python 3.7 或更高版本;您的错误消息表明您仍在使用 3.5 或 3.6。一个work-around would be to use loop.run_until_complete()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(perform_requests())

【讨论】:

以上是关于yield from 未与 future 一起使用的主要内容,如果未能解决你的问题,请参考以下文章

asio 使用_future 而不是 yield[ec]

参数对象(API 密钥)未与 axios.create 一起发送

Auth 标头未与 GET 请求一起发送

更改的属性未与 Magical Record 一起保存

引导标签输入标签值未与表单一起提交

动态创建的索引字段未与表单一起提交