将 asyncio.ensure_future 和 loop.run_until_complete 组合在一个 api 调用中?

Posted

技术标签:

【中文标题】将 asyncio.ensure_future 和 loop.run_until_complete 组合在一个 api 调用中?【英文标题】:Combine asyncio.ensure_future and loop.run_until_complete in a single api call? 【发布时间】:2021-01-27 14:10:49 【问题描述】:

我编写了一个异步函数,它收集多个文本数据并批量处理数据。之后,它返回输出,如下所示:

import sys
import asyncio

Model_runner():
    '''
    The model runner combines all the input coming to it and combines in a batch of 10 or 1 sec, which ever duration is less. 
    After combining, it does processing and returns the output 
    '''


loop = asyncio.get_event_loop()
model_obj = ModelRunner(loop)
loop.create_task(model_obj.model_runner())


async def process_text(text):
        out_ = await model_obj.process_input(text)
        return out_

为了获得输出,我运行以下代码:

task1 = asyncio.ensure_future(process_text(text1))
task2 = asyncio.ensure_future(process_text(text2))
task3 = asyncio.ensure_future(process_text(text3))
task4 = asyncio.ensure_future(process_text(text4))
async_tasks = [task1, task2, task3, task4]
out1, out2 ,out3 ,out4 = loop.run_until_complete(asyncio.gather(*async_tasks))

这里,out1、out2、out3、out4是处理文本数据后的输出。

在这里,我不想像[task1,task2,task3,task4]这样组合任务然后调用loop.run_until_complete来获取输出。相反,我正在寻找这样的功能:

out1 = func(text1)
out2 = func(text2) 
etc..

但是,它们应该像 asyncio.ensure_future 那样以非阻塞方式工作。我怎样才能做到这一点。提前致谢。

【问题讨论】:

您希望func(text2) 仅在func(text1) 完成后启动,等等,还是希望它们同时运行? @ArthurTacca 我希望这两个函数同时运行,因为我想从生成文本数据的线程中调用它们。我不想创建 async_tasks。 Model_runner 将处理多个输入是否同时出现。基本上,func(text) 应该作为 process_text(text) 函数的异步 api 工作。 @ArthurTacca 文本数据是通过多个线程连续生成的,每个线程应该可以同时调用process_text(text) 【参考方案1】:

两个明显的选择:

如果您已经有多个线程,为什么还要使用 asyncio?只需将 process_text 设为常规阻塞函数并从这些线程中调用它即可。 相反,如果您使用的是 asyncio,为什么还要使用多个线程呢?使您的***任务异步并在一个线程中运行它们。

如果你真的必须使用多线程和异步函数:

让一个线程运行您的异步循环和您已经提到的工作线程,并在线程中使用loop.call_soon_threadsafe 来强制异步函数在异步线程中运行。如果您想将结果返回给线程,您可以使用queue.Queue 将结果(或多个结果)发回。 最后一个选项可能是最糟糕的,几乎可以肯定不是您想要的,但我提到它是为了完整性:从每个需要它的线程启动一个单独的 asyncio 事件循环,并使用它们在工作线程中运行您的异步函数直接。

【讨论】:

感谢您的回复。实际上,我正在研究您建议的第二种方法,而不是在这里使用任何线程。我正在使用 loop.run_forever() ` ` 但是,现在我遇到了内存泄漏。由于我的处理是在 GPU 中完成的,因此我的内存在操作运行时正在增加。您能建议任何解决方法吗? ### 开始异步操作` asyncio_list = [] for i in range(cam_run): print("cam ", i, " initialised") asyncio_list.append(asyncio.ensure_future(process_text(text))) loop.run_forever() loop.close() `

以上是关于将 asyncio.ensure_future 和 loop.run_until_complete 组合在一个 api 调用中?的主要内容,如果未能解决你的问题,请参考以下文章

asyncio.new_event_loop 创建的事件循环挂起

python框架fastapi, AttributeError: module 'asyncio' has no attribute 'iscoroutinefunction&

如何获取协程的返回值

怎么将图片和文件捆绑在一起?

如何将word文本和图片一起上传

如何将背景图像添加到 UICollectionView 将滚动和缩放单元格