Python Asyncio - RuntimeError:无法关闭正在运行的事件循环

Posted

技术标签:

【中文标题】Python Asyncio - RuntimeError:无法关闭正在运行的事件循环【英文标题】:Python Asyncio - RuntimeError: Cannot close a running event loop 【发布时间】:2018-08-15 16:05:07 【问题描述】:

我正在尝试解决此错误:RuntimeError: Cannot close a running event loop 在我的异步进程中。我相信它正在发生,因为在任务仍然挂起时出现故障,然后我尝试关闭事件循环。我想我需要在关闭事件循环之前等待剩余的响应,但我不确定如何在我的具体情况下正确完成。

 def start_job(self):

        if self.auth_expire_timestamp < get_timestamp():
            api_obj = api_handler.Api('Api Name', self.dbObj)
            self.api_auth_resp = api_obj.get_auth_response()
            self.api_attr = api_obj.get_attributes()


        try:
            self.queue_manager(self.do_stuff(json_data))
        except aiohttp.ServerDisconnectedError as e:
            logging.info("Reconnecting...")
            api_obj = api_handler.Api('API Name', self.dbObj)
            self.api_auth_resp = api_obj.get_auth_response()
            self.api_attr = api_obj.get_attributes()
            self.run_eligibility()

async def do_stuff(self, data):

    tasks = []

    async with aiohttp.ClientSession() as session:
        for row in data:
            task = asyncio.ensure_future(self.async_post('url', session, row))
            tasks.append(task)
        result = await asyncio.gather(*tasks)
    self.load_results(result)


def queue_manager(self, method):
    self.loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(method)
    self.loop.run_until_complete(future)


async def async_post(self, resource, session, data):
        async with session.post(self.api_attr.api_endpoint + resource, headers=self.headers, data=data) as response:
            resp = []
            try:
                headers = response.headers['foo']
                content = await response.read()
                resp.append(headers)
                resp.append(content)
            except KeyError as e:
                logging.error('KeyError at async_post response')
                logging.error(e)
        return resp


def shutdown(self):
    //need to do something here to await the remaining tasks and then I need to re-start a new event loop, which i think i can do, just don't know how to appropriately stop the current one.
    self.loop.close() 
    return True

我如何处理错误并正确关闭事件循环,以便我可以启动一个新的并基本上重新启动整个程序并继续。

编辑:

这就是我现在正在尝试的,基于this SO answer。不幸的是,这个错误很少发生,所以除非我能强制它,否则我将不得不等待,看看它是否有效。在我的queue_manager 方法中,我将其更改为:

try:
   self.loop.run_until_complete(future)
except Exception as e:
   future.cancel()
   self.loop.run_until_complete(future)
   future.exception()

更新:

我摆脱了shutdown() 方法并将其添加到我的queue_manager() 方法中,它似乎可以正常工作:

  try:
        self.loop.run_until_complete(future)
    except Exception as e:
        future.cancel()
        self.check_in_records()
        self.reconnect()
        self.start_job()
        future.exception()

【问题讨论】:

shutdown 是从哪里调用的,为什么要尝试close 事件循环?一个 asyncio 程序通常由一个事件循环实例在其整个生命周期内提供服务。 问题是我调用的 API 将与待处理的任务断开连接,我试图“重新启动”而不会使整个应用程序崩溃。我为我最近添加的内容添加了更新。它似乎有效,但我愿意接受反馈。 最后需要future.exception()吗? run_until_complete 似乎正确地发现了异常。另外,取消显然已经完成的未来有什么意义(run_until_complete raise 见证了这一点)? 这个答案 - ***.com/a/30766124/4113027 似乎表明我需要 future.exception() 位。至于你的另一个问题,我正在取消未来,因为在这种情况下还有剩余的任务待处理,我想基本上删除这些任务并开始一个新的事件循环。在这种情况下,服务器在所有任务完成之前就断开了连接,因此那些剩余的任务无论如何都不会带回任何数据......也许我没有正确处理......? 在您的代码中,cancel()exception() 都是不必要的。 cancel() 因为未来已经完成(有一个例外),run_until_complete 退出的事实证明了这一点。请参阅this code 以获取功能等效的示例,该示例在没有警告的情况下运行。 【参考方案1】:

要回答最初所说的问题,不需要close()一个正在运行的循环,您可以为整个程序重复使用相同的循环。

鉴于更新中的代码,您的 queue_manager 可能如下所示:

try:
    self.loop.run_until_complete(future)
except Exception as e:
    self.check_in_records()
    self.reconnect()
    self.start_job()

取消future 是不必要的,据我所知没有任何效果。这与referenced answer 不同,后者专门对KeyboardInterrupt 做出反应,特别是因为它是由 asyncio 本身引发的。 KeyboardInterrupt 可以通过 run_until_complete 传播,而未来尚未实际完成。在 asyncio 中正确处理 Ctrl-C 非常困难甚至是不可能的(详情请参阅here),但幸运的是,问题根本与 Ctrl-C 无关,它是关于协程引发的异常。 (注意KeyboardInterrupt 不继承自Exception,所以在 Ctrl-C 的情况下,except 正文甚至不会执行。)

我取消了未来,因为在这种情况下还有剩余的任务待处理,我想从根本上删除这些任务并开始一个新的事件循环。

这是一件正确的事情,但(更新的)问题中的代码只是取消了一个未来,这个未来已经传递给run_until_complete。回想一下,future 是稍后提供的结果值的占位符。一旦提供了值,就可以通过调用future.result() 来检索它。如果未来的“价值”是异常,future.result() 将引发该异常。 run_until_complete 有一个约定,只要给定的未来产生一个值,它将运行事件循环,然后返回该值。如果“值”实际上是要引发的异常,则run_until_complete 将重新引发它。例如:

loop = asyncio.get_event_loop()
fut = loop.create_future()
loop.call_soon(fut.set_exception, ZeroDivisionError)
# raises ZeroDivisionError, as that is the future's result,
# manually set
loop.run_until_complete(fut)

当讨论的未来实际上是一个Task,一个将协程包装到Future 中的特定于异步的对象,这种未来的结果是协程返回的对象。如果协程引发异常,则检索结果将重新引发异常,run_until_complete 也是如此:

async def fail():
    1/0

loop = asyncio.get_event_loop()
fut = loop.create_task(fail())
# raises ZeroDivisionError, as that is the future's result,
# because the coroutine raises it
loop.run_until_complete(fut)

在处理任务时,run_until_completefinishing 表示协程也已完成,返回值或引发异常,由 run_until_complete 返回或引发。

另一方面,取消任务的工作方式是安排要恢复的任务和暂停它的await 表达式以引发CancelledError。除非任务专门捕获并抑制此异常(行为良好的 asyncio 代码不应该这样做),否则任务将停止执行,CancelledError 将成为其结果。但是,如果在调用cancel() 时协程已经完成,那么cancel() 将无法执行任何操作,因为没有待处理的await 可以注入CancelledError

【讨论】:

以上是关于Python Asyncio - RuntimeError:无法关闭正在运行的事件循环的主要内容,如果未能解决你的问题,请参考以下文章

python:asyncio模块

Python asyncio 模块

python asyncio

Python标准模块--asyncio

Python学习---Python的异步---asyncio模块(no-http)

python协程(4):asyncio