阻塞的 Python 异步函数调用也会阻塞另一个异步函数

Posted

技术标签:

【中文标题】阻塞的 Python 异步函数调用也会阻塞另一个异步函数【英文标题】:A blocked Python async function invocation also block another async function 【发布时间】:2020-08-19 02:52:10 【问题描述】:

我使用 FastAPI 开发访问 SQL Server 的数据层 API。 无论使用 pytds 还是 pyodbc, 如果有数据库事务导致任何请求挂起, 所有其他请求都将被阻止。 (即使没有数据库操作)

转载:

    有意执行可序列化的 SQL Server 会话,开始事务并且不回滚或提交
    INSERT INTO [dbo].[KVStore] VALUES ('1', '1', 0)
    begin tran
    SET TRANSACTION ISOLATION LEVEL Serializable
    SELECT * FROM [dbo].[KVStore]
    使用异步处理函数向 API 发送请求,如下所示:
    def kv_delete_by_key_2_sql():
            conn = pytds.connect(dsn='192.168.0.1', database=cfg.kvStore_db, user=cfg.kvStore_uid,
                                 password=cfg.kvStore_upwd, port=1435, autocommit=True)
            engine = conn.cursor()
            try:
                sql = "delete KVStore; commit"

                with concurrent.futures.ThreadPoolExecutor() as executor:
                    future = executor.submit(engine.execute, sql)
                    rs = future.result()
                    j = 
                        'success': True,
                        'rowcount': rs.rowcount
                    
                    return jsonable_encoder(j)
            except Exception as exn:
                j = 
                    'success': False,
                    'reason': exn_handle(exn)
                
                return jsonable_encoder(j)

    @app.post("/kvStore/delete")
    async def kv_delete(request: Request, type_: Optional[str] = Query(None, max_length=50)):
        request_data = await request.json()

        return kv_delete_by_key_2_sql()

    然后使用异步处理函数向同一应用的 API 发送请求,如下所示:

    async def hangit0(request: Request, t: int = Query(0)):
        print(t, datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
        await asyncio.sleep(t)
        print(t, datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
        j = 
            'success': True
        
        return jsonable_encoder(j)
    
    @app.get("/kvStore/hangit/")
    async def hangit(request: Request, t: int = Query(0)):
        return await hangit0(request, t)

我预计 step.2 会挂起,而 step.3 应该在 2 秒后直接返回。 但是,如果事务未提交或回滚,则 step.3 永远不会返回...

如何使这些处理函数同时工作?

【问题讨论】:

请解释清楚。代码不连贯,即调用与函数名不匹配 我更新了函数名。问题是即使 FastAPI 使用 asyncio,我也在线程池中包装执行。 step.2 中的处理程序仍然阻塞 step.3 中的处理程序。但是,step.3 从不涉及任何数据库操作... async 用于合作并发。使用阻塞的常规函数​​是不合作的。所以简单的答案是“不要使用阻塞函数”。您到底在寻找什么? 这能回答你的问题吗? Difference between asyncio.wait([asyncio.sleep(5)]) and asyncio.sleep(5) 不...它与睡眠无关,我已经使用asyncio.sleep。问题是数据库锁导致异步函数挂起,又阻塞了另一个异步请求……这不是我的预期…… 【参考方案1】:

原因是rs = future.result() 实际上是一个阻塞调用——参见python docs。不幸的是,executor.submit() 不会返回等待对象(concurrent.futures.Futureasyncio.Future 不同。

您可以使用asyncio.wrap_future,它接受concurrent.futures.Future 并返回asyncio.Future(请参阅python docs)。新的 Future 对象是可等待的,因此您可以将阻塞函数转换为异步函数。

一个例子

import asyncio
import concurrent.futures

async def my_async():
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future = executor.submit(lambda x: x + 1, 1)
        return await asyncio.wrap_future(future)

print(asyncio.run(my_async()))

在您的代码中,只需将rs = future.result() 更改为rs = await asyncio.wrap_future(future) 并生成整个函数async。那应该有魔力,祝你好运! :)

【讨论】:

以上是关于阻塞的 Python 异步函数调用也会阻塞另一个异步函数的主要内容,如果未能解决你的问题,请参考以下文章

Python 中的协程 无阻塞

python 37 同步异步调用

关于同步,异步,阻塞,非阻塞的简单介绍

signal之——异步回收机制

关于python中同步异步,阻塞非阻塞的理解

Java中的非阻塞异步IO