如何将协程添加到正在运行的 asyncio 循环中?
Posted
技术标签:
【中文标题】如何将协程添加到正在运行的 asyncio 循环中?【英文标题】:how to add a coroutine to a running asyncio loop? 【发布时间】:2015-12-28 19:20:24 【问题描述】:如何将新的协程添加到正在运行的 asyncio 循环中? IE。一个已经在执行一组协程的程序。
我想作为一种解决方法,可以等待现有协程完成,然后初始化一个新循环(使用额外的协程)。但是有没有更好的方法?
【问题讨论】:
其实asyncio.create_task
会抛出运行时错误如果没有循环运行
【参考方案1】:
您可以使用create_task
来调度新的协程:
import asyncio
async def cor1():
...
async def cor2():
...
async def main(loop):
await asyncio.sleep(0)
t1 = loop.create_task(cor1())
await cor2()
await t1
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
【讨论】:
感谢您的努力,但据我了解,这个答案是错误的。这里main
的第一次调用创建了协程,然后循环开始。换句话说,这个例子在循环开始之前调度协程。这不是我要求的。
main
只是作为包装器存在;我只是想指出loop.create_task
的用法。 create_task
完全符合您的要求。 -- 我已经编辑了示例以明确main
在运行create_task
之前会阻塞。
循环运行时不能调用loop.run_until_complete()
这怎么可能是答案?该任务是在循环开始之前创建的。如何将任务添加到正在运行的循环中意味着启动了事件循环,然后我们希望将任务添加到循环中
t1 = loop.create_task(cor1())
行在事件循环中创建并安排任务,准备好在当前协程(在本例中为 main
)切换后立即执行例如await asyncio.sleep(0)
【参考方案2】:
要将函数添加到已经运行的事件循环中,您可以使用:
asyncio.ensure_future(my_coro())
在我的例子中,我使用多线程 (threading
) 和 asyncio
并希望将任务添加到已经运行的事件循环中。对于处于相同情况的任何其他人,请务必明确声明事件循环(因为在 Thread
中不存在事件循环)。即:
在全球范围内:
event_loop = asyncio.get_event_loop()
然后,在你的Thread
:
asyncio.ensure_future(my_coro(), loop=event_loop)
【讨论】:
要将任务添加到在不同线程(例如主线程)中运行的循环中,需要使用:asyncio.run_coroutine_threadsafe(coro, loop)
。见:docs.python.org/3/library/…
这在 Python 3.7 之前是正确的。见docs.python.org/3/library/asyncio-task.html#creating-tasks“python 3.7 中增加了create_task()。在Python 3.7 之前,可以使用低级的asyncio.ensure_future() 函数”。【参考方案3】:
您的问题非常接近“如何向正在运行的程序添加函数调用?”
什么时候需要向事件循环中添加新的协程?
让我们看一些例子。这里程序使用两个协程并行启动事件循环:
import asyncio
from random import randint
async def coro1():
res = randint(0,3)
await asyncio.sleep(res)
print('coro1 finished with output '.format(res))
return res
async def main():
await asyncio.gather(
coro1(),
coro1()
) # here we have two coroutines running parallely
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
输出:
coro1 finished with output 1
coro1 finished with output 2
[Finished in 2.2s]
您可能需要添加一些协程来获取coro1
的结果并在它准备好后立即使用它?在这种情况下,只需创建等待 coro1
的协程并使用它的返回值:
import asyncio
from random import randint
async def coro1():
res = randint(0,3)
await asyncio.sleep(res)
print('coro1 finished with output '.format(res))
return res
async def coro2():
res = await coro1()
res = res * res
await asyncio.sleep(res)
print('coro2 finished with output '.format(res))
return res
async def main():
await asyncio.gather(
coro2(),
coro2()
) # here we have two coroutines running parallely
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
输出:
coro1 finished with output 1
coro2 finished with output 1
coro1 finished with output 3
coro2 finished with output 9
[Finished in 12.2s]
将协程视为具有特定语法的常规函数。您可以启动一组函数并行执行(asyncio.gather
),您可以在第一次完成后启动下一个函数,您可以创建调用其他函数的新函数。
【讨论】:
协程同时运行,而不是并行。不太一样。 "创建等待 coro1 的协程"。哎呀,我正在尝试复杂的事情来根据先前的请求触发请求。非常感谢!【参考方案4】:这里的答案似乎都没有完全回答这个问题。通过让“父”任务为您完成任务,可以将任务添加到正在运行的事件循环中。我不确定确保父母在孩子全部完成之前不会结束的最pythonic方法是什么(假设这是您想要的行为),但这确实有效。
import asyncio
import random
async def add_event(n):
print('starting ' + str(n))
await asyncio.sleep(n)
print('ending ' + str(n))
return n
async def main(loop):
added_tasks = []
delays = list(range(5))
# shuffle to simulate unknown run times
random.shuffle(delays)
for n in delays:
print('adding ' + str(n))
task = loop.create_task(add_event(n))
added_tasks.append(task)
await asyncio.sleep(0)
print('done adding tasks')
results = await asyncio.gather(*added_tasks)
print('done running tasks')
return results
loop = asyncio.get_event_loop()
results = loop.run_until_complete(main(loop))
print(results)
【讨论】:
这很有用,我将在找到优化/扩展时编辑示例。【参考方案5】:如果任务是将协程添加到已经在执行一些协程的循环中,那么你可以使用我的这个解决方案
import asyncio
import time
from threading import Thread
from random import randint
# first, we need a loop running in a parallel Thread
class AsyncLoopThread(Thread):
def __init__(self):
super().__init__(daemon=True)
self.loop = asyncio.new_event_loop()
def run(self):
asyncio.set_event_loop(self.loop)
self.loop.run_forever()
# example coroutine
async def coroutine(num, sec):
await asyncio.sleep(sec)
print(f'Coro num has finished')
if __name__ == '__main__':
# init a loop in another Thread
loop_handler = AsyncLoopThread()
loop_handler.start()
# adding first 5 coros
for i in range(5):
print(f'Add Coro i to the loop')
asyncio.run_coroutine_threadsafe(coroutine(i, randint(3, 5)), loop_handler.loop)
time.sleep(3)
print('Adding 5 more coros')
# adding 5 more coros
for i in range(5, 10):
print(f'Add Coro i to the loop')
asyncio.run_coroutine_threadsafe(coroutine(i, randint(3, 5)), loop_handler.loop)
# let them all finish
time.sleep(60)
执行完这个例子后,我们会得到这样的输出:
Add Coro 0 to the loop
Add Coro 1 to the loop
Add Coro 2 to the loop
Add Coro 3 to the loop
Add Coro 4 to the loop
Coro 0 has finished
Adding 5 more coros
Add Coro 5 to the loop
Add Coro 6 to the loop
Add Coro 7 to the loop
Add Coro 8 to the loop
Add Coro 9 to the loop
Coro 1 has finished
Coro 3 has finished
Coro 2 has finished
Coro 4 has finished
Coro 9 has finished
Coro 5 has finished
Coro 7 has finished
Coro 6 has finished
Coro 8 has finished
Process finished with exit code 0
【讨论】:
我认为这是问题的确切答案。好一个 在“def run(self)”中不需要“return self.loop” @mug896 你是对的。感谢您的关注以上是关于如何将协程添加到正在运行的 asyncio 循环中?的主要内容,如果未能解决你的问题,请参考以下文章