如何访问与异步 sqlalchemy 的关系?

Posted

技术标签:

【中文标题】如何访问与异步 sqlalchemy 的关系?【英文标题】:how to access relationships with async sqlalchemy? 【发布时间】:2022-01-03 08:41:00 【问题描述】:
import asyncio

from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.future import select
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import sessionmaker

engine = create_async_engine(
        "postgresql+asyncpg://user:pass@localhost/db",
        echo=True,
    )


# expire_on_commit=False will prevent attributes from being expired
# after commit.
async_session = sessionmaker(
    engine, expire_on_commit=False, class_=AsyncSession
)


Base = declarative_base()

class A(Base):
    __tablename__ = "a"

    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)
    data = Column(String)
    create_date = Column(DateTime, server_default=func.now())
    bs = relationship("B")

    # required in order to access columns with server defaults
    # or SQL expression defaults, subsequent to a flush, without
    # triggering an expired load
    __mapper_args__ = "eager_defaults": True


class B(Base):
    __tablename__ = "b"
    id = Column(Integer, primary_key=True)
    a_id = Column(ForeignKey("a.id"))
    data = Column(String)
    
    
       

async with engine.begin() as conn:
    await conn.run_sync(Base.metadata.drop_all)
    await conn.run_sync(Base.metadata.create_all)


async with async_session() as session:
    async with session.begin():
        session.add_all(
            [
                A(bs=[B(), B()], data="a1"),
                A(bs=[B()], data="a2"),
            ]
        )


async with async_session() as session:
    result = await session.execute(select(A).order_by(A.id))
    a1 = result.scalars().first()

    # no issue: 
    print(a1.name, a1.data)

    # throws error:
    print(a1.bs)
    

尝试访问a1.bs 会出现此错误:

     59     current = greenlet.getcurrent()
     60     if not isinstance(current, _AsyncIoGreenlet):
---> 61         raise exc.MissingGreenlet(
     62             "greenlet_spawn has not been called; can't call await_() here. "
     63             "Was IO attempted in an unexpected place?"

MissingGreenlet: greenlet_spawn has not been called; can't call await_() here. Was IO attempted in an unexpected place? (Background on this error at: https://sqlalche.me/e/14/xd2s)


【问题讨论】:

【参考方案1】:

是这样的:

from sqlalchemy.orm import selectinload

async with async_session() as session:
    result = await session.execute(select(A).order_by(A.id)
                                            .options(selectinload(A.bs)))
    a = result.scalars().first()

    print(a.bs)

键使用selectinload method to prevent implicit IO

更新

selectinload 有几个替代品,例如joinedloadlazyload。我仍在尝试了解其中的差异。

【讨论】:

这甚至在尝试反序列化它与 pydantic 之类的同步时也有效,因为对象引用已经存在,因此不必进行异步调用。

以上是关于如何访问与异步 sqlalchemy 的关系?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 SQLAlchemy 中加载嵌套关系?

在 SQLAlchemy 中,当有多个关系时,如何访问查询中的引用属性?

如何建立允许不同类型用户在 SqlAlchemy 中注册的数据库关系

flask数据库管理

如何限制/抵消 sqlalchemy orm 关系的结果?

如何使用 SQLAlchemy 声明性语法指定关系?