Using queues results in asyncio exception "got Future <Future pending> attached to a different loop"
Posted: 2018-12-11 12:55:13

Question:
I'm trying to run this simple code with asyncio queues, but I get exceptions, and even nested exceptions. I'd like some help getting queues in asyncio to work correctly:
import asyncio, logging

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("asyncio").setLevel(logging.WARNING)

num_workers = 1
in_queue = asyncio.Queue()
out_queue = asyncio.Queue()
tasks = []


async def run():
    for request in range(1):
        await in_queue.put(request)

    # each task consumes from 'input_queue' and produces to 'output_queue':
    for i in range(num_workers):
        tasks.append(asyncio.create_task(worker(name=f'worker-{i}')))
    # tasks.append(asyncio.create_task(saver()))

    print('waiting for queues...')
    await in_queue.join()
    # await out_queue.join()
    print('all queues done')

    for task in tasks:
        task.cancel()
    print('waiting until all tasks cancelled')
    await asyncio.gather(*tasks, return_exceptions=True)
    print('done')


async def worker(name):
    while True:
        try:
            print(f"{name} started")
            num = await in_queue.get()
            print(f'{name} got {num}')
            await asyncio.sleep(0)
            # await out_queue.put(num)
        except Exception as e:
            print(f"{name} exception {e}")
        finally:
            print(f"{name} ended")
            in_queue.task_done()


async def saver():
    while True:
        try:
            print("saver started")
            num = await out_queue.get()
            print(f'saver got {num}')
            await asyncio.sleep(0)
            print("saver ended")
        except Exception as e:
            print(f"saver exception {e}")
        finally:
            out_queue.task_done()


asyncio.run(run(), debug=True)
print('Done!')
Output:
waiting for queues...
worker-0 started
worker-0 got 0
worker-0 ended
worker-0 started
worker-0 exception
worker-0 ended
ERROR:asyncio:unhandled exception during asyncio.run() shutdown
task: <Task finished coro=<worker() done, defined at temp4.py:34> exception=ValueError('task_done() called too many times') created at Python37\lib\asyncio\tasks.py:325>
Traceback (most recent call last):
File "Python37\lib\asyncio\runners.py", line 43, in run
return loop.run_until_complete(main)
File "Python37\lib\asyncio\base_events.py", line 573, in run_until_complete
return future.result()
File "temp4.py", line 23, in run
await in_queue.join()
File "Python37\lib\asyncio\queues.py", line 216, in join
await self._finished.wait()
File "Python37\lib\asyncio\locks.py", line 293, in wait
await fut
RuntimeError: Task <Task pending coro=<run() running at temp4.py:23> cb=[_run_until_complete_cb() at Python37\lib\asyncio\base_events.py:158] created at Python37\lib\asyncio\base_events.py:552> got Future <Future pending> attached to a different loop
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "temp4.py", line 46, in worker
in_queue.task_done()
File "Python37\lib\asyncio\queues.py", line 202, in task_done
raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times
Traceback (most recent call last):
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1664, in <module>
main()
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1658, in main
globals = debugger.run(setup['file'], None, None, is_module)
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1068, in run
pydev_imports.execfile(file, globals, locals) # execute the script
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
exec(compile(contents+"\n", file, 'exec'), glob, loc)
File "temp4.py", line 63, in <module>
asyncio.run(run(), debug=True)
File "Python37\lib\asyncio\runners.py", line 43, in run
return loop.run_until_complete(main)
File "Python37\lib\asyncio\base_events.py", line 573, in run_until_complete
return future.result()
File "temp4.py", line 23, in run
await in_queue.join()
File "Python37\lib\asyncio\queues.py", line 216, in join
await self._finished.wait()
File "Python37\lib\asyncio\locks.py", line 293, in wait
await fut
RuntimeError: Task <Task pending coro=<run() running at temp4.py:23> cb=[_run_until_complete_cb() at Python37\lib\asyncio\base_events.py:158] created at Python37\lib\asyncio\base_events.py:552> got Future <Future pending> attached to a different loop
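This is the basic flow. What I want to do later is run more requests through more workers: each worker will move a number from in_queue to out_queue, and the saver will then print the numbers from out_queue.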
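Answer 1:
Your queues must be created inside the loop. You created them outside of the loop created for asyncio.run(), so they use events.get_event_loop(). asyncio.run() creates a new loop, and futures created for the queue in one loop can't then be used in the other.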
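A minimal sketch of that mechanism with no queue involved: a Future is created on one loop and then awaited from a coroutine that asyncio.run() runs on a second, new loop. The names await_foreign, loop_a and foreign_future are illustrative only. It uses an explicit Future rather than a module-level Queue because in Python 3.10 and later asyncio.Queue no longer takes a loop at construction and instead binds to the running loop on first use, so the question's exact code may not reproduce the error on newer versions.

```python
import asyncio


async def await_foreign(fut):
    try:
        await fut
    except RuntimeError as exc:
        # The task running this coroutine rejects the Future from another
        # loop and throws RuntimeError back in at the await.
        return str(exc)
    return "no error"


loop_a = asyncio.new_event_loop()
try:
    foreign_future = loop_a.create_future()                # bound to loop_a
    message = asyncio.run(await_foreign(foreign_future))  # runs on a new loop
finally:
    loop_a.close()

print(message)  # ends with "attached to a different loop"
```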
Create your queues in your top-level run() coroutine, and either pass them to the coroutines that need them, or use contextvars.ContextVar objects if you must use globals.
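You also need to clean up how you handle task cancellation inside your tasks. A task is cancelled by raising an asyncio.CancelledError exception in the task. You can ignore it, but if you catch it to do clean-up work, you must re-raise it. Your task code catches all exceptions without re-raising, including CancelledError, so you block proper cancellation. (In Python 3.7, which the question uses, CancelledError is a subclass of Exception; since Python 3.8 it derives from BaseException, so except Exception no longer catches it.)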
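A small sketch (the function names reraises, swallows and main are hypothetical) of the difference between re-raising and swallowing the cancellation: both tasks catch CancelledError, but only the one that re-raises ends up reported as cancelled by task.cancelled().

```python
import asyncio


async def reraises(log):
    try:
        await asyncio.sleep(3600)  # stands in for a blocking await such as queue.get()
    except asyncio.CancelledError:
        log.append("cleanup")      # clean-up work goes here...
        raise                      # ...then re-raise so the cancellation completes


async def swallows(log):
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        log.append("swallowed")    # no re-raise: the coroutine just returns None


async def main():
    log = []
    good = asyncio.create_task(reraises(log))
    bad = asyncio.create_task(swallows(log))
    await asyncio.sleep(0)         # let both tasks start and block in sleep()
    good.cancel()
    bad.cancel()
    await asyncio.gather(good, bad, return_exceptions=True)
    return good.cancelled(), bad.cancelled(), log


good_cancelled, bad_cancelled, log = asyncio.run(main())
print(good_cancelled, bad_cancelled, log)  # True False ['cleanup', 'swallowed']
```

The swallowing task simply finishes normally, so whoever cancelled it cannot tell that the cancellation was honoured.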
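Instead, what does happen during cancellation is that you call queue.task_done(); don't do that, at least not when your task is being cancelled. You should only call task_done() when you are actually handling a queue task, but your code calls task_done() whenever an exception occurs while it is still waiting for a queue task to appear.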
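If you need to use try...finally: in_queue.task_done(), put it around the block of code that handles an item received from the queue, and keep the await in_queue.get() outside of that try block. You don't want to mark tasks as done that you never actually received.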
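The placement described above, as a minimal self-contained sketch (the doubling step is a placeholder for real work, and the function names are hypothetical): because get() sits outside the try, cancelling the worker while it waits for an item never reaches task_done(), so there is no "task_done() called too many times" error.

```python
import asyncio


async def worker(queue, results):
    while True:
        item = await queue.get()       # outside the try: cancellation here skips task_done()
        try:
            results.append(item * 2)   # placeholder for the real processing
        finally:
            queue.task_done()          # runs once per item actually received


async def main():
    queue = asyncio.Queue()            # created inside the running loop
    for n in range(5):
        queue.put_nowait(n)
    results = []
    task = asyncio.create_task(worker(queue, results))
    await queue.join()                 # returns after five task_done() calls
    task.cancel()                      # the worker is now blocked in queue.get()
    await asyncio.gather(task, return_exceptions=True)
    return results


results = asyncio.run(main())
print(results)  # [0, 2, 4, 6, 8]
```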
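Last, when you print exceptions, you want to print their repr(); for historical reasons, the str() conversion of an exception produces its .args value, which is not very helpful for CancelledError exceptions, whose .args is empty. Use {e!r} in formatted strings so you can see what exception you are catching: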
worker-0 exception CancelledError()
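A quick way to see the difference, using only the standard library (the two exceptions are the ones that appear in the question's output):

```python
import asyncio

errors = [asyncio.CancelledError(), ValueError('task_done() called too many times')]
for e in errors:
    print(f"str:  {e}")    # only the .args: empty for CancelledError()
    print(f"repr: {e!r}")  # type name plus args, e.g. CancelledError()
```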
So the corrected code, with the saver() task enabled, the queues created inside of run(), and the task exception handling cleaned up, would be:
import asyncio, logging

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("asyncio").setLevel(logging.WARNING)

num_workers = 1


async def run():
    in_queue = asyncio.Queue()
    out_queue = asyncio.Queue()

    for request in range(1):
        await in_queue.put(request)

    # each task consumes from 'in_queue' and produces to 'out_queue':
    tasks = []
    for i in range(num_workers):
        tasks.append(asyncio.create_task(
            worker(in_queue, out_queue, name=f'worker-{i}')))
    tasks.append(asyncio.create_task(saver(out_queue)))

    await in_queue.join()
    await out_queue.join()

    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks, return_exceptions=True)

    print('done')


async def worker(in_queue, out_queue, name):
    print(f"{name} started")
    try:
        while True:
            num = await in_queue.get()
            try:
                print(f'{name} got {num}')
                await asyncio.sleep(0)
                await out_queue.put(num)
            except Exception as e:
                print(f"{name} exception {e!r}")
                raise
            finally:
                in_queue.task_done()
    except asyncio.CancelledError:
        print(f"{name} is being cancelled")
        raise
    finally:
        print(f"{name} ended")


async def saver(out_queue):
    print("saver started")
    try:
        while True:
            num = await out_queue.get()
            try:
                print(f'saver got {num}')
                await asyncio.sleep(0)
                print("saver ended")
            except Exception as e:
                print(f"saver exception {e!r}")
                raise
            finally:
                out_queue.task_done()
    except asyncio.CancelledError:
        print(f"saver is being cancelled")
        raise
    finally:
        print(f"saver ended")


asyncio.run(run(), debug=True)
print('Done!')
which prints:
worker-0 started
worker-0 got 0
saver started
saver got 0
saver ended
done
worker-0 is being cancelled
worker-0 ended
saver is being cancelled
saver ended
Done!
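If you want to use globals to share queue objects, then use ContextVar objects. You still create the queues in run(), but if you were to start multiple loops, the contextvars module integration will take care of keeping the queues separate: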
from contextvars import ContextVar

# ...

in_queue = ContextVar('in_queue')
out_queue = ContextVar('out_queue')


async def run():
    in_, out = asyncio.Queue(), asyncio.Queue()
    in_queue.set(in_)
    out_queue.set(out)

    for request in range(1):
        await in_.put(request)

    # ...
    for i in range(num_workers):
        tasks.append(asyncio.create_task(worker(name=f'worker-{i}')))
    tasks.append(asyncio.create_task(saver()))

    await in_.join()
    await out.join()

    # ...


async def worker(name):
    print(f"{name} started")
    in_ = in_queue.get()
    out = out_queue.get()
    try:
        while True:
            num = await in_.get()
            try:
                # ...
                await out.put(num)
                # ...
            finally:
                in_.task_done()
    # ...


async def saver():
    print("saver started")
    out = out_queue.get()
    try:
        while True:
            num = await out.get()
            try:
                # ...
            finally:
                out.task_done()
    # ...
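The snippet above elides parts of the full program. As a self-contained sketch of the same idea (the single worker and the simplified run(n) signature are assumptions, not part of the answer), the following runs two separate event loops one after another, each with its own queue stored in the same module-level ContextVar. Because asyncio.create_task() copies the current context, the worker sees the queue that run() set just before creating it.

```python
import asyncio
from contextvars import ContextVar

# Module-level handle; the Queue itself is created per run() inside the loop.
in_queue = ContextVar('in_queue')


async def worker(results):
    queue = in_queue.get()   # visible because create_task() copied run()'s context
    while True:
        num = await queue.get()
        try:
            results.append(num)
        finally:
            queue.task_done()


async def run(n):
    queue = asyncio.Queue()  # created while this loop is running
    in_queue.set(queue)      # set before create_task() so the worker inherits it
    for i in range(n):
        queue.put_nowait(i)
    results = []
    task = asyncio.create_task(worker(results))
    await queue.join()
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return results


first = asyncio.run(run(3))
second = asyncio.run(run(2))  # new loop, new Queue, no cross-loop Futures
print(first, second)  # [0, 1, 2] [0, 1]
```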
Comments:
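I stumbled across this useful answer when I got the same error as @Shirkan, but with asyncio.Lock. These too must be created in the same loop (not in __init__ or anywhere else that is non-async or in a different loop).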
@t-mart I ran into the same problem with asyncio.Lock. How did you solve it?
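@NVS: read my answer. Any async object that needs access to the active loop can only be created while there is an active loop, so in a function called from a call stack that started with asyncio.run().

Answer 2: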
When passing the queue as an argument is not an option, you can also initialize it explicitly with a pre-created event loop:
loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
However, in this case you give up the convenience of asyncio.run() and have to handle starting and shutting down the event loop yourself:
try:
    asyncio.set_event_loop(loop)
    loop.set_debug(True)
    loop.run_until_complete(run())
finally:
    try:
        # _cancel_all_tasks() is a private helper in asyncio.runners, not a public API
        asyncio.runners._cancel_all_tasks(loop)
        loop.run_until_complete(loop.shutdown_asyncgens())
    finally:
        asyncio.set_event_loop(None)
        loop.close()
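Since the loop argument to asyncio.Queue was deprecated in Python 3.8 and removed in 3.10, here is a sketch of the same "create the loop first" idea that avoids it: the queue is created by a small coroutine running on the pre-created loop, so it picks up that loop without being told. The helper names make_queue and main are illustrative; the sketch skips the private _cancel_all_tasks() step because it leaves no tasks running.

```python
import asyncio


async def make_queue():
    # Created while `loop` is running, so no loop= argument is needed.
    return asyncio.Queue()


async def main(queue):
    await queue.put(42)
    return await queue.get()


loop = asyncio.new_event_loop()
try:
    queue = loop.run_until_complete(make_queue())
    result = loop.run_until_complete(main(queue))  # same loop as the queue
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()

print(result)  # 42
```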
Comments:
Deprecated since version 3.8, will be removed in version 3.10: The loop parameter.