Using queues results in asyncio exception "got Future <Future pending> attached to a different loop"

Question (posted 2018-12-11 12:55:13):

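I'm trying to run this simple code with asyncio queues, but I get exceptions, and even nested exceptions.

I would like some help getting queues in asyncio to work correctly: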

import asyncio, logging

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("asyncio").setLevel(logging.WARNING)


num_workers = 1
in_queue = asyncio.Queue()
out_queue = asyncio.Queue()
tasks = []


async def run():
    for request in range(1):
        await in_queue.put(request)

    # each task consumes from 'input_queue' and produces to 'output_queue':
    for i in range(num_workers):
        tasks.append(asyncio.create_task(worker(name=f'worker-{i}')))
    # tasks.append(asyncio.create_task(saver()))

    print('waiting for queues...')
    await in_queue.join()
    # await out_queue.join()
    print('all queues done')

    for task in tasks:
        task.cancel()
    print('waiting until all tasks cancelled')
    await asyncio.gather(*tasks, return_exceptions=True)
    print('done')


async def worker(name):
    while True:
        try:
            print(f"{name} started")
            num = await in_queue.get()
            print(f'{name} got {num}')
            await asyncio.sleep(0)
            # await out_queue.put(num)
        except Exception as e:
            print(f"{name} exception {e}")
        finally:
            print(f"{name} ended")
            in_queue.task_done()


async def saver():
    while True:
        try:
            print("saver started")
            num = await out_queue.get()
            print(f'saver got {num}')
            await asyncio.sleep(0)
            print("saver ended")
        except Exception as e:
            print(f"saver exception {e}")
        finally:
            out_queue.task_done()


asyncio.run(run(), debug=True)
print('Done!')

Output:

waiting for queues...
worker-0 started
worker-0 got 0
worker-0 ended
worker-0 started
worker-0 exception 
worker-0 ended
ERROR:asyncio:unhandled exception during asyncio.run() shutdown
task: <Task finished coro=<worker() done, defined at temp4.py:34> exception=ValueError('task_done() called too many times') created at Python37\lib\asyncio\tasks.py:325>
Traceback (most recent call last):
  File "Python37\lib\asyncio\runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "Python37\lib\asyncio\base_events.py", line 573, in run_until_complete
    return future.result()
  File "temp4.py", line 23, in run
    await in_queue.join()
  File "Python37\lib\asyncio\queues.py", line 216, in join
    await self._finished.wait()
  File "Python37\lib\asyncio\locks.py", line 293, in wait
    await fut
RuntimeError: Task <Task pending coro=<run() running at temp4.py:23> cb=[_run_until_complete_cb() at Python37\lib\asyncio\base_events.py:158] created at Python37\lib\asyncio\base_events.py:552> got Future <Future pending> attached to a different loop

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "temp4.py", line 46, in worker
    in_queue.task_done()
  File "Python37\lib\asyncio\queues.py", line 202, in task_done
    raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times
Traceback (most recent call last):
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1664, in <module>
    main()
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1658, in main
    globals = debugger.run(setup['file'], None, None, is_module)
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1068, in run
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "temp4.py", line 63, in <module>
    asyncio.run(run(), debug=True)
  File "Python37\lib\asyncio\runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "Python37\lib\asyncio\base_events.py", line 573, in run_until_complete
    return future.result()
  File "temp4.py", line 23, in run
    await in_queue.join()
  File "Python37\lib\asyncio\queues.py", line 216, in join
    await self._finished.wait()
  File "Python37\lib\asyncio\locks.py", line 293, in wait
    await fut
RuntimeError: Task <Task pending coro=<run() running at temp4.py:23> cb=[_run_until_complete_cb() at Python37\lib\asyncio\base_events.py:158] created at Python37\lib\asyncio\base_events.py:552> got Future <Future pending> attached to a different loop

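This is the basic flow. What I'd like to do later is run more requests on more workers, where each worker moves a number from in_queue to out_queue, and then the saver prints the numbers from out_queue.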

Answer 1:

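Your queues must be created inside the loop. You created them outside the loop that asyncio.run() creates, so they use events.get_event_loop(). asyncio.run() creates a new loop, and futures created for the queue in one loop cannot then be used in the other.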
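To make the "new loop" point concrete, here is a minimal sketch showing that every asyncio.run() call works with its own event loop. The helper name current_loop is hypothetical and not part of the original code.

```python
import asyncio


async def current_loop():
    # Return the event loop that is running this coroutine.
    return asyncio.get_running_loop()


# Each asyncio.run() call creates a fresh event loop and closes it afterwards.
first_loop = asyncio.run(current_loop())
second_loop = asyncio.run(current_loop())

print(first_loop is second_loop)  # False
print(first_loop.is_closed())     # True
```

Anything bound to a loop before asyncio.run() starts is therefore bound to a different loop than the one your coroutines run on.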

Create your queues in your top-level run() coroutine and either pass them to the coroutines that need them, or, if you must use globals, use contextvars.ContextVar objects.

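You also need to clean up how you handle task cancellation inside your tasks. A task is cancelled by raising an asyncio.CancelledError exception inside the task. You can ignore it, but if you catch it to do clean-up work, you must re-raise it.

Your task code catches all exceptions without re-raising them, including CancelledError, so you block proper cancellation.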
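A minimal sketch of the catch-and-re-raise pattern described above. The names cooperative_worker and events are hypothetical. Note that on Python 3.7 (the version in the traceback) CancelledError is a subclass of Exception, while from Python 3.8 on it derives from BaseException, so a plain except Exception no longer swallows it there; re-raising after clean-up is still the right pattern.

```python
import asyncio

events = []  # records what the task did, for inspection


async def cooperative_worker():
    try:
        await asyncio.sleep(3600)  # stands in for waiting on real work
    except asyncio.CancelledError:
        events.append("cleanup")  # clean-up work goes here
        raise  # re-raise so the task is actually marked as cancelled


async def main():
    task = asyncio.create_task(cooperative_worker())
    await asyncio.sleep(0)  # let the task start and reach its await
    task.cancel()
    results = await asyncio.gather(task, return_exceptions=True)
    return task.cancelled(), results


was_cancelled, results = asyncio.run(main())
print(was_cancelled)  # True
print(events)         # ['cleanup']
```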

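What actually happens during cancellation is that you call queue.task_done(); don't do that, at least not when your task is being cancelled. You should only call task_done() when you are actually handling a queue task, but your code also calls task_done() when an exception occurs while it is still waiting for a queue task to appear.

If you need to use try...finally: in_queue.task_done(), put it around the block of code that handles an item received from the queue, and keep the await in_queue.get() outside of that try block. You don't want to mark tasks as done that you never actually received.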
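The placement can be sketched in isolation as follows. This is a simplified illustration with a hypothetical consumer coroutine and a processed list, not the full corrected program.

```python
import asyncio


async def consumer(queue, processed):
    while True:
        # Outside the try: if cancellation arrives here, task_done() is skipped.
        item = await queue.get()
        try:
            processed.append(item * 2)
            await asyncio.sleep(0)
        finally:
            queue.task_done()  # exactly once per item actually received


async def main():
    queue = asyncio.Queue()  # created inside the running loop
    processed = []
    for item in range(3):
        queue.put_nowait(item)
    task = asyncio.create_task(consumer(queue, processed))
    await queue.join()  # returns once every item has been marked done
    task.cancel()       # consumer is idle in queue.get() at this point
    await asyncio.gather(task, return_exceptions=True)
    return processed, task.cancelled()


processed, cancelled = asyncio.run(main())
print(processed)  # [0, 2, 4]
print(cancelled)  # True
```

Because the consumer is cancelled while waiting in queue.get(), no extra task_done() call is made and the "called too many times" ValueError cannot occur.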

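Finally, when you print exceptions, you want to print their repr(). For historical reasons, the str() conversion of an exception produces its .args value, which is not very helpful for CancelledError exceptions because their .args is empty. Use {e!r} in formatted strings so you can see which exception you are catching: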

worker-0 exception CancelledError()
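A quick way to see the difference between the two conversions (a small illustration, independent of the queue code):

```python
import asyncio

exc = asyncio.CancelledError()

as_str = f"{exc}"     # str(): built from .args, which is empty here
as_repr = f"{exc!r}"  # repr(): includes the exception class name

print(f"str:  {as_str!r}")   # str:  ''
print(f"repr: {as_repr!r}")  # repr: 'CancelledError()'
```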

So the corrected code, with the saver() task enabled, the queues created inside run(), and the task exception handling cleaned up, would be:

import asyncio, logging

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("asyncio").setLevel(logging.WARNING)


num_workers = 1


async def run():
    in_queue = asyncio.Queue()
    out_queue = asyncio.Queue()

    for request in range(1):
        await in_queue.put(request)

    # each task consumes from 'in_queue' and produces to 'out_queue':
    tasks = []
    for i in range(num_workers):
        tasks.append(asyncio.create_task(
            worker(in_queue, out_queue, name=f'worker-{i}')))
    tasks.append(asyncio.create_task(saver(out_queue)))

    await in_queue.join()
    await out_queue.join()

    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks, return_exceptions=True)

    print('done')

async def worker(in_queue, out_queue, name):
    print(f"{name} started")
    try:
        while True:
            num = await in_queue.get()
            try:
                print(f'{name} got {num}')
                await asyncio.sleep(0)
                await out_queue.put(num)
            except Exception as e:
                print(f"{name} exception {e!r}")
                raise
            finally:
                in_queue.task_done()
    except asyncio.CancelledError:
        print(f"{name} is being cancelled")
        raise
    finally:
        print(f"{name} ended")

async def saver(out_queue):
    print("saver started")
    try:
        while True:
            num = await out_queue.get()
            try:
                print(f'saver got {num}')
                await asyncio.sleep(0)
                print("saver ended")
            except Exception as e:
                print(f"saver exception {e!r}")
                raise
            finally:
                out_queue.task_done()
    except asyncio.CancelledError:
        print(f"saver is being cancelled")
        raise
    finally:
        print(f"saver ended")

asyncio.run(run(), debug=True)
print('Done!')

This prints:

worker-0 started
worker-0 got 0
saver started
saver got 0
saver ended
done
worker-0 is being cancelled
worker-0 ended
saver is being cancelled
saver ended
Done!

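If you want to use globals to share the queue objects, use ContextVar objects. You still create the queues in run(), but if you were to start multiple loops, the contextvars module integration takes care of keeping the queues separate: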

from contextvars import ContextVar
# ...

in_queue = ContextVar('in_queue')
out_queue = ContextVar('out_queue')

async def run():
    in_, out = asyncio.Queue(), asyncio.Queue()
    in_queue.set(in_)
    out_queue.set(out)

    for request in range(1):
        await in_.put(request)

    # ...

    for i in range(num_workers):
        tasks.append(asyncio.create_task(worker(name=f'worker-{i}')))
    tasks.append(asyncio.create_task(saver()))

    await in_.join()
    await out.join()

    # ...

async def worker(name):
    print(f"{name} started")
    in_ = in_queue.get()
    out = out_queue.get()
    try:
        while True:
            num = await in_.get()
            try:
                # ...
                await out.put(num)
                # ...
            finally:
                in_.task_done()
    # ...

async def saver():
    print("saver started")
    out = out_queue.get()
    try:
        while True:
            num = await out.get()
            try:
                # ...
            finally:
                out.task_done()
    # ...
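For reference, a compact self-contained variant of the ContextVar approach with the elided parts filled in. This is only a sketch: the saved list is a hypothetical stand-in for the saver's print() call, the logging and cancellation messages are left out, and it uses three requests instead of one.

```python
import asyncio
from contextvars import ContextVar

in_queue = ContextVar('in_queue')
out_queue = ContextVar('out_queue')
saved = []  # hypothetical sink standing in for the saver's print()


async def worker():
    in_, out = in_queue.get(), out_queue.get()
    while True:
        num = await in_.get()
        try:
            await out.put(num)
        finally:
            in_.task_done()


async def saver():
    out = out_queue.get()
    while True:
        num = await out.get()
        try:
            saved.append(num)
        finally:
            out.task_done()


async def run():
    in_, out = asyncio.Queue(), asyncio.Queue()
    # Set the variables before creating the tasks:
    # each task copies the current context when it is created.
    in_queue.set(in_)
    out_queue.set(out)

    for request in range(3):
        await in_.put(request)

    tasks = [asyncio.create_task(worker()), asyncio.create_task(saver())]
    await in_.join()
    await out.join()

    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)


asyncio.run(run())
print(saved)  # [0, 1, 2]
```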

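Comments:

I stumbled across this helpful answer when I got the same error as @Shirkan, but with asyncio.Lock. Those also have to be created in the same loop (not in __init__ or anywhere else that is non-async or runs on a different loop).

@t-mart I have the same problem with asyncio.Lock. How did you solve it?

@NVS: read my answer. Any asyncio object that needs access to the active loop can only be created while there is an active loop, so in a function called from a call stack that starts with asyncio.run().

Answer 2: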

When passing the queue as an argument is not an option, you can also initialize it explicitly with a pre-created event loop:

loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)

However, in this case you give up the convenience of asyncio.run() and have to handle starting and shutting down the event loop yourself:

try:
    asyncio.set_event_loop(loop)
    loop.set_debug(True)
    loop.run_until_complete(run())
finally:
    try:
        asyncio.runners._cancel_all_tasks(loop)
        loop.run_until_complete(loop.shutdown_asyncgens())
    finally:
        asyncio.set_event_loop(None)
        loop.close()
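Since the loop argument was deprecated in Python 3.8 and removed in 3.10, here is a hedged sketch of the same idea without it. It assumes the queue is created only after the hand-managed loop has been installed with asyncio.set_event_loop(); the names consumer and results are hypothetical, and the private asyncio.runners._cancel_all_tasks() call is omitted because no tasks are left pending here.

```python
import asyncio

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

queue = asyncio.Queue()  # module-level, created without a loop= argument
results = []


async def consumer():
    item = await queue.get()  # waits until main() puts an item
    results.append(item)
    queue.task_done()


async def main():
    task = asyncio.create_task(consumer())
    await asyncio.sleep(0)  # let the consumer start waiting on the queue
    await queue.put("job")
    await queue.join()      # waits for the consumer's task_done()
    await task


try:
    loop.run_until_complete(main())
finally:
    try:
        loop.run_until_complete(loop.shutdown_asyncgens())
    finally:
        asyncio.set_event_loop(None)
        loop.close()

print(results)  # ['job']
```

The trade-off is the same as in the answer: you manage the loop's lifetime yourself instead of relying on asyncio.run().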

Comments:

Deprecated since version 3.8, will be removed in version 3.10: the loop parameter.
