python3.6 async/await 仍然与 fastAPI 同步工作
Posted
技术标签:
【中文标题】python3.6 async/await 仍然与 fastAPI 同步工作【英文标题】:python3.6 async/await still works synchronously with fastAPI 【发布时间】:2021-04-02 08:35:07 【问题描述】:我有一个发布两个请求的 fastAPI 应用程序,其中一个请求更长(如果有帮助,它们是 Elasticsearch 查询,我正在使用已经返回协程的 AsyncElasticsearch 模块)。这是我的尝试:
class my_module:
search_object = AsyncElasticsearch(url, port)
async def do_things(self):
resp1 = await search_object.search() #the longer one
print(check_resp1)
resp2 = await search_object.search() #the shorter one
print(check_resp2)
process(resp2)
process(resp1)
do_synchronous_things()
return thing
app = FastAPI()
@app.post("/")
async def service(user_input):
result = await my_module.do_things()
return results
我观察到的不是等待resp1
,当它到达check_resp1
时,它已经是一个完整的响应,就好像我根本没有使用异步一样。
我是 python async 的新手,我知道我的代码不会工作,但我不知道如何修复它。据我了解,当解释器看到await
时,它会启动函数然后继续前进,在这种情况下应该立即发布下一个请求。我如何让它做到这一点?
【问题讨论】:
我认为你在这里搞混了一些东西。当您使用await
时,字面意思 的意思是“停在这里,等到结果到来”。所以很自然,await search_object.search()
之后的一行回复是完全可用的。如果您不想等待,请不要使用await
。
使用await
和使用同步函数调用的唯一区别是await
只暂停当前函数,而不是整个世界。您的程序可以在任意数量的函数等待某事时执行其他操作。但从函数的角度来看,result = await asynchronous_thing()
和 result = synchronous_thing()
的行为完全相同。
@Tomalak 谢谢!所以我确实理解错了。就像你说的,如果await
暂停“当前函数”,是否意味着每个 I/O 绑定的任务都需要写在一个单独的函数中?它又是如何回归的呢?我想这是更低级别的,但我可以想当然地认为它会在完成后发生吗?
您可以通过使用await
对每个任务执行一次 (a = await func_a(); b = await func_b()
) 顺序执行任务,也可以并行执行任务,将任务背靠背拍摄并为组使用一次await
(@ 987654337@),其中asyncio.gather()
是一个助手,它为您提供一个等待的任务,一旦所有参数都完成,该任务就会完成。
【参考方案1】:
是的,这是正确的,协程在结果准备好之前不会继续。您可以使用asyncio.gather 并发运行任务:
import asyncio
async def task(msg):
print(f"START msg")
await asyncio.sleep(1)
print(f"END msg")
return msg
async def main():
await task("1")
await task("2")
results = await asyncio.gather(task("3"), task("4"))
print(results)
if __name__ == "__main__":
asyncio.run(main())
测试:
$ python test.py
START 1
END 1
START 2
END 2
START 3
START 4
END 3
END 4
['3', '4']
您也可以使用asyncio.as_completed 获得最早的下一个结果:
for coro in asyncio.as_completed((task("5"), task("6"))):
earliest_result = await coro
print(earliest_result)
2021 年 4 月 2 日星期五 09:25:33 UTC 更新:
asyncio.run 从 Python 3.7+ 开始可用,在以前的版本中,您必须手动创建和启动循环:
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
【讨论】:
这很有帮助,也很容易解释。非常感谢!【参考方案2】:说明
您的代码同步运行的原因是在do_things
函数中,代码执行如下:
-
安排
search_object.search()
执行
等到search_object.search()
完成并得到结果
安排search_object.search()
执行
等到search_object.search()
完成并得到结果
执行(同步)process(resp2)
执行(同步)process(resp1)
执行(同步)do_synchronous_things()
您的意图是让步骤 1 和 3 在 2 和 4 之前执行。您可以使用 unsync
库轻松完成 - 这里是 the documentation。
如何解决这个问题
from unsync import unsync
class my_module:
search_object = AsyncElasticsearch(url, port)
@unsync
async def search1():
return await search_object.search()
@unsync
async def search2(): # not sure if this is any different to search1
return await search_object.search()
async def do_things(self):
task1, task2 = self.search1(), self.search2() # schedule tasks
resp1, resp2 = task1.result(), task2.result() # wait till tasks are executed
# you might also do similar trick with process function to run process(resp2) and process(resp1) concurrently
process(resp2)
process(resp1)
do_synchronous_things() # if this does not rely on resp1 and resp2 it might also be put into separate task to make the computation quicker. To do this use @unsync(cpu_bound=True) decorator
return thing
app = FastAPI()
@app.post("/")
async def service(user_input):
result = await my_module.do_things()
return results
更多信息
如果你想了解更多关于 asyncio 和 asyncronyous 编程的知识,我推荐这个tutorial。还有一个类似的案例,您提出了一些可能的解决方案来使协同程序同时运行。
PS。显然我无法运行这段代码,所以你必须自己调试它。
【讨论】:
感谢您的回答!一个问题:使用unsync
装饰search1
和search2
与使用asyncio.gather([search1(), search2()])
运行它们有何不同?
我还看到了一种以并发方式运行同步和/或阻塞功能的替代方法,用@aiomisc.threaded
装饰它们,然后再次将任务收集在一起。我不知道它们在两个不同的库中的运行方式是否有任何区别。
据我所知,这几乎是一样的。唯一的区别是 unsync 的编写方式不必担心循环管理和任务创建,因为这些事情是隐式完成的。所以对于初学者(包括我自己)来说使用起来可能会容易得多,并且避免菜鸟的错误。
谢谢@Maciek,那么我也将使用unsync
,因为我绝对不擅长 Python 异步 :) 感谢分享库!以上是关于python3.6 async/await 仍然与 fastAPI 同步工作的主要内容,如果未能解决你的问题,请参考以下文章