SQLAlchemy 是不是与 Django 的 get_or_create 等效?
Posted
技术标签:
【中文标题】SQLAlchemy 是不是与 Django 的 get_or_create 等效?【英文标题】:Does SQLAlchemy have an equivalent of Django's get_or_create?SQLAlchemy 是否与 Django 的 get_or_create 等效? 【发布时间】:2011-02-02 12:34:24 【问题描述】:如果它已经存在(基于提供的参数),我想从数据库中获取一个对象,如果不存在则创建它。
Django 的get_or_create
(或source)就是这样做的。 SQLAlchemy 中是否有等效的快捷方式?
我目前正在这样明确地写出来:
def get_or_create_instrument(session, serial_number):
instrument = session.query(Instrument).filter_by(serial_number=serial_number).first()
if instrument:
return instrument
else:
instrument = Instrument(serial_number)
session.add(instrument)
return instrument
【问题讨论】:
如果对象不存在,只想添加对象,请参阅session.merge
:***.com/questions/12297156/…
【参考方案1】:
按照@WoLpH 的解决方案,这是对我有用的代码(简单版):
def get_or_create(session, model, **kwargs):
instance = session.query(model).filter_by(**kwargs).first()
if instance:
return instance
else:
instance = model(**kwargs)
session.add(instance)
session.commit()
return instance
有了这个,我可以获取或创建我模型的任何对象。
假设我的模型对象是:
class Country(Base):
__tablename__ = 'countries'
id = Column(Integer, primary_key=True)
name = Column(String, unique=True)
要获取或创建我的对象,我会写:
myCountry = get_or_create(session, Country, name=countryName)
【讨论】:
对于那些像我一样搜索的人,如果行不存在,这是创建行的正确解决方案。 您不需要将新实例添加到会话中吗?否则,如果您在调用代码中发出 session.commit(),则不会发生任何事情,因为新实例未添加到会话中。 谢谢你。我发现这非常有用,因此我创建了一个要点以供将来使用。 gist.github.com/jangeador/e7221fc3b5ebeeac9a08 鉴于您将会话作为参数传递,最好避免使用commit
(或者至少只使用flush
)。这将会话控制权留给了此方法的调用者,并且不会冒发出过早提交的风险。此外,使用one_or_none()
而不是first()
可能会更安全一些。
我同意@exhuma - 在函数中提交可能会导致意外的冲突和/或性能问题。【参考方案2】:
这基本上就是这样做的方法,AFAIK 没有现成的捷径。
你当然可以概括它:
def get_or_create(session, model, defaults=None, **kwargs):
instance = session.query(model).filter_by(**kwargs).one_or_none()
if instance:
return instance, False
else:
params = k: v for k, v in kwargs.items() if not isinstance(v, ClauseElement)
params.update(defaults or )
instance = model(**params)
try:
session.add(instance)
session.commit()
except Exception: # The actual exception depends on the specific database so we catch all exceptions. This is similar to the official documentation: https://docs.sqlalchemy.org/en/latest/orm/session_transaction.html
session.rollback()
instance = session.query(model).filter_by(**kwargs).one()
return instance, False
else:
return instance, True
2020 年更新(仅限 Python 3.9+)
这是一个更简洁的版本,带有 Python 3.9 的 the new dict union operator (|=)
def get_or_create(session, model, defaults=None, **kwargs):
instance = session.query(model).filter_by(**kwargs).one_or_none()
if instance:
return instance, False
else:
kwargs |= defaults or
instance = model(**kwargs)
try:
session.add(instance)
session.commit()
except Exception: # The actual exception depends on the specific database so we catch all exceptions. This is similar to the official documentation: https://docs.sqlalchemy.org/en/latest/orm/session_transaction.html
session.rollback()
instance = session.query(model).filter_by(**kwargs).one()
return instance, False
else:
return instance, True
注意:
类似于 Django 版本,这将捕获重复的键约束和类似的错误。如果您的 get 或 create 不能保证返回单个结果,它仍然可能导致竞争条件。
为了缓解其中的一些问题,您需要在 session.commit()
之后添加另一个 one_or_none()
样式提取。除非您还使用 with_for_update()
或可序列化事务模式,否则这仍然不能 100% 保证不会出现竞争条件。
【讨论】:
我认为在你阅读“session.Query(model.filter_by(**kwargs).first()”的地方,你应该阅读“session.Query(model.filter_by(**kwargs)) .first()". 是否应该在这周围加个锁,以便另一个线程在该线程有机会创建实例之前不会创建实例? @EoghanM:通常你的会话是线程本地的,所以这无关紧要。 SQLAlchemy 会话并不是线程安全的。 @WolpH 它可能是另一个试图同时创建相同记录的进程。查看 Django 的 get_or_create 实现。它检查完整性错误,并依赖于正确使用唯一约束。 @IvanVirabyan:我以为@EoghanM 正在谈论会话实例。在这种情况下,session.add
块周围应该有一个 try...except IntegrityError: instance = session.Query(...)
。【参考方案3】:
我一直在解决这个问题,最终得到了一个相当强大的解决方案:
def get_one_or_create(session,
model,
create_method='',
create_method_kwargs=None,
**kwargs):
try:
return session.query(model).filter_by(**kwargs).one(), False
except NoResultFound:
kwargs.update(create_method_kwargs or )
created = getattr(model, create_method, model)(**kwargs)
try:
session.add(created)
session.flush()
return created, True
except IntegrityError:
session.rollback()
return session.query(model).filter_by(**kwargs).one(), False
我刚刚写了一个fairly expansive blog post 的所有细节,但我对我使用它的原因提出了一些想法。
它解压成一个元组,告诉你对象是否存在。这在您的工作流程中通常很有用。
该函数使您能够使用 @classmethod
修饰的创建者函数(以及特定于它们的属性)。
当您有多个进程连接到数据存储时,该解决方案可以防止出现竞争条件。
编辑:我已将session.commit()
更改为session.flush()
,如this blog post 中所述。请注意,这些决策特定于所使用的数据存储(本例中为 Postgres)。
编辑 2:我在函数中使用 作为默认值进行了更新,因为这是典型的 Python 陷阱。感谢the comment,奈杰尔!如果您对此问题感到好奇,请查看this *** question 和this blog post。
【讨论】:
与 spencer says 相比,这个解决方案是一个很好的解决方案,因为它可以防止竞争条件(通过提交/刷新会话,当心)并完美地模仿 Django 所做的事情。 @kiddouk 不,它没有“完美”地模仿。 Django 的get_or_create
不是 线程安全的。它不是原子的。此外,如果创建了实例,则 Django 的 get_or_create
返回 True 标志,否则返回 False 标志。
@Kate 如果您查看 Django 的 get_or_create
,它的作用几乎完全相同。此解决方案还返回 True/False
标志以指示对象是否已创建或获取,并且也不是原子的。但是,线程安全和原子更新是数据库的一个问题,而不是 Django、Flask 或 SQLAlchemy,并且在这个解决方案和 Django 中,都是通过数据库上的事务来解决的。
假设为新记录提供了一个非空字段空值,它将引发 IntegrityError。整个事情变得一团糟,现在我们不知道实际发生了什么,我们得到另一个错误,即没有找到记录。
不应该 IntegrityError
的情况下返回 False
因为这个客户端没有创建对象?【参考方案4】:
erik优秀answer的修改版
def get_one_or_create(session,
model,
create_method='',
create_method_kwargs=None,
**kwargs):
try:
return session.query(model).filter_by(**kwargs).one(), True
except NoResultFound:
kwargs.update(create_method_kwargs or )
try:
with session.begin_nested():
created = getattr(model, create_method, model)(**kwargs)
session.add(created)
return created, False
except IntegrityError:
return session.query(model).filter_by(**kwargs).one(), True
使用nested transaction 仅回滚添加的新项目,而不是回滚所有内容(请参阅此answer 以使用 SQLite 的嵌套事务)
移动create_method
。如果创建的对象具有关系并且通过这些关系分配了成员,则它会自动添加到会话中。例如。创建一个book
,它有user_id
和user
作为对应关系,然后在create_method
里面做book.user=<user object>
会添加book
到会话中。这意味着create_method
必须在with
内才能从最终回滚中受益。请注意,begin_nested
会自动触发刷新。
请注意,如果使用 mysql,事务隔离级别必须设置为 READ COMMITTED
而不是 REPEATABLE READ
才能正常工作。 Django 的 get_or_create(和 here)使用相同的策略,另请参见 Django documentation。
【讨论】:
我喜欢这样可以避免回滚不相关的更改,但是如果会话之前查询过模型,IntegrityError
重新查询可能仍会失败,NoResultFound
使用 MySQL 默认隔离级别 REPEATABLE READ
在同一笔交易中。我能想出的最佳解决方案是在此查询之前调用session.commit()
,这也不理想,因为用户可能不会期望它。引用的答案不存在这个问题,因为 session.rollback() 具有启动新事务的相同效果。
嗯,直到。将查询放入嵌套事务中会起作用吗?你是对的,这个函数内部的commit
可以说比rollback
更糟糕,即使对于特定的用例它是可以接受的。
是的,将初始查询放在嵌套事务中至少可以使第二个查询工作。如果用户之前在同一个事务中明确查询过模型,它仍然会失败。我已经决定这是可以接受的,并且应该警告用户不要这样做或以其他方式捕获异常并决定是否commit()
自己。如果我对代码的理解是正确的,这就是 Django 所做的。
在 django documentation 中,他们说使用`READ COMMITTED, so it does not look like they try to handle this. Looking at the [source](https://github.com/django/django/blob/master/django/db/models/query.py#L491) confirms this. I'm not sure I understand your reply, you mean the user should put his/her query in a nested transaction? It's not clear to me how a
SAVEPOINT` 会影响REPEATABLE READ
的读取。如果没有效果那么情况似乎无法挽救,如果效果那么最后一个查询可以嵌套?
READ COMMITED
很有趣,也许我应该重新考虑我不触及数据库默认值的决定。我已经测试过,从进行查询之前恢复SAVEPOINT
会使该查询好像从未在REPEATABLE READ
中发生过。因此,我发现有必要将查询包含在嵌套事务中的 try 子句中,以便 IntegrityError
except 子句中的查询完全可以工作。【参考方案5】:
This SQLALchemy recipe 做得很好,很优雅。
要做的第一件事是定义一个函数,该函数被赋予一个 Session 以使用,并将一个字典与 Session() 相关联,该 Session() 跟踪当前的 unique 键。
def _unique(session, cls, hashfunc, queryfunc, constructor, arg, kw):
cache = getattr(session, '_unique_cache', None)
if cache is None:
session._unique_cache = cache =
key = (cls, hashfunc(*arg, **kw))
if key in cache:
return cache[key]
else:
with session.no_autoflush:
q = session.query(cls)
q = queryfunc(q, *arg, **kw)
obj = q.first()
if not obj:
obj = constructor(*arg, **kw)
session.add(obj)
cache[key] = obj
return obj
使用这个函数的一个例子是在一个 mixin 中:
class UniqueMixin(object):
@classmethod
def unique_hash(cls, *arg, **kw):
raise NotImplementedError()
@classmethod
def unique_filter(cls, query, *arg, **kw):
raise NotImplementedError()
@classmethod
def as_unique(cls, session, *arg, **kw):
return _unique(
session,
cls,
cls.unique_hash,
cls.unique_filter,
cls,
arg, kw
)
最后创建独特的 get_or_create 模型:
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
engine = create_engine('sqlite://', echo=True)
Session = sessionmaker(bind=engine)
class Widget(UniqueMixin, Base):
__tablename__ = 'widget'
id = Column(Integer, primary_key=True)
name = Column(String, unique=True, nullable=False)
@classmethod
def unique_hash(cls, name):
return name
@classmethod
def unique_filter(cls, query, name):
return query.filter(Widget.name == name)
Base.metadata.create_all(engine)
session = Session()
w1, w2, w3 = Widget.as_unique(session, name='w1'), \
Widget.as_unique(session, name='w2'), \
Widget.as_unique(session, name='w3')
w1b = Widget.as_unique(session, name='w1')
assert w1 is w1b
assert w2 is not w3
assert w2 is not w1
session.commit()
该配方更深入地研究了这个想法并提供了不同的方法,但我已经成功地使用了这个。
【讨论】:
如果只有一个 SQLAlchemy Session 对象可以修改数据库,我喜欢这个秘诀。我可能是错的,但是如果其他会话(SQLAlchemy 与否)同时修改数据库,我看不出这如何防止在事务进行时其他会话可能已创建的对象。在这些情况下,我认为依赖 session.add() 之后的刷新和***.com/a/21146492/3690333 之类的异常处理的解决方案更可靠。【参考方案6】:语义上最接近的可能是:
def get_or_create(model, **kwargs):
"""SqlAlchemy implementation of Django's get_or_create.
"""
session = Session()
instance = session.query(model).filter_by(**kwargs).first()
if instance:
return instance, False
else:
instance = model(**kwargs)
session.add(instance)
session.commit()
return instance, True
不知道在 sqlalchemy 中依赖全局定义的 Session
是多么的洁净,但 Django 版本不接受连接,所以......
返回的元组包含实例和一个布尔值,指示实例是否已创建(即,如果我们从数据库中读取实例,则为 False)。
Django 的get_or_create
经常用于确保全局数据可用,所以我会尽早提交。
【讨论】:
只要 Session 由scoped_session
创建和跟踪,这应该可以工作,这应该实现线程安全的会话管理(这在 2014 年存在吗?)。【参考方案7】:
我稍微简化了@Kevin。避免将整个函数包装在 if
/else
语句中的解决方案。这样就只有一个return
,我觉得更干净:
def get_or_create(session, model, **kwargs):
instance = session.query(model).filter_by(**kwargs).first()
if not instance:
instance = model(**kwargs)
session.add(instance)
return instance
【讨论】:
【参考方案8】:有一个 Python 包,其中包含 @erik 的解决方案以及 update_or_create()
的版本。 https://github.com/enricobarzetti/sqlalchemy_get_or_create
【讨论】:
【参考方案9】:根据您采用的隔离级别,上述解决方案均无效。 我发现的最佳解决方案是以下形式的 RAW SQL:
INSERT INTO table(f1, f2, unique_f3)
SELECT 'v1', 'v2', 'v3'
WHERE NOT EXISTS (SELECT 1 FROM table WHERE f3 = 'v3')
无论隔离级别和并行度如何,这都是事务安全的。
注意:为了提高效率,明智的做法是为唯一列设置一个索引。
【讨论】:
【参考方案10】:我经常遇到的一个问题是,当一个字段有一个最大长度(例如,STRING(40)
)并且您想执行一个带有大长度字符串的 get or create
时,上述解决方案将失败。
在上述解决方案的基础上,这是我的方法:
from sqlalchemy import Column, String
def get_or_create(self, add=True, flush=True, commit=False, **kwargs):
"""
Get the an entity based on the kwargs or create an entity with those kwargs.
Params:
add: (default True) should the instance be added to the session?
flush: (default True) flush the instance to the session?
commit: (default False) commit the session?
kwargs: key, value pairs of parameters to lookup/create.
Ex: SocialPlatform.get_or_create(**'name':'facebook')
returns --> existing record or, will create a new record
---------
NOTE: I like to add this as a classmethod in the base class of my tables, so that
all data models inherit the base class --> functionality is transmitted across
all orm defined models.
"""
# Truncate values if necessary
for key, value in kwargs.items():
# Only use strings
if not isinstance(value, str):
continue
# Only use if it's a column
my_col = getattr(self.__table__.columns, key)
if not isinstance(my_col, Column):
continue
# Skip non strings again here
if not isinstance(my_col.type, String):
continue
# Get the max length
max_len = my_col.type.length
if value and max_len and len(value) > max_len:
# Update the value
value = value[:max_len]
kwargs[key] = value
# -------------------------------------------------
# Make the query...
instance = session.query(self).filter_by(**kwargs).first()
if instance:
return instance
else:
# Max length isn't accounted for here.
# The assumption is that auto-truncation will happen on the child-model
# Or directtly in the db
instance = self(**kwargs)
# You'll usually want to add to the session
if add:
session.add(instance)
# Navigate these with caution
if add and commit:
try:
session.commit()
except IntegrityError:
session.rollback()
elif add and flush:
session.flush()
return instance
【讨论】:
以上是关于SQLAlchemy 是不是与 Django 的 get_or_create 等效?的主要内容,如果未能解决你的问题,请参考以下文章
Python-Web框架之 - 利用SQLALchemy创建与数据库MySQL的连接, 详解用Flask时会遇到的一些大坑 !