SQLAlchemy Asyncio ORM 在从元数据中检索表和列时无法查询数据库

Posted

技术标签:

【中文标题】SQLAlchemy Asyncio ORM 在从元数据中检索表和列时无法查询数据库【英文标题】:SQLAlchemy Asyncio ORM Unable to Query Database When Retrieving Tables and Columns from MetaData 【发布时间】:2022-01-07 08:07:53 【问题描述】:

使用 SQLAlchemy 异步 ORM 1.4、Postgres 后端、Python 3.7

我在 SA ORM 中使用augmented Declarative Base。这些表不在models.py 中,而是通过解析包含所有表模式的 JSON 脚本直接提交到数据库。因此,我无法导入脚本顶部的模型,例如 from models import ThisTable

所以要对表进行 CRUD 操作,我首先通过反映元数据来检索它们。

以“通常”的方式,在脚本顶部导入所有表时,这样的查询会起作用:

result = await s.execute(select(func.sum(TableName.column)))
curr = result.all()

当我尝试从元数据中反映表和列对象以查询它们时,这不起作用。有很多AttributeError: 'Table' object has no attribute 'func'TypeError: 'Table' object is not callable错误。


def retrieve_table_obj(table):
    meta = MetaData()
    meta.reflect(bind=sync_engine)
    return meta.tables[table]

def retrieve_table_cols(self, table):
    table = retrieve_table_obj('users')
    return table.columns.keys()

async def reading(collection, modifications):

    table = db.retrieve_table_obj(collection)
    columns = db.retrieve_table_cols(collection)
    for c in columns:
        for f in mods['fields']:
            if c in f:
                q = select(func.sum(table.c))

result = await s.execute(q)
curr = result.all()

asyncio.run(reading("users", 'fields': ["usage", "allowance"]))

第一次必须显式检索数据库中的表和列时,如何查询它们?

【问题讨论】:

您可以使用 automap 扩展通过反射构建模型。 感谢自动地图的建议!它适用于同步引擎,但我正在使用异步引擎并且无法让 automap 使用它,即使在获取引擎连接并使用 conn.run_sync 调用函数时也是如此。您是否成功地将自动映射与异步引擎实例一起使用? 【参考方案1】:

自动映射扩展可用于自动将数据库表反映到 SQLAlchemy 模型。但是 automap 在引擎上使用inspect,这在异步引擎上不受支持;我们可以通过使用同步引擎进行自动映射来解决这个问题。一旦模型被映射,异步引擎就可以使用它们。

例如:

import asyncio

import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.automap import automap_base


sync_engine = sa.create_engine('postgresql:///test', echo=True, future=True)

Base = automap_base()
Base.prepare(sync_engine, reflect=True)


async def async_main(collection, modifications):
    engine = create_async_engine(
        "postgresql+asyncpg:///test",
        echo=True,
        future=True,
        connect_args='ssl': False,
    )

    async_session = orm.sessionmaker(
        engine, class_=AsyncSession, future=True
    )

    async with async_session() as session:
        model = Base.classes[collection]
        matches = set(model.__mapper__.columns.keys()) & set(modifications['fields'])
        for m in matches:
            q = sa.select(sa.func.sum(getattr(model, m)))


            result = await session.execute(q)
            curr = result.all()
            for row in curr:
                print(row)
            print()

    # for AsyncEngine created in function scope, close and
    # clean-up pooled connections
    await engine.dispose()


asyncio.run(reading("users", 'fields': ["usage", "allowance"]))

如果您不需要模型,缓存 MetaData 对象而不是在每次调用 retrieve_table_obj 时重新创建它会提高现有代码的效率,并将 select(func.sum(table.c)) 替换为 select(sa.func.sum(getattr(table.c, c)))

【讨论】:

谢谢!我将尝试将其集成到我的代码中,看看它是如何进行的。只是为了澄清您的缓存建议:您的意思是只进行一次调用并将其存储在一个列表中,然后在整个其余代码中从列表中提取而不是从数据库中提取? list 是存储 MetaData 对象的最佳数据类型吗?我更喜欢select(func.sum(table.c)) 的清晰和简洁而不是select(sa.func.sum(getattr(table.c, c))),所以我想尽可能保留select(func.sum(table.c)) 类型的语法。 我只会保留元数据对象本身,否则就是字典;搜索一长串可能会很昂贵。我不明白table.c 是如何工作的,但如果你能让它工作,那很好。 table.c[column_name] 是另一种选择。 对,有道理!再次感谢 关于在多个条件下逐位构建查询,如果我有更新功能:async def updater(table_name, where_condition, data): Table = models_cache.classes[table_name] async with session_maker() as session: query = update(Table).filter_by(**where_condition) query = query.values(**data) await session.execute(query) await session.commit() asyncio.run(updater('appuser', 'name': 'Cheshire', 'org': 'Wonderland')) 我不知道如何将您上面的出色示例与此集成。 例如,如果我们想要一个看起来像:UPDATE appuserSET org = 'Wonderland'WHERE (name = 'Alice' OR name = 'Cheshire') AND (id > 0); 的 SQL 查询,并且我们使用如下参数调用函数:asyncio.run(updater('appuser', 'name': 'Cheshire', 'or': ['name': 'Alice'], 'and': ['id': '> 0'], 'org': 'Wonderland')) 我们如何逐行逐步构建查询?

以上是关于SQLAlchemy Asyncio ORM 在从元数据中检索表和列时无法查询数据库的主要内容,如果未能解决你的问题,请参考以下文章

Asyncio 函数在从脚本调用而不是从 Flask 路由调用时有效

SQLAlchemy(一):SQLAlchemy去连接数据库ORM介绍将ORM模型映射到数据库中

SQLAlchemy使用笔记--SQLAlchemy ORM

SQLAlchemy -- Python的SQLAlchemy和ORM

ORM框架SQLAlchemy

Python ORM框架之SQLALchemy