SQLAlchemy - 将子查询的模式和数据复制到另一个数据库
Posted
技术标签:
【中文标题】SQLAlchemy - 将子查询的模式和数据复制到另一个数据库【英文标题】:SQLAlchemy - copy schema and data of subquery to another database 【发布时间】:2014-02-14 04:23:41 【问题描述】:我正在尝试将来自 postgres (from_engine) 的子查询中的数据复制到 sqlite 数据库。我可以使用以下命令来复制表:
smeta = MetaData(bind=from_engine)
table = Table(table_name, smeta, autoload=True)
table.metadata.create_all(to_engine)
但是,我不确定如何为子查询语句实现相同的效果。
-桑迪普
编辑: 跟进答案。创建表后,我想创建一个子查询 stmt,如下所示:
table = Table("newtable", dest_metadata, *columns)
stmt = dest_session.query(table).subquery();
但是,最后一个 stmt 以错误结束 cursor.execute(语句,参数) sqlalchemy.exc.ProgrammingError:(ProgrammingError)关系“newtable”不存在 第 3 行:来自新表)AS anon_1
【问题讨论】:
您能多解释一下您想要达到的目标吗?您想使用查询结果的方案和数据创建一个表吗?我不明白子查询是如何发挥作用的。 Stefan,是的,我想创建一个表并用查询结果填充它。考虑一个过滤单个属性(即投影和选择)的查询。我想创建一个只有一个属性和选择结果的表,即子查询结果的架构,与执行该查询的原始表相反。希望这会有所帮助。 @Sandeep 这听起来像是拥有视图的确切情况,就像简单地基于查询结果和模式创建一个视图,然后将该视图视为表本身。 SQLAlchemy 不支持创建开箱即用的视图,但添加您自己的支持相对容易,查看gist.github.com/techniq/5174412 以获取视图实现,docs.sqlalchemy.org/en/rel_0_8/core/ddl.html 了解如何将其集成到 SQLAlchemy。如果这是有道理的,我可以将其扩展为一个完整的正确答案 @gts 能够创建视图很好。我想要的不仅仅是创建一个视图。本质上是将架构和结果复制到数据库的另一个实例。 【参考方案1】:一种至少在某些情况下有效的方法:
使用查询对象的column_descriptions
获取有关结果集中列的一些信息。
使用该信息,您可以构建架构以在其他数据库中创建新表。
在源数据库中运行查询并将结果插入到新表中。
首先对示例进行一些设置:
from sqlalchemy import create_engine, MetaData,
from sqlalchemy import Column, Integer, String, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Engine to the database to query the data from
# (postgresql)
source_engine = create_engine('sqlite:///:memory:', echo=True)
SourceSession = sessionmaker(source_engine)
# Engine to the database to store the results in
# (sqlite)
dest_engine = create_engine('sqlite:///:memory:', echo=True)
DestSession = sessionmaker(dest_engine)
# Create some toy table and fills it with some data
Base = declarative_base()
class Pet(Base):
__tablename__ = 'pets'
id = Column(Integer, primary_key=True)
name = Column(String)
race = Column(String)
Base.metadata.create_all(source_engine)
sourceSession = SourceSession()
sourceSession.add(Pet(name="Fido", race="cat"))
sourceSession.add(Pet(name="Ceasar", race="cat"))
sourceSession.add(Pet(name="Rex", race="dog"))
sourceSession.commit()
现在进入有趣的部分:
# This is the query we want to persist in a new table:
query= sourceSession.query(Pet.name, Pet.race).filter_by(race='cat')
# Build the schema for the new table
# based on the columns that will be returned
# by the query:
metadata = MetaData(bind=dest_engine)
columns = [Column(desc['name'], desc['type']) for desc in query.column_descriptions]
column_names = [desc['name'] for desc in query.column_descriptions]
table = Table("newtable", metadata, *columns)
# Create the new table in the destination database
table.create(dest_engine)
# Finally execute the query
destSession = DestSession()
for row in query:
destSession.execute(table.insert(row))
destSession.commit()
应该有更有效的方法来执行最后一个循环。但是批量插入是另一个话题。
【讨论】:
太棒了!非常感谢。我有一个悬而未决的问题。当我对表对象执行子查询时,它给了我错误关系 newtable 不退出:让我在回复中添加违规行。如果您碰巧知道可能导致这些问题的原因,请告诉我。 在创建表对象之后,但在查询中使用该表之前,必须使用table.create(dest_engine)
在数据库中创建它。这将实际执行“CREATE TABLE”SQL 命令。另请参阅我的示例中的代码。
只想说我想出了查询表实例的答案。可以创建一个动态类并使用经典映射将表与类链接
批量添加没那么复杂:destSession.add_all(list_of_rows)
对批量插入的评论:为了运行 destSession.add_all(list_of_rows) 必须通过运行将每个结果行上的 _sa_instance_state 重置为“瞬态”:[sqlalchemy.orm.session.make_transient(row)对于 list_of_rows 中的行]。【参考方案2】:
您还可以浏览 pandas 数据框。例如,一个方法将使用pandas.read_sql(query, source.connection)
和df.to_sql(table_name, con=destination.connection)
。
【讨论】:
to_sql() 方法没有捕获表之间的关系。以上是关于SQLAlchemy - 将子查询的模式和数据复制到另一个数据库的主要内容,如果未能解决你的问题,请参考以下文章
如何在 sqlalchemy 查询中将日期时间更改为字符串? [复制]