阻塞的 Python 异步函数调用也会阻塞另一个异步函数
Posted
技术标签:
【中文标题】阻塞的 Python 异步函数调用也会阻塞另一个异步函数【英文标题】:A blocked Python async function invocation also block another async function 【发布时间】:2020-08-19 02:52:10 【问题描述】:我使用 FastAPI 开发访问 SQL Server 的数据层 API。 无论使用 pytds 还是 pyodbc, 如果有数据库事务导致任何请求挂起, 所有其他请求都将被阻止。 (即使没有数据库操作)
转载:
-
有意执行可序列化的 SQL Server 会话,开始事务并且不回滚或提交
INSERT INTO [dbo].[KVStore] VALUES ('1', '1', 0)
begin tran
SET TRANSACTION ISOLATION LEVEL Serializable
SELECT * FROM [dbo].[KVStore]
-
使用异步处理函数向 API 发送请求,如下所示:
def kv_delete_by_key_2_sql():
conn = pytds.connect(dsn='192.168.0.1', database=cfg.kvStore_db, user=cfg.kvStore_uid,
password=cfg.kvStore_upwd, port=1435, autocommit=True)
engine = conn.cursor()
try:
sql = "delete KVStore; commit"
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(engine.execute, sql)
rs = future.result()
j =
'success': True,
'rowcount': rs.rowcount
return jsonable_encoder(j)
except Exception as exn:
j =
'success': False,
'reason': exn_handle(exn)
return jsonable_encoder(j)
@app.post("/kvStore/delete")
async def kv_delete(request: Request, type_: Optional[str] = Query(None, max_length=50)):
request_data = await request.json()
return kv_delete_by_key_2_sql()
-
然后使用异步处理函数向同一应用的 API 发送请求,如下所示:
async def hangit0(request: Request, t: int = Query(0)):
print(t, datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
await asyncio.sleep(t)
print(t, datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
j =
'success': True
return jsonable_encoder(j)
@app.get("/kvStore/hangit/")
async def hangit(request: Request, t: int = Query(0)):
return await hangit0(request, t)
我预计 step.2 会挂起,而 step.3 应该在 2 秒后直接返回。 但是,如果事务未提交或回滚,则 step.3 永远不会返回...
如何使这些处理函数同时工作?
【问题讨论】:
请解释清楚。代码不连贯,即调用与函数名不匹配 我更新了函数名。问题是即使 FastAPI 使用 asyncio,我也在线程池中包装执行。 step.2 中的处理程序仍然阻塞 step.3 中的处理程序。但是,step.3 从不涉及任何数据库操作...async
用于合作并发。使用阻塞的常规函数是不合作的。所以简单的答案是“不要使用阻塞函数”。您到底在寻找什么?
这能回答你的问题吗? Difference between asyncio.wait([asyncio.sleep(5)])
and asyncio.sleep(5)
不...它与睡眠无关,我已经使用asyncio.sleep。问题是数据库锁导致异步函数挂起,又阻塞了另一个异步请求……这不是我的预期……
【参考方案1】:
原因是rs = future.result()
实际上是一个阻塞调用——参见python docs。不幸的是,executor.submit()
不会返回等待对象(concurrent.futures.Future
与 asyncio.Future
不同。
您可以使用asyncio.wrap_future
,它接受concurrent.futures.Future
并返回asyncio.Future
(请参阅python docs)。新的 Future
对象是可等待的,因此您可以将阻塞函数转换为异步函数。
一个例子:
import asyncio
import concurrent.futures
async def my_async():
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(lambda x: x + 1, 1)
return await asyncio.wrap_future(future)
print(asyncio.run(my_async()))
在您的代码中,只需将rs = future.result()
更改为rs = await asyncio.wrap_future(future)
并生成整个函数async
。那应该有魔力,祝你好运! :)
【讨论】:
以上是关于阻塞的 Python 异步函数调用也会阻塞另一个异步函数的主要内容,如果未能解决你的问题,请参考以下文章