使用 FastAPI 的异步 SqlAlchemy:获取所有请求的单个会话

Posted

技术标签:

【中文标题】使用 FastAPI 的异步 SqlAlchemy:获取所有请求的单个会话【英文标题】:Async SqlAlchemy with FastAPI: Getting single session for all requests 【发布时间】:2021-12-01 11:32:21 【问题描述】:

我最近将使用 FastApi 编码的 REST API 迁移到新的 SQLAlchemy 1.4+ 异步版本。 我的应用程序编译正确,数据库似乎设置得很好。当我尝试执行多个请求时出现问题,出现一个错误,似乎表明同一会话正在用于我的所有请求。我已将错误消息放在末尾​​p>

这是我的代码,我基于 SQLAlchemy Async Docs 和 this example

App Engine 初始化

from typing import AsyncIterator
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

from .notification import Notification
from .devices import Device



from sqlalchemy import MetaData
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy import create_engine

from app.core.config import Config
from app.core.constants import logger
import asyncio

engine = create_async_engine(Config.RDS_DB_URL)
metadata = MetaData(engine)

#if not engine.dialect.has_table(engine, C.NOTIFICATIONS):
#Base.metadata.create_all(engine)

async def init_connection():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    

asyncio.run(init_connection())

async_session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)

async def get_session() -> AsyncIterator[AsyncSession]:
    async with async_session() as session:
        yield session




使用会话的端点取决于加载

@registry_router.get(R.REGISTRY_PRESCRIPTION_ID, tags=[TAGS.REGISTRY])
async def get_patient_prescription_by_id(id: str, x_authorization: str = Header(None), 
                session: AsyncSession = Depends(get_session)):
    #Other code
    return (await session.execute(select(Prescription).where(Prescription.id==id, 
                Prescription.customerId==customerId))).scalars().first()

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/hypercorn/asyncio/context.py", line 28, in _handle
    await invoke_asgi(app, scope, receive, send)
  File "/usr/local/lib/python3.7/dist-packages/hypercorn/utils.py", line 219, in invoke_asgi
    await app(scope, receive, send)
  File "/usr/local/lib/python3.7/dist-packages/fastapi/applications.py", line 208, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.7/dist-packages/starlette/applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.7/dist-packages/starlette/middleware/errors.py", line 181, in __call__
    raise exc
  File "/usr/local/lib/python3.7/dist-packages/starlette/middleware/errors.py", line 159, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.7/dist-packages/starlette/middleware/cors.py", line 84, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.7/dist-packages/starlette/exceptions.py", line 82, in __call__
    raise exc
  File "/usr/local/lib/python3.7/dist-packages/starlette/exceptions.py", line 71, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.7/dist-packages/starlette/routing.py", line 656, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.7/dist-packages/starlette/routing.py", line 259, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.7/dist-packages/starlette/routing.py", line 61, in app
    response = await func(request)
  File "/usr/local/lib/python3.7/dist-packages/fastapi/routing.py", line 227, in app
    dependant=dependant, values=values, is_coroutine=is_coroutine
  File "/usr/local/lib/python3.7/dist-packages/fastapi/routing.py", line 159, in run_endpoint_function
    return await dependant.call(**values)
  File "/home/ubuntu/rest/app/api/registry.py", line 504, in get_entity_schedules
    for row in (await data).items:
  File "/usr/local/lib/python3.7/dist-packages/fastapi_pagination/ext/async_sqlalchemy.py", line 23, in paginate
    total = await session.scalar(select(func.count()).select_from(query.subquery()))  # type: ignore
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/ext/asyncio/session.py", line 230, in scalar
    **kw
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/ext/asyncio/session.py", line 206, in execute
    **kw
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/util/_concurrency_py3k.py", line 128, in greenlet_spawn
    result = context.throw(*sys.exc_info())
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/orm/session.py", line 1689, in execute
    result = conn._execute_20(statement, params or , execution_options)
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1611, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/sql/elements.py", line 324, in _execute_on_connection
    self, multiparams, params, execution_options
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1488, in _execute_clauseelement
    cache_hit=cache_hit,
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1843, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 2024, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1800, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/default.py", line 717, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 451, in execute
    self._prepare_and_execute(operation, parameters)
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/util/_concurrency_py3k.py", line 70, in await_only
    return current.driver.switch(awaitable)
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/util/_concurrency_py3k.py", line 123, in greenlet_spawn
    value = await result
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 379, in _prepare_and_execute
    await adapt_connection._start_transaction()
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 687, in _start_transaction
    self._handle_exception(error)
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 653, in _handle_exception
    raise translated_error from error
sqlalchemy.exc.InterfaceError: (sqlalchemy.dialects.postgresql.asyncpg.InterfaceError) <class 'asyncpg.exceptions._base.InterfaceError'>: cannot perform operation: another operation is in progress
[SQL: SELECT count(*) AS count_1
FROM (SELECT schedules."profileId" AS "profileId", schedules."customerId" AS "customerId", schedules."practitionerId" AS "practitionerId", schedules.practitioner AS practitioner, schedules.patient AS patient, schedules.id AS id, schedules.reason AS reason, schedules.comments AS comments, schedules.state AS state, schedules.duration AS duration, schedules.time AS time, schedules.ts AS ts, schedules.responses AS responses, schedules."attentionType" AS "attentionType", schedules.history AS history, tb_user.specialty AS specialty
FROM schedules LEFT OUTER JOIN tb_user ON CAST(tb_user.id AS VARCHAR) = schedules."practitionerId"
WHERE (schedules."practitionerId" = %s OR (EXISTS (SELECT 1
FROM participants, tb_user
WHERE schedules.id = participants."scheduleId" AND tb_user.id = participants."participantId" AND tb_user.id = %s))) AND schedules."customerId" = %s AND schedules.ts BETWEEN %s AND %s ORDER BY schedules.ts DESC) AS anon_1]
[parameters: ('c42a1400-4534-11eb-8918-eb0d5241f5a7', 'c42a1400-4534-11eb-8918-eb0d5241f5a7', '8f69db20-4533-11eb-8918-eb0d5241f5a7', 1632970800000, 1635735599000)]
(Background on this error at: https://sqlalche.me/e/14/rvf5)

我尝试以各种方式让会话在每个请求上运行,但均未成功。关于我缺少什么的任何线索?

【问题讨论】:

我无法重现这个。 【参考方案1】:

回答晚了,但可能对其他人有所帮助

您必须在生成器函数中创建一个会话变量,如下所示

async def get_session() -> AsyncIterator[AsyncSession]:
    async_session = sessionmaker(
         engine, 
         expire_on_commit=False, 
         class_=AsyncSession
         )
    async with async_session() as session:
        yield session

这样每次调用 get_session() 都会创建一个新会话

您也可以参考以下文章以获得更好的理解和设置

https://testdriven.io/blog/fastapi-sqlmodel/

【讨论】:

以上是关于使用 FastAPI 的异步 SqlAlchemy:获取所有请求的单个会话的主要内容,如果未能解决你的问题,请参考以下文章

如何为 FastAPI 应用程序编写 SQLAlchemy 测试夹具

只需格式化 FastApi 端点返回的 SQLAlchemy 模型

FastAPI 中的会话

Python FastAPI 框架 操作Mysql数据库 增删改查

fastapi异步web框架入门

FastAPI 1:安装FastAPI