SQLAlchemy bulk_insert_mappings():无法获取表“测试”的映射器

Posted

技术标签:

【中文标题】SQLAlchemy bulk_insert_mappings():无法获取表“测试”的映射器【英文标题】:SQLAlchemy bulk_insert_mappings(): Could not get mapper for table 'test' 【发布时间】:2019-11-29 22:36:14 【问题描述】:

我一直在尝试使用 sqlalchemy 的 bulk_insert_mappings。我知道我可以创建一个会话并连接到数据库。我已经初始化了我的引擎,但我似乎无法从表中获取我需要的映射器。

from sqlalchemy import create_engine
from sqlalchemy.orm.session import sessionmaker,Session
from sqlalchemy_utils import get_mapper

engine = create_engine('mysql+pymysql://:@IP:PORT/'.format(USER,PW)) # removed my config here
connection = engine.connect()
m = MetaData(bind=engine,schema='test')
m.reflect()

Session = sessionmaker(bind=engine)
s = Session()
s.bulk_insert_mappings(get_mapper(m.tables['test.test']), pd.DataFrame('a':['a','b','c']).to_dict(orient="records"))
s.commit()
s.close()

我最近发现了一堆关于 SO 的相关问题

SQLAlchemy get Mapper object from Table object (from Metadata or Session or otherwise)

但是 sqlalchemy_utils.get_mapper 提出:

“ValueError: 无法获取表 'test' 的映射器。”

sqlalchemy.orm.mapperlib._mapper_registry 似乎是空的。也许是因为我没有将它绑定到我的引擎。但不知道该怎么做。

PS:test是一个非常简单的TEXT类型的单列表

这是 m.tables['test.test'] 的输出

Table('test', MetaData(bind=Engine(mysql+pymysql://USER:***@IP:PORT/)), Column('a', TEXT(), table=<test>), schema='test')

【问题讨论】:

【参考方案1】:

SQLAlchemy Mapper 的工作是:

定义类属性与数据库表列的相关性。

... 它是 SQLAlchemy ORM 的基础。使用 ORM,Python 类表示数据库中的表,并且需要某种机制将类的属性与表中的列相关联。如果您不使用 ORM,则您的表不会映射到 Python 类,因此没有使用映射器。这就是您从get_mapper() 收到错误的原因。

在你的例子中:

m = MetaData(bind=engine,schema='test')
m.reflect()

MetaData 是:

Table 对象及其关联架构构造的集合。

MetaData.reflect:

自动在此MetaData 中为数据库中可用但尚未出现在MetaData 中的任何表创建Table 条目。

所以此时,您有一个Table 对象的集合,并且您希望对其中一个执行批量插入。不要将Table 对象与 ORM 映射类混淆,它们不是一回事。

bulk_insert_mappings 状态的文档:

对给定的映射字典列表执行批量插入。

给定字典中的值通常不加修改地传递到核心 Insert() 构造中

您正在尝试实现数据的批量插入,我们可以跳过 ORM 方法(任何涉及 Session 的方法)并显式与核心交互。

表达式pd.DataFrame('a':['a','b','c']).to_dict(orient="records") 返回dicts 的列表,例如:['a': 'a', 'a': 'b', 'a': 'c'],因此为了简单起见,我将使用此处的示例输出。

您的元数据对象中有已使用m.tables['test.test'] 检索的表,并且该Table 对象可用于生成其自己的插入语句:

print(m.tables['test.test'].insert())
# INSERT INTO test.test (a) VALUES (%(a)s)

要执行多个语句,我们可以将字典列表传递给Connection.execute(),如下所示。

ORM Session 的一个好处是它允许显式事务管理,您可以在必要时调用Session.rollback()Session.commit()。连接对象也可以使用Engine.begin() 在类似于Session 的事务中显式操作。

例如,使用上下文管理器:

with engine.begin() as conn:
    conn.execute(
        m.tables['test.test'].insert(),
        *['a': 'a', 'a': 'b', 'a': 'c']
    )

如果上下文中没有错误,这将自动提交查询,如果有错误则回滚。

引擎日志显示此表达式发出以下查询:

INSERT INTO test.test (a) VALUES (%(a)s)
('a': 'a', 'a': 'b', 'a': 'c')

以下人为设计的示例显示了您通过使用 Session.bulk_insert_mappings() 进行的原始查询。我不得不创建一个 ORM 模型来表示表并向表中添加一个 id 字段,因为 ORM 不喜欢在没有主键的情况下工作。

m = MetaData(bind=engine,schema='test')
Base = declarative_base(metadata=m)

class Test(Base):
    __tablename__ = 'test'
    id = Column(Integer, primary_key=True)
    a = Column(Text)


Session = sessionmaker(bind=engine)
s = Session()
s.bulk_insert_mappings(get_mapper(m.tables['test.test']), pd.DataFrame('a':['a','b','c']).to_dict(orient="records"))
s.commit()
s.close()

这是引擎日志中执行的查询:

INSERT INTO test.test (a) VALUES (%(a)s)
('a': 'a', 'a': 'b', 'a': 'c')

您会注意到,这与我们通过直接使用 Core 实现的查询完全相同。

【讨论】:

非常感谢!正是我需要的以及围绕它的正确解释=)【参考方案2】:

我一直在谷歌上搜索完全相同的问题。不过,我找到了解决此问题的方法。

class Helper():
   pass
new_mapper = sqlalchemy.orm.mapper(Helper, local_table = m.tables['test.test'])
session.bulk_insert_mappings(new_mapper, 
df.to_dict(orient="records"), return_defaults = False)
session.commit()
session.close()

根据the following link,我认为 df.to_sql 在将大量数据帧插入到 sql 表中时表现非常差。然而,结果证明 bulk_insert_mappings 慢得多。 希望对你有帮助。

【讨论】:

以上是关于SQLAlchemy bulk_insert_mappings():无法获取表“测试”的映射器的主要内容,如果未能解决你的问题,请参考以下文章

为啥 sqlalchemy 没有正确设置默认值?

SQLAlchemy - 过滤子查询负载

sqlalchemy pymssql“对等连接重置”恢复

如何在 sqlalchemy 查询中将日期时间更改为字符串? [复制]

SQLAlchemy ORM:修改查询返回的列

使用 Python SQLAlchemy 将 JSON 发布到数据库