将 asyncio.ensure_future 和 loop.run_until_complete 组合在一个 api 调用中?
Posted
技术标签:
【中文标题】将 asyncio.ensure_future 和 loop.run_until_complete 组合在一个 api 调用中?【英文标题】:Combine asyncio.ensure_future and loop.run_until_complete in a single api call? 【发布时间】:2021-01-27 14:10:49 【问题描述】:我编写了一个异步函数,它收集多个文本数据并批量处理数据。之后,它返回输出,如下所示:
import sys
import asyncio
Model_runner():
'''
The model runner combines all the input coming to it and combines in a batch of 10 or 1 sec, which ever duration is less.
After combining, it does processing and returns the output
'''
loop = asyncio.get_event_loop()
model_obj = ModelRunner(loop)
loop.create_task(model_obj.model_runner())
async def process_text(text):
out_ = await model_obj.process_input(text)
return out_
为了获得输出,我运行以下代码:
task1 = asyncio.ensure_future(process_text(text1))
task2 = asyncio.ensure_future(process_text(text2))
task3 = asyncio.ensure_future(process_text(text3))
task4 = asyncio.ensure_future(process_text(text4))
async_tasks = [task1, task2, task3, task4]
out1, out2 ,out3 ,out4 = loop.run_until_complete(asyncio.gather(*async_tasks))
这里,out1、out2、out3、out4是处理文本数据后的输出。
在这里,我不想像[task1,task2,task3,task4]这样组合任务然后调用loop.run_until_complete来获取输出。相反,我正在寻找这样的功能:
out1 = func(text1)
out2 = func(text2)
etc..
但是,它们应该像 asyncio.ensure_future 那样以非阻塞方式工作。我怎样才能做到这一点。提前致谢。
【问题讨论】:
您希望func(text2)
仅在func(text1)
完成后启动,等等,还是希望它们同时运行?
@ArthurTacca 我希望这两个函数同时运行,因为我想从生成文本数据的线程中调用它们。我不想创建 async_tasks。 Model_runner 将处理多个输入是否同时出现。基本上,func(text) 应该作为 process_text(text) 函数的异步 api 工作。
@ArthurTacca 文本数据是通过多个线程连续生成的,每个线程应该可以同时调用process_text(text)
。
【参考方案1】:
两个明显的选择:
如果您已经有多个线程,为什么还要使用 asyncio?只需将process_text
设为常规阻塞函数并从这些线程中调用它即可。
相反,如果您使用的是 asyncio,为什么还要使用多个线程呢?使您的***任务异步并在一个线程中运行它们。
如果你真的必须使用多线程和异步函数:
让一个线程运行您的异步循环和您已经提到的工作线程,并在线程中使用loop.call_soon_threadsafe
来强制异步函数在异步线程中运行。如果您想将结果返回给线程,您可以使用queue.Queue
将结果(或多个结果)发回。
最后一个选项可能是最糟糕的,几乎可以肯定不是您想要的,但我提到它是为了完整性:从每个需要它的线程启动一个单独的 asyncio 事件循环,并使用它们在工作线程中运行您的异步函数直接。
【讨论】:
感谢您的回复。实际上,我正在研究您建议的第二种方法,而不是在这里使用任何线程。我正在使用 loop.run_forever() ` ` 但是,现在我遇到了内存泄漏。由于我的处理是在 GPU 中完成的,因此我的内存在操作运行时正在增加。您能建议任何解决方法吗? ### 开始异步操作` asyncio_list = [] for i in range(cam_run): print("cam ", i, " initialised") asyncio_list.append(asyncio.ensure_future(process_text(text))) loop.run_forever() loop.close() `以上是关于将 asyncio.ensure_future 和 loop.run_until_complete 组合在一个 api 调用中?的主要内容,如果未能解决你的问题,请参考以下文章
asyncio.new_event_loop 创建的事件循环挂起
python框架fastapi, AttributeError: module 'asyncio' has no attribute 'iscoroutinefunction&