SQLAlchemy Asyncio ORM 在从元数据中检索表和列时无法查询数据库
Posted
技术标签:
【中文标题】SQLAlchemy Asyncio ORM 在从元数据中检索表和列时无法查询数据库【英文标题】:SQLAlchemy Asyncio ORM Unable to Query Database When Retrieving Tables and Columns from MetaData 【发布时间】:2022-01-07 08:07:53 【问题描述】:使用 SQLAlchemy 异步 ORM 1.4、Postgres 后端、Python 3.7
我在 SA ORM 中使用augmented Declarative Base
。这些表不在models.py
中,而是通过解析包含所有表模式的 JSON 脚本直接提交到数据库。因此,我无法导入脚本顶部的模型,例如 from models import ThisTable
。
所以要对表进行 CRUD 操作,我首先通过反映元数据来检索它们。
以“通常”的方式,在脚本顶部导入所有表时,这样的查询会起作用:
result = await s.execute(select(func.sum(TableName.column)))
curr = result.all()
当我尝试从元数据中反映表和列对象以查询它们时,这不起作用。有很多AttributeError: 'Table' object has no attribute 'func'
或TypeError: 'Table' object is not callable
错误。
def retrieve_table_obj(table):
meta = MetaData()
meta.reflect(bind=sync_engine)
return meta.tables[table]
def retrieve_table_cols(self, table):
table = retrieve_table_obj('users')
return table.columns.keys()
async def reading(collection, modifications):
table = db.retrieve_table_obj(collection)
columns = db.retrieve_table_cols(collection)
for c in columns:
for f in mods['fields']:
if c in f:
q = select(func.sum(table.c))
result = await s.execute(q)
curr = result.all()
asyncio.run(reading("users", 'fields': ["usage", "allowance"]))
第一次必须显式检索数据库中的表和列时,如何查询它们?
【问题讨论】:
您可以使用 automap 扩展通过反射构建模型。 感谢自动地图的建议!它适用于同步引擎,但我正在使用异步引擎并且无法让 automap 使用它,即使在获取引擎连接并使用 conn.run_sync 调用函数时也是如此。您是否成功地将自动映射与异步引擎实例一起使用? 【参考方案1】:自动映射扩展可用于自动将数据库表反映到 SQLAlchemy 模型。但是 automap 在引擎上使用inspect
,这在异步引擎上不受支持;我们可以通过使用同步引擎进行自动映射来解决这个问题。一旦模型被映射,异步引擎就可以使用它们。
例如:
import asyncio
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.automap import automap_base
sync_engine = sa.create_engine('postgresql:///test', echo=True, future=True)
Base = automap_base()
Base.prepare(sync_engine, reflect=True)
async def async_main(collection, modifications):
engine = create_async_engine(
"postgresql+asyncpg:///test",
echo=True,
future=True,
connect_args='ssl': False,
)
async_session = orm.sessionmaker(
engine, class_=AsyncSession, future=True
)
async with async_session() as session:
model = Base.classes[collection]
matches = set(model.__mapper__.columns.keys()) & set(modifications['fields'])
for m in matches:
q = sa.select(sa.func.sum(getattr(model, m)))
result = await session.execute(q)
curr = result.all()
for row in curr:
print(row)
print()
# for AsyncEngine created in function scope, close and
# clean-up pooled connections
await engine.dispose()
asyncio.run(reading("users", 'fields': ["usage", "allowance"]))
如果您不需要模型,缓存 MetaData
对象而不是在每次调用 retrieve_table_obj
时重新创建它会提高现有代码的效率,并将 select(func.sum(table.c))
替换为 select(sa.func.sum(getattr(table.c, c)))
【讨论】:
谢谢!我将尝试将其集成到我的代码中,看看它是如何进行的。只是为了澄清您的缓存建议:您的意思是只进行一次调用并将其存储在一个列表中,然后在整个其余代码中从列表中提取而不是从数据库中提取?list
是存储 MetaData
对象的最佳数据类型吗?我更喜欢select(func.sum(table.c))
的清晰和简洁而不是select(sa.func.sum(getattr(table.c, c)))
,所以我想尽可能保留select(func.sum(table.c))
类型的语法。
我只会保留元数据对象本身,否则就是字典;搜索一长串可能会很昂贵。我不明白table.c
是如何工作的,但如果你能让它工作,那很好。 table.c[column_name]
是另一种选择。
对,有道理!再次感谢
关于在多个条件下逐位构建查询,如果我有更新功能:async def updater(table_name, where_condition, data): Table = models_cache.classes[table_name] async with session_maker() as session: query = update(Table).filter_by(**where_condition) query = query.values(**data) await session.execute(query) await session.commit() asyncio.run(updater('appuser', 'name': 'Cheshire', 'org': 'Wonderland'))
我不知道如何将您上面的出色示例与此集成。
例如,如果我们想要一个看起来像:UPDATE appuserSET org = 'Wonderland'WHERE (name = 'Alice' OR name = 'Cheshire') AND (id > 0);
的 SQL 查询,并且我们使用如下参数调用函数:asyncio.run(updater('appuser', 'name': 'Cheshire', 'or': ['name': 'Alice'], 'and': ['id': '> 0'], 'org': 'Wonderland'))
我们如何逐行逐步构建查询?
以上是关于SQLAlchemy Asyncio ORM 在从元数据中检索表和列时无法查询数据库的主要内容,如果未能解决你的问题,请参考以下文章
Asyncio 函数在从脚本调用而不是从 Flask 路由调用时有效
SQLAlchemy(一):SQLAlchemy去连接数据库ORM介绍将ORM模型映射到数据库中
SQLAlchemy使用笔记--SQLAlchemy ORM