在 SQLAlchemy 中处理插入时重复的主键(声明式样式)
Posted
技术标签:
【中文标题】在 SQLAlchemy 中处理插入时重复的主键(声明式样式)【英文标题】:Dealing with duplicate primary keys on insert in SQLAlchemy (declarative style) 【发布时间】:2012-05-06 13:00:29 【问题描述】:我的应用程序正在使用范围会话和 SQLALchemy 的声明式样式。这是一个网络应用程序,许多数据库插入由任务调度程序Celery
执行。
通常,在决定插入对象时,我的代码可能会执行以下操作:
from schema import Session
from schema.models import Bike
pk = 123 # primary key
bike = Session.query(Bike).filter_by(bike_id=pk).first()
if not bike: # no bike in DB
new_bike = Bike(pk, "shiny", "bike")
Session.add(new_bike)
Session.commit()
这里的问题是,因为很多工作是由异步工作人员完成的,所以一个工作可能会在插入 Bike
和 id=123
的过程中完成,而另一个工作可能会检查它的存在。在这种情况下,第二个工作人员将尝试插入具有相同主键的行,SQLAlchemy 将引发IntegrityError
。
除了将Session.commit()
换成:
'''schema/__init__.py'''
from sqlalchemy.orm import scoped_session, sessionmaker
Session = scoped_session(sessionmaker())
def commit(ignore=False):
try:
Session.commit()
except IntegrityError as e:
reason = e.message
logger.warning(reason)
if not ignore:
raise e
if "Duplicate entry" in reason:
logger.info("%s already in table." % e.params[0])
Session.rollback()
然后我到处都有Session.commit
我现在有schema.commit(ignore=True)
我不介意不再插入该行。
对我来说,由于字符串检查,这似乎很脆弱。仅供参考,当提出 IntegrityError
时,它看起来像这样:
(IntegrityError) (1062, "Duplicate entry '123' for key 'PRIMARY'")
所以当然是我插入的主键类似于Duplicate entry is a cool thing
,那么我想我可能会错过IntegrityError
,这实际上并不是因为重复的主键。
有没有更好的方法,可以保持我正在使用的干净的 SQLAlchemy 方法(而不是开始在字符串中写出语句等......)
Db 是 mysql(尽管我喜欢使用 SQLite 进行单元测试,并且不想用任何新方法阻碍这种能力)。
干杯!
【问题讨论】:
为什么不考虑使用自动增量来生成主键?那么您不必担心这个问题。还是有特定的理由不这样做? 有具体原因(对不起,例子有点琐碎)。 【参考方案1】:只需回滚并一一重试,就这么简单:
try:
self._session.bulk_insert_mappings(mapper, items)
self._session.commit()
except IntegrityError:
self._session.rollback()
logger.info("bulk inserting rows failed, fallback to insert one-by-one")
for item in items:
try:
self._session.execute(insert(mapper).values(**item))
self._session.commit()
except SQLAlchemyError as e:
logger.error("Error inserting item: %s for %s", item, e)
【讨论】:
【参考方案2】:使用下面的代码,你应该可以为所欲为,而不仅仅是为了解决这个问题。
class SessionWrapper(Session):
def commit(self, ignore=True):
try:
super(SessionWrapper, self).commit()
except IntegrityError as e:
if not ignore:
raise e
message = e.args[0]
if "Duplicate entry" in message:
logging.info("Error while executing %s.\n%s.", e.statement, message)
finally:
super(SessionWrapper, self).close()
def session(self, auto_commit=False):
session_factory = sessionmaker(class_=SessionWrapper, bind=self.engine, autocommit=auto_commit)
return scoped_session(session_factory)
Session = session()
s1 = Session()
p = Test(test="xxx", id=1)
s1.add(p)
s1.commit()
s1.close()
【讨论】:
【参考方案3】:如果使用session.merge(bike)
而不是session.add(bike)
,则不会产生主键错误。将根据需要检索和更新或创建 bike
。
【讨论】:
如果你使用合并,如果你同时在不同的会话上进行两次合并,你仍然会得到完整性错误。 当会话适合内存时这个答案很好,但对于更大的查询就不那么好了。所以如果你想添加比内存更多的数据,你不能只是创建一堆会话并合并它们,对吧?【参考方案4】:您需要使用下面提到的代码而不是session.add(obj)
,这样会更简洁,并且您不需要像您提到的那样使用自定义提交功能。但是,这将忽略冲突,不仅对于重复键,对于其他键也是如此。
mysql:
self.session.execute(insert(self.table, values=values, prefixes=['IGNORE']))
sqlite
self.session.execute(insert(self.table, values=values, prefixes=['OR IGNORE']))
【讨论】:
【参考方案5】:我假设您在这里的主键在某种程度上是自然的,这就是为什么您不能依赖正常的自动增量技术。因此,假设问题实际上是您需要插入的一些独特列之一,这更常见。
如果你想要“尝试插入,失败时部分回滚”,你可以使用一个 SAVEPOINT,它在 SQLAlchemy 中是 begin_nested()。下一个 rollback() 或 commit() 只作用于那个 SAVEPOINT,而不是更大范围的事情发生。
但是,总的来说,这里的模式只是真正应该避免的模式。你真正想在这里做的是三件事之一。 1. 不要运行处理需要插入的相同键的并发作业。 2. 以某种方式在正在使用的并发键上同步作业,以及 3. 使用一些通用服务来生成这种特定类型的新记录,由作业共享(或确保它们在作业运行之前都已设置好)。
如果你仔细想想,#2 无论如何都会发生在高度孤立的情况下。启动两个 postgres 会话。第 1 节:
test=> create table foo(id integer primary key);
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "foo_pkey" for table "foo"
CREATE TABLE
test=> begin;
BEGIN
test=> insert into foo (id) values (1);
第 2 节:
test=> begin;
BEGIN
test=> insert into foo(id) values(1);
您将看到会话 2 阻塞,因为 PK #1 的行被锁定。我不确定 MySQL 是否足够聪明,可以做到这一点,但这是正确的行为。如果 OTOH 您尝试插入不同的 PK:
^CCancel request sent
ERROR: canceling statement due to user request
test=> rollback;
ROLLBACK
test=> begin;
BEGIN
test=> insert into foo(id) values(2);
INSERT 0 1
test=> \q
它在没有阻塞的情况下进行得很好。
关键是,如果您正在进行这种 PK/UQ 争用,那么您的 celery 任务将自行序列化无论如何,或者至少应该序列化。
【讨论】:
【参考方案6】:您应该以相同的方式处理每个IntegrityError
:回滚事务,并可选择重试。在IntegrityError
之后,某些数据库甚至不允许您做任何事情。您还可以在两个冲突事务开始时获取表上的锁,或者如果数据库允许,则获取更细粒度的锁。
使用with
语句显式开始事务,并自动提交(或在任何异常时回滚):
from schema import Session
from schema.models import Bike
session = Session()
with session.begin():
pk = 123 # primary key
bike = session.query(Bike).filter_by(bike_id=pk).first()
if not bike: # no bike in DB
new_bike = Bike(pk, "shiny", "bike")
session.add(new_bike)
【讨论】:
嗨。我不是故意同时安排插入和检查。问题是该对象恰好是由两个单独的进程以临时方式创建的。没有什么不好的,它只是应用程序的方式(实际上对象不是自行车,它们是 times)。但是,您对运行单个工作人员是正确的。我正在研究如何指定单个工作人员管理所有与数据库相关的任务,这将提供我需要的同步性。从应用程序插入不是一种选择。数据库在远程机器上,我需要低于 100 毫秒的网络应用响应。 这类 SQL 问题几乎总是要归咎于设计。例如,您确定不能使数据库的主键自动递增并处理偶尔出现的“两行之前是主键列”的结果吗? [对不起,我应该补充一下,PK 不是自动增量的原因是有充分的理由] 我只是不确定我是否同意。该数据库由许多其他应用程序共享,包括所讨论的表的使用。为什么在您进行了一些尽职调查后,数据库可能会在我的另一个进程/应用程序/人类中插入一行是糟糕的设计?关键是你必须在你的应用程序中处理它。我的问题很简单,我认为在 SQLAlchemy 中处理该问题的唯一方法是通过字符串检查,而且它似乎不是特别健壮。 哦,你只是想处理完整性错误。您可能别无选择,只能回滚任何完整性错误,请查看数据库文档。您是否期待其他类型的完整性错误?添加对象后尝试 session.flush 以更快地得到错误。我确定错误的数字 id 也作为单独的属性存在。 在上面的代码中,您可能希望将.first()
更改为.one()
,因为它应该是一个唯一字段。无论如何,这可能不相关,因为更有趣的观察是答案中的代码引入了竞争条件。在检查记录是否存在并添加它之间,另一个工作人员可能已经添加了它。检查IntegrityError
并在必要时回滚更安全。以上是关于在 SQLAlchemy 中处理插入时重复的主键(声明式样式)的主要内容,如果未能解决你的问题,请参考以下文章
记录一次随意操作数据库,插入新数据,导致与程序添加新数据时,引起的主键值重复问题。More than one row with the given identifier was found: 1690