用 asyncio.run 替换 asyncio.get_event_loop().run_until_complete

Posted

技术标签:

【中文标题】用 asyncio.run 替换 asyncio.get_event_loop().run_until_complete【英文标题】:Replacing asyncio.get_event_loop().run_until_complete with ayncio.run 【发布时间】:2021-12-24 11:45:21 【问题描述】:

我正在尝试运行these asyncronyous tests,我通过在此库的根目录node run-tests gateio --python-async 中从命令行运行它们来运行它们

下面的代码会导致弃用警告,因为我正在使用依赖于 python3.10 的东西 (tox),看起来 get_event_loop 已被 python3.10 弃用

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())

弃用警告

[100%] Testing gateio WARN (testExchange @ run-tests.js:172)

WARN gateio (Python 3 Async): (explain @ run-tests.js:191)

    /path/ccxt/python/ccxt/test/test_async.py:558: DeprecationWarning: There is no current event loop
      asyncio.get_event_loop().run_until_complete(main())
    gateio using proxy ``
    EXCHANGE: gateio
    SYMBOL: BTC/USD
    CODE: ZRX
    gateio BTC/USD ticker None high: 65609.0 low: 64073.0 bid: 64958.24 ask: 64991.5 volume: 40525928.149905
    gateio fetched all 2176 tickers
    gateio fetched 10 OHLCVs
    gateio BTC/USD order book 2021-11-12T03:25:31.759Z bid: 64958.24 bidVolume: 0.3225 ask: 64991.5 askVolume: 0.3225
    gateio BTC/USD fetched 100 trades
    gateio BTC/USD fetch_orders() not supported
    gateio BTC/USD fetched 0 open orders
    gateio BTC/USD fetched 0 closed orders
    gateio ZRX fetch_transactions() not supported
    gateio fetched balance (explain @ run-tests.js:193)
WARN ["gateio"] (run-tests.js:272)
All done, 1 warnings (run-tests.js:276)

我尝试更新这一行,改用asyncio.run,但后来我得到一个无限循环,最终测试超时并失败

asyncio.run(main(), debug=True)

超时错误

[100%] Testing gateio FAIL (testExchange @ run-tests.js:172)

FAILED gateio (Python 3 Async): (explain @ run-tests.js:190)

    timed out (explain @ run-tests.js:193)
FAIL ["gateio"] (run-tests.js:271)
All done, 1 failed (run-tests.js:276)

我想知道如何正确更新此方法,以免收到弃用警告,也不会超时

【问题讨论】:

【参考方案1】:

这里的问题在于如何管理事件循环。 asyncio.run 总是在内部创建一个全新的事件循环。您正在使用的 API 似乎在 Exchange 类中使用了 asyncio.get_event_loop(),该类创建了一个与 asyncio.run 创建和传递的事件循环不同的事件循环。这会导致问题,因为Exchange 类创建的循环只是闲置在那里,因此尝试使用该循环运行任何东西都不会真正运行。要解决此问题,您需要确保使用asyncio.run 创建的循环,这意味着稍微重构代码并使用asyncio.get_running_loop()asyncio.get_running_loop() 将返回当前正在运行的循环,如果没有运行则抛出异常,它永远不会创建新的。

专门解决您遇到的问题(请记住,此更改可能会破坏代码的其他区域)

exchange.pyExchange 类的构造函数更改为使用asyncio.get_running_loop() 而不是asyncio.get_event_loop(),如下所示:

class Exchange(BaseExchange):

    def __init__(self, config=):
        if 'asyncio_loop' in config:
            self.asyncio_loop = config['asyncio_loop']
        self.asyncio_loop = asyncio.get_running_loop() #this is the change here, note that this overwrites the loop from the config, so this could cause problems.
        self.aiohttp_trust_env = config.get('aiohttp_trust_env', self.aiohttp_trust_env)
        self.verify = config.get('verify', self.verify)
        self.own_session = 'session' not in config
        self.cafile = config.get('cafile', certifi.where())
        super(Exchange, self).__init__(config)
        self.throttle = None
        self.init_rest_rate_limiter()
        self.markets_loading = None
        self.reloading_markets = False

接下来,您需要将交换初始化代码移动到 main 协程中。这样当构造函数调用get_running_loop 时,它将返回asyncio.run 创建的循环:

async def main():

  # instantiate all exchanges
  for id in ccxt.exchanges:
    if id == 'theocean':
      continue
    exchange = getattr(ccxt, id)
    exchange_config = 'verbose': argv.verbose
    if sys.version_info[0] < 3:
      exchange_config.update('enableRateLimit': True)
    if id in config:
      exchange_config = ccxt.Exchange.deep_extend(exchange_config, config[id])
    exchanges[id] = exchange(exchange_config)
    #other code as before...

此修复有效,但请记住,API 本身可能需要以支持其处理事件循环的方式的方式进行重构,以支持asyncio.run 创建和管理事件循环的方式。特别是,Exchange 类中可以从配置中提取事件循环的代码可能需要重构。

【讨论】:

“特别是,Exchange 类中可以从配置中提取事件循环的代码可能需要重构。” - 是的,那部分是个问题,我得到&lt;class 'RuntimeError'&gt; no running event loop 因为没有运行循环,应该如何重构它?

以上是关于用 asyncio.run 替换 asyncio.get_event_loop().run_until_complete的主要内容,如果未能解决你的问题,请参考以下文章

无法从正在运行的事件循环中调用 asyncio.run()

`asyncio.run()` 不等待协程完成

Django — async_to_sync 与 asyncio.run

asyncio.run_in_executor 是多线程的吗?

Python asyncio run_coroutine_threadsafe 从不运行协程?

python-asyncio