在 SQLAlchemy 中处理插入时重复的主键(声明式样式)

Posted

技术标签:

【中文标题】在 SQLAlchemy 中处理插入时重复的主键(声明式样式)【英文标题】:Dealing with duplicate primary keys on insert in SQLAlchemy (declarative style) 【发布时间】:2012-05-06 13:00:29 【问题描述】:

我的应用程序正在使用范围会话和 SQLALchemy 的声明式样式。这是一个网络应用程序,许多数据库插入由任务调度程序Celery 执行。

通常,在决定插入对象时,我的代码可能会执行以下操作:

from schema import Session
from schema.models import Bike

pk = 123 # primary key
bike = Session.query(Bike).filter_by(bike_id=pk).first()
if not bike: # no bike in DB
    new_bike = Bike(pk, "shiny", "bike")
    Session.add(new_bike)
    Session.commit()

这里的问题是,因为很多工作是由异步工作人员完成的,所以一个工作可能会在插入 Bikeid=123 的过程中完成,而另一个工作可能会检查它的存在。在这种情况下,第二个工作人员将尝试插入具有相同主键的行,SQLAlchemy 将引发IntegrityError

除了将Session.commit() 换成:

'''schema/__init__.py'''
from sqlalchemy.orm import scoped_session, sessionmaker
Session = scoped_session(sessionmaker())

def commit(ignore=False):
    try:
        Session.commit()
    except IntegrityError as e:
        reason = e.message
        logger.warning(reason)

        if not ignore:
            raise e

        if "Duplicate entry" in reason:
            logger.info("%s already in table." % e.params[0])
            Session.rollback()

然后我到处都有Session.commit 我现在有schema.commit(ignore=True) 我不介意不再插入该行。

对我来说,由于字符串检查,这似乎很脆弱。仅供参考,当提出 IntegrityError 时,它看起来像这样:

(IntegrityError) (1062, "Duplicate entry '123' for key 'PRIMARY'")

所以当然是我插入的主键类似于Duplicate entry is a cool thing,那么我想我可能会错过IntegrityError,这实际上并不是因为重复的主键。

有没有更好的方法,可以保持我正在使用的干净的 SQLAlchemy 方法(而不是开始在字符串中写出语句等......)

Db 是 mysql(尽管我喜欢使用 SQLite 进行单元测试,并且不想用任何新方法阻碍这种能力)。

干杯!

【问题讨论】:

为什么不考虑使用自动增量来生成主键?那么您不必担心这个问题。还是有特定的理由不这样做? 有具体原因(对不起,例子有点琐碎)。 【参考方案1】:

只需回滚并一一重试,就这么简单:

try:
    self._session.bulk_insert_mappings(mapper, items)
    self._session.commit()
except IntegrityError:
    self._session.rollback()
    logger.info("bulk inserting rows failed, fallback to insert one-by-one")
    for item in items:
        try:
            self._session.execute(insert(mapper).values(**item))
            self._session.commit()
        except SQLAlchemyError as e:
            logger.error("Error inserting item: %s for %s", item, e)

【讨论】:

【参考方案2】:

使用下面的代码,你应该可以为所欲为,而不仅仅是为了解决这个问题。

class SessionWrapper(Session):
    def commit(self, ignore=True):
        try:
            super(SessionWrapper, self).commit()
        except IntegrityError as e:
            if not ignore:
                raise e
            message = e.args[0]
            if "Duplicate entry" in message:
                logging.info("Error while executing %s.\n%s.", e.statement, message)
        finally:
            super(SessionWrapper, self).close()


def session(self, auto_commit=False):
    session_factory = sessionmaker(class_=SessionWrapper, bind=self.engine, autocommit=auto_commit)
    return scoped_session(session_factory)

Session = session()
s1 = Session()

p = Test(test="xxx", id=1)
s1.add(p)
s1.commit()
s1.close()

【讨论】:

【参考方案3】:

如果使用session.merge(bike) 而不是session.add(bike),则不会产生主键错误。将根据需要检索和更新或创建 bike

【讨论】:

如果你使用合并,如果你同时在不同的会话上进行两次合并,你仍然会得到完整性错误。 当会话适合内存时这个答案很好,但对于更大的查询就不那么好了。所以如果你想添加比内存更多的数据,你不能只是创建一堆会话并合并它们,对吧?【参考方案4】:

您需要使用下面提到的代码而不是session.add(obj),这样会更简洁,并且您不需要像您提到的那样使用自定义提交功能。但是,这将忽略冲突,不仅对于重复键,对于其他键也是如此。

mysql:

 self.session.execute(insert(self.table, values=values, prefixes=['IGNORE']))

sqlite

self.session.execute(insert(self.table, values=values, prefixes=['OR IGNORE']))

【讨论】:

【参考方案5】:

我假设您在这里的主键在某种程度上是自然的,这就是为什么您不能依赖正常的自动增量技术。因此,假设问题实际上是您需要插入的一些独特列之一,这更常见。

如果你想要“尝试插入,失败时部分回滚”,你可以使用一个 SAVEPOINT,它在 SQLAlchemy 中是 begin_nested()。下一个 rollback() 或 commit() 只作用于那个 SAVEPOINT,而不是更大范围的事情发生。

但是,总的来说,这里的模式只是真正应该避免的模式。你真正想在这里做的是三件事之一。 1. 不要运行处理需要插入的相同键的并发作业。 2. 以某种方式在正在使用的并发键上同步作业,以及 3. 使用一些通用服务来生成这种特定类型的新记录,由作业共享(或确保它们在作业运行之前都已设置好)。

如果你仔细想想,#2 无论如何都会发生在高度孤立的情况下。启动两个 postgres 会话。第 1 节:

test=> create table foo(id integer primary key);
NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "foo_pkey" for table "foo"
CREATE TABLE
test=> begin;
BEGIN
test=> insert into foo (id) values (1);

第 2 节:

test=> begin;
BEGIN
test=> insert into foo(id) values(1);

您将看到会话 2 阻塞,因为 PK #1 的行被锁定。我不确定 MySQL 是否足够聪明,可以做到这一点,但这是正确的行为。如果 OTOH 您尝试插入不同的 PK:

^CCancel request sent
ERROR:  canceling statement due to user request
test=> rollback;
ROLLBACK
test=> begin;
BEGIN
test=> insert into foo(id) values(2);
INSERT 0 1
test=> \q

它在没有阻塞的情况下进行得很好。

关键是,如果您正在进行这种 PK/UQ 争用,那么您的 celery 任务将自行序列化无论如何,或者至少应该序列化。

【讨论】:

【参考方案6】:

您应该以相同的方式处理每个IntegrityError:回滚事务,并可选择重试。在IntegrityError 之后,某些数据库甚至不允许您做任何事情。您还可以在两个冲突事务开始时获取表上的锁,或者如果数据库允许,则获取更细粒度的锁。

使用with 语句显式开始事务,并自动提交(或在任何异常时回滚):

from schema import Session
from schema.models import Bike

session = Session()
with session.begin():
    pk = 123 # primary key
    bike = session.query(Bike).filter_by(bike_id=pk).first()
    if not bike: # no bike in DB
        new_bike = Bike(pk, "shiny", "bike")
        session.add(new_bike)

【讨论】:

嗨。我不是故意同时安排插入和检查。问题是该对象恰好是由两个单独的进程以临时方式创建的。没有什么不好的,它只是应用程序的方式(实际上对象不是自行车,它们是 times)。但是,您对运行单个工作人员是正确的。我正在研究如何指定单个工作人员管理所有与数据库相关的任务,这将提供我需要的同步性。从应用程序插入不是一种选择。数据库在远程机器上,我需要低于 100 毫秒的网络应用响应。 这类 SQL 问题几乎总是要归咎于设计。例如,您确定不能使数据库的主键自动递增并处理偶尔出现的“两行之前是主键列”的结果吗? [对不起,我应该补充一下,PK 不是自动增量的原因是有充分的理由] 我只是不确定我是否同意。该数据库由许多其他应用程序共享,包括所讨论的表的使用。为什么在您进行了一些尽职调查后,数据库可能会在我的另一个进程/应用程序/人类中插入一行是糟糕的设计?关键是你必须在你的应用程序中处理它。我的问题很简单,我认为在 SQLAlchemy 中处理该问题的唯一方法是通过字符串检查,而且它似乎不是特别健壮。 哦,你只是想处理完整性错误。您可能别无选择,只能回滚任何完整性错误,请查看数据库文档。您是否期待其他类型的完整性错误?添加对象后尝试 session.flush 以更快地得到错误。我确定错误的数字 id 也作为单独的属性存在。 在上面的代码中,您可能希望将.first() 更改为.one(),因为它应该是一个唯一字段。无论如何,这可能不相关,因为更有趣的观察是答案中的代码引入了竞争条件。在检查记录是否存在并添加它之间,另一个工作人员可能已经添加了它。检查IntegrityError 并在必要时回滚更安全。

以上是关于在 SQLAlchemy 中处理插入时重复的主键(声明式样式)的主要内容,如果未能解决你的问题,请参考以下文章

获取最后插入行的主键 ID 以运行多个插入操作 [重复]

将自动递增的主键插入 Access 表

如何在插入新行后立即将自动生成的主键值保存到第二列

向存储过程添加 IF ...ELSE 语句以跳过重复的主键

PHP获取新插入的主键id

记录一次随意操作数据库,插入新数据,导致与程序添加新数据时,引起的主键值重复问题。More than one row with the given identifier was found: 1690