SQLAlchemy 是不是与 Django 的 get_or_create 等效?

Posted

技术标签:

【中文标题】SQLAlchemy 是不是与 Django 的 get_or_create 等效?【英文标题】:Does SQLAlchemy have an equivalent of Django's get_or_create?SQLAlchemy 是否与 Django 的 get_or_create 等效? 【发布时间】:2012-08-22 14:20:48 【问题描述】:

如果它已经存在(基于提供的参数),我想从数据库中获取一个对象,如果不存在则创建它。

Django 的get_or_create(或source)就是这样做的。 SQLAlchemy 中是否有等效的快捷方式?

我目前正在这样明确地写出来:

def get_or_create_instrument(session, serial_number):
    instrument = session.query(Instrument).filter_by(serial_number=serial_number).first()
    if instrument:
        return instrument
    else:
        instrument = Instrument(serial_number)
        session.add(instrument)
        return instrument

【问题讨论】:

如果对象不存在,只想添加对象,请参阅session.merge:***.com/questions/12297156/… 【参考方案1】:

这基本上就是这样做的方法,AFAIK 没有现成的捷径。

你当然可以概括它:

def get_or_create(session, model, defaults=None, **kwargs):
    instance = session.query(model).filter_by(**kwargs).one_or_none()
    if instance:
        return instance, False
    else:
        params = k: v for k, v in kwargs.items() if not isinstance(v, ClauseElement)
        params.update(defaults or )
        instance = model(**params)
        try:
            session.add(instance)
            session.commit()
        except Exception:  # The actual exception depends on the specific database so we catch all exceptions. This is similar to the official documentation: https://docs.sqlalchemy.org/en/latest/orm/session_transaction.html
            session.rollback()
            instance = session.query(model).filter_by(**kwargs).one()
            return instance, False
        else:
            return instance, True

2020 年更新(仅限 Python 3.9+)

这是一个更简洁的版本,带有 Python 3.9 的 the new dict union operator (|=)

def get_or_create(session, model, defaults=None, **kwargs):
    instance = session.query(model).filter_by(**kwargs).one_or_none()
    if instance:
        return instance, False
    else:
        kwargs |= defaults or 
        instance = model(**kwargs)
        try:
            session.add(instance)
            session.commit()
        except Exception:  # The actual exception depends on the specific database so we catch all exceptions. This is similar to the official documentation: https://docs.sqlalchemy.org/en/latest/orm/session_transaction.html
            session.rollback()
            instance = session.query(model).filter_by(**kwargs).one()
            return instance, False
        else:
            return instance, True

注意:

类似于 Django 版本,这将捕获重复的键约束和类似的错误。如果您的 get 或 create 不能保证返回单个结果,它仍然可能导致竞争条件。

要缓解其中的一些问题,您需要在 session.commit() 之后添加另一个 one_or_none() 样式提取。除非您还使用 with_for_update() 或可序列化事务模式,否则这仍然不能 100% 保证不会出现竞争条件。

【讨论】:

我认为在你阅读“session.Query(model.filter_by(**kwargs).first()”的地方,你应该阅读“session.Query(model.filter_by(**kwargs)) .first()". 是否应该在这周围加个锁,以便另一个线程在该线程有机会创建实例之前不会创建实例? @EoghanM:通常你的会话是线程本地的,所以这无关紧要。 SQLAlchemy 会话并不是线程安全的。 @WolpH 它可能是另一个试图同时创建相同记录的进程。查看 Django 的 get_or_create 实现。它检查完整性错误,并依赖于正确使用唯一约束。 @IvanVirabyan:我以为@EoghanM 正在谈论会话实例。在这种情况下,session.add 块周围应该有一个 try...except IntegrityError: instance = session.Query(...)【参考方案2】:

按照@WoLpH 的解决方案,这是对我有用的代码(简单版):

def get_or_create(session, model, **kwargs):
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
        return instance
    else:
        instance = model(**kwargs)
        session.add(instance)
        session.commit()
        return instance

有了这个,我可以获取或创建我模型的任何对象。

假设我的模型对象是:

class Country(Base):
    __tablename__ = 'countries'
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)

要获取或创建我的对象,我会写:

myCountry = get_or_create(session, Country, name=countryName)

【讨论】:

对于那些像我一样搜索的人,如果行不存在,这是创建行的正确解决方案。 您不需要将新实例添加到会话中吗?否则,如果您在调用代码中发出 session.commit(),则不会发生任何事情,因为新实例未添加到会话中。 谢谢你。我发现这非常有用,因此我创建了一个要点以供将来使用。 gist.github.com/jangeador/e7221fc3b5ebeeac9a08 鉴于您将会话作为参数传递,最好避免使用commit(或者至少只使用flush)。这将会话控制权留给了此方法的调用者,并且不会冒发出过早提交的风险。此外,使用one_or_none() 而不是first() 可能会更安全一些。 我同意@exhuma - 在函数中提交可能会导致意外的冲突和/或性能问题。【参考方案3】:

This SQLALchemy recipe 做得很好,很优雅。

要做的第一件事是定义一个函数,该函数被赋予一个 Session 以使用,并将一个字典与 Session() 相关联,该 Session() 跟踪当前的 unique 键。

def _unique(session, cls, hashfunc, queryfunc, constructor, arg, kw):
    cache = getattr(session, '_unique_cache', None)
    if cache is None:
        session._unique_cache = cache = 

    key = (cls, hashfunc(*arg, **kw))
    if key in cache:
        return cache[key]
    else:
        with session.no_autoflush:
            q = session.query(cls)
            q = queryfunc(q, *arg, **kw)
            obj = q.first()
            if not obj:
                obj = constructor(*arg, **kw)
                session.add(obj)
        cache[key] = obj
        return obj

使用这个函数的一个例子是在一个 mixin 中:

class UniqueMixin(object):
    @classmethod
    def unique_hash(cls, *arg, **kw):
        raise NotImplementedError()

    @classmethod
    def unique_filter(cls, query, *arg, **kw):
        raise NotImplementedError()

    @classmethod
    def as_unique(cls, session, *arg, **kw):
        return _unique(
                    session,
                    cls,
                    cls.unique_hash,
                    cls.unique_filter,
                    cls,
                    arg, kw
            )

最后创建独特的 get_or_create 模型:

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

engine = create_engine('sqlite://', echo=True)

Session = sessionmaker(bind=engine)

class Widget(UniqueMixin, Base):
    __tablename__ = 'widget'

    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True, nullable=False)

    @classmethod
    def unique_hash(cls, name):
        return name

    @classmethod
    def unique_filter(cls, query, name):
        return query.filter(Widget.name == name)

Base.metadata.create_all(engine)

session = Session()

w1, w2, w3 = Widget.as_unique(session, name='w1'), \
                Widget.as_unique(session, name='w2'), \
                Widget.as_unique(session, name='w3')
w1b = Widget.as_unique(session, name='w1')

assert w1 is w1b
assert w2 is not w3
assert w2 is not w1

session.commit()

该配方更深入地研究了这个想法并提供了不同的方法,但我已经成功地使用了这个。

【讨论】:

如果只有一个 SQLAlchemy Session 对象可以修改数据库,我喜欢这个秘诀。我可能是错的,但是如果其他会话(SQLAlchemy 与否)同时修改数据库,我看不出这如何防止在事务进行时可能由其他会话创建的对象。在这些情况下,我认为依赖 session.add() 之后的刷新和***.com/a/21146492/3690333 之类的异常处理的解决方案更可靠。【参考方案4】:

我一直在解决这个问题,最终得到了一个相当强大的解决方案:

def get_one_or_create(session,
                      model,
                      create_method='',
                      create_method_kwargs=None,
                      **kwargs):
    try:
        return session.query(model).filter_by(**kwargs).one(), False
    except NoResultFound:
        kwargs.update(create_method_kwargs or )
        created = getattr(model, create_method, model)(**kwargs)
        try:
            session.add(created)
            session.flush()
            return created, True
        except IntegrityError:
            session.rollback()
            return session.query(model).filter_by(**kwargs).one(), False

我刚刚写了一个fairly expansive blog post 的所有细节,但我对我使用它的原因提出了一些想法。

    它解压成一个元组,告诉你对象是否存在。这在您的工作流程中通常很有用。

    该函数使您能够使用 @classmethod 修饰的创建者函数(以及特定于它们的属性)。

    当您有多个进程连接到数据存储时,该解决方案可以防止出现竞争条件。

编辑:我已将session.commit() 更改为session.flush(),如this blog post 中所述。请注意,这些决策特定于所使用的数据存储(本例中为 Postgres)。

编辑 2:我在函数中使用 作为默认值进行了更新,因为这是典型的 Python 陷阱。感谢the comment,奈杰尔!如果您对此问题感到好奇,请查看this *** question 和this blog post。

【讨论】:

与 spencer says 相比,这个解决方案是一个很好的解决方案,因为它可以防止竞争条件(通过提交/刷新会话,当心)并完美地模仿 Django 所做的事情。 @kiddouk 不,它没有“完美地”模仿。 Django 的get_or_create 线程安全的。它不是原子的。此外,如果创建了实例,则 Django 的 get_or_create 返回 True 标志,否则返回 False 标志。 @Kate 如果您查看 Django 的 get_or_create,它的作用几乎完全相同。此解决方案还返回 True/False 标志以指示对象是否已创建或获取,并且也不是原子的。但是,线程安全和原子更新是数据库的一个问题,而不是 Django、Flask 或 SQLAlchemy,并且在这个解决方案和 Django 中,都是通过数据库上的事务来解决的。 假设为新记录提供了一个非空字段空值,它将引发 IntegrityError。整个事情搞砸了,现在我们不知道实际发生了什么,我们得到另一个错误,即没有找到记录。 不应该 IntegrityError 的情况下返回 False 因为这个客户端没有创建对象?【参考方案5】:

语义上最接近的可能是:

def get_or_create(model, **kwargs):
    """SqlAlchemy implementation of Django's get_or_create.
    """
    session = Session()
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
        return instance, False
    else:
        instance = model(**kwargs)
        session.add(instance)
        session.commit()
        return instance, True

不知道在 sqlalchemy 中依赖全局定义的 Session 是多么的洁净,但是 Django 版本不接受连接所以......

返回的元组包含实例和一个布尔值,指示实例是否已创建(即,如果我们从数据库中读取实例,则为 False)。

Django 的get_or_create 经常用于确保全局数据可用,因此我会尽早提交。

【讨论】:

只要 Session 由 scoped_session 创建和跟踪,这应该可以工作,这应该实现线程安全的会话管理(这在 2014 年存在吗?)。【参考方案6】:

根据您采用的隔离级别,上述解决方案均无效。 我发现的最佳解决方案是以下形式的 RAW SQL:

INSERT INTO table(f1, f2, unique_f3) 
SELECT 'v1', 'v2', 'v3' 
WHERE NOT EXISTS (SELECT 1 FROM table WHERE f3 = 'v3')

无论隔离级别和并行度如何,这都是事务安全的。

注意:为了提高效率,最好为唯一列设置一个索引。

【讨论】:

【参考方案7】:

erik优秀answer的修改版

def get_one_or_create(session,
                      model,
                      create_method='',
                      create_method_kwargs=None,
                      **kwargs):
    try:
        return session.query(model).filter_by(**kwargs).one(), True
    except NoResultFound:
        kwargs.update(create_method_kwargs or )
        try:
            with session.begin_nested():
                created = getattr(model, create_method, model)(**kwargs)
                session.add(created)
            return created, False
        except IntegrityError:
            return session.query(model).filter_by(**kwargs).one(), True
使用nested transaction 仅回滚添加的新项目,而不是回滚所有内容(请参阅此answer 以使用 SQLite 的嵌套事务) 移动create_method。如果创建的对象具有关系并且通过这些关系分配了成员,则它会自动添加到会话中。例如。创建一个book,它有user_iduser作为对应关系,然后在create_method里面做book.user=<user object>会添加book到会话中。这意味着create_method 必须在with 内才能从最终回滚中受益。请注意,begin_nested 会自动触发刷新。

请注意,如果使用 mysql,事务隔离级别必须设置为 READ COMMITTED 而不是 REPEATABLE READ 才能正常工作。 Django 的 get_or_create(和 here)使用相同的策略,另请参见 Django documentation。

【讨论】:

我喜欢这样可以避免回滚不相关的更改,但是如果会话之前查询过模型,IntegrityError 重新查询可能仍会失败,NoResultFound 使用 MySQL 默认隔离级别 REPEATABLE READ在同一笔交易中。我能想出的最佳解决方案是在此查询之前调用session.commit(),这也不理想,因为用户可能不会期望它。引用的答案不存在这个问题,因为 session.rollback() 具有启动新事务的相同效果。 嗯,直到。将查询放入嵌套事务中会起作用吗?你是对的,这个函数内部的commit 可以说比rollback 更糟糕,即使对于特定的用例它是可以接受的。 是的,将初始查询放在嵌套事务中至少可以使第二个查询工作。如果用户之前在同一个事务中明确查询过模型,它仍然会失败。我已经决定这是可以接受的,并且应该警告用户不要这样做或以其他方式捕获异常并决定是否commit() 自己。如果我对代码的理解是正确的,这就是 Django 所做的。 在 django documentation 中,他们说使用`READ COMMITTED, so it does not look like they try to handle this. Looking at the [source](https://github.com/django/django/blob/master/django/db/models/query.py#L491) confirms this. I'm not sure I understand your reply, you mean the user should put his/her query in a nested transaction? It's not clear to me how a SAVEPOINT` 会影响REPEATABLE READ 的读取。如果没有效果那么情况似乎无法挽救,如果效果那么最后一个查询可以嵌套? READ COMMITED 很有趣,也许我应该重新考虑我不触及数据库默认值的决定。我已经测试过,从查询之前恢复SAVEPOINT 使得该查询好像从未在REPEATABLE READ 中发生过。因此,我发现有必要将查询包含在嵌套事务中的 try 子句中,以便 IntegrityError except 子句中的查询完全可以工作。【参考方案8】:

我稍微简化了@Kevin。避免将整个函数包装在 if/else 语句中的解决方案。这样就只有一个return,我觉得更干净:

def get_or_create(session, model, **kwargs):
    instance = session.query(model).filter_by(**kwargs).first()

    if not instance:
        instance = model(**kwargs)
        session.add(instance)

    return instance

【讨论】:

【参考方案9】:

有一个 Python 包,其中包含 @erik 的解决方案以及 update_or_create() 的版本。 https://github.com/enricobarzetti/sqlalchemy_get_or_create

【讨论】:

【参考方案10】:

我经常遇到的一个问题是,当一个字段有一个最大长度(例如,STRING(40))并且您想执行一个带有大长度字符串的 get or create 时,上述解决方案将失败。

在上述解决方案的基础上,这是我的方法:

from sqlalchemy import Column, String

def get_or_create(self, add=True, flush=True, commit=False, **kwargs):
    """

    Get the an entity based on the kwargs or create an entity with those kwargs.

    Params:
        add: (default True) should the instance be added to the session?
        flush: (default True) flush the instance to the session?
        commit: (default False) commit the session?
        kwargs: key, value pairs of parameters to lookup/create.

    Ex: SocialPlatform.get_or_create(**'name':'facebook')
        returns --> existing record or, will create a new record

    ---------

    NOTE: I like to add this as a classmethod in the base class of my tables, so that
    all data models inherit the base class --> functionality is transmitted across
    all orm defined models.

    """


    # Truncate values if necessary
    for key, value in kwargs.items():

        # Only use strings
        if not isinstance(value, str):
            continue

        # Only use if it's a column
        my_col = getattr(self.__table__.columns, key)

        if not isinstance(my_col, Column):
            continue

        # Skip non strings again here
        if not isinstance(my_col.type, String):
            continue

        # Get the max length
        max_len = my_col.type.length

        if value and max_len and len(value) > max_len:

            # Update the value
            value = value[:max_len]
            kwargs[key] = value

    # -------------------------------------------------

    # Make the query...
    instance = session.query(self).filter_by(**kwargs).first()

    if instance:
        return instance

    else:
        # Max length isn't accounted for here.
        # The assumption is that auto-truncation will happen on the child-model
        # Or directtly in the db
        instance = self(**kwargs)

    # You'll usually want to add to the session
    if add:
        session.add(instance)

    # Navigate these with caution
    if add and commit:
        try:
            session.commit()
        except IntegrityError:
            session.rollback()

    elif add and flush:
        session.flush()


    return instance

【讨论】:

以上是关于SQLAlchemy 是不是与 Django 的 get_or_create 等效?的主要内容,如果未能解决你的问题,请参考以下文章

Python-Web框架之 - 利用SQLALchemy创建与数据库MySQL的连接, 详解用Flask时会遇到的一些大坑 !

Python ORM框架之SQLALchemy

pyhton ORM框架SQLAlchemy基础与建表

Django ORM和SQLAlchemy类比

Flask SQLAlchemy 数据映射器与活动记录模式

在 sqlalchemy 中提交时是不是有添加与删除的顺序