属性持久化的 SQLAlchemy ORM 事件挂钩

Posted

技术标签:

【中文标题】属性持久化的 SQLAlchemy ORM 事件挂钩【英文标题】:SQLAlchemy ORM Event hook for attribute persisted 【发布时间】:2016-04-19 23:09:19 【问题描述】:

我正在努力在 SQLAlchemy 事件中找到一种方法,以便在属性更新并持久化到数据库时调用外部 API。这是我的背景:

具有名为birthday 的属性的User 模型。当User 模型的实例被更新并保存时,我想调用一个外部 API 来相应地更新这个用户的生日。

我试过Attribute Events,但是,它产生了太多的命中,无法保证set/remove 属性事件最终会被持久化(自动提交设置为 False 并且事务获取发生错误时回滚。)

Session Events 也不起作用,因为它需要 Session/SessionFactory 作为参数,而且代码中有很多地方都使用了会话。

我一直在查看官方文档中所有可能的SQLAlchemy ORM event hooks,但我找不到任何一个满足我的要求。

我想知道是否有人对如何在 SQLAlchemy 中实现这种组合事件触发器有任何见解。谢谢。

【问题讨论】:

【参考方案1】:

您可以通过组合多个事件来做到这一点。您需要使用的具体事件取决于您的特定应用程序,但基本思想是这样的:

    [InstanceEvents.load] 加载实例时,记下该实例已加载且稍后未添加到会话的事实(如果实例已加载,我们只想保存初始状态) [AttributeEvents.set/append/remove] 当一个属性改变时,记下它被改变的事实,如果有必要,它是从什么改变的(如果你不需要初始状态,前两个步骤是可选的) [SessionEvents.before_flush] 发生刷新时,记下实际保存了哪些实例 [SessionEvents.before_commit] 在提交完成之前,记下实例的当前状态(因为在提交之后您可能无法再访问它) [SessionEvents.after_commit] 提交完成后,触发自定义事件处理程序并清除您保存的实例

一个有趣的挑战是事件的顺序。如果您执行session.commit() 而不执行session.flush(),您会注意到before_commit 事件在before_flush 事件之前触发,这与您在session.commit() 之前执行session.flush() 的情况不同。解决方案是在您的before_commit 调用中调用session.flush() 以强制排序。这可能不是 100% 洁净的,但它在生产中对我有用。

这是一个(简单的)事件顺序图:

begin
load
(save initial state)
set attribute
...
flush
set attribute
...
flush
...
(save modified state)
commit
(fire off "object saved and changed" event)

完整示例

from itertools import chain
from weakref import WeakKeyDictionary, WeakSet
from sqlalchemy import Column, String, Integer, create_engine
from sqlalchemy import event
from sqlalchemy.orm import sessionmaker, object_session
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

engine = create_engine("sqlite://")
Session = sessionmaker(bind=engine)


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    birthday = Column(String)


@event.listens_for(User.birthday, "set", active_history=True)
def _record_initial_state(target, value, old, initiator):
    session = object_session(target)
    if session is None:
        return
    if target not in session.info.get("loaded_instances", set()):
        return
    initial_state = session.info.setdefault("initial_state", WeakKeyDictionary())
    # this is where you save the entire object's state, not necessarily just the birthday attribute
    initial_state.setdefault(target, old)


@event.listens_for(User, "load")
def _record_loaded_instances_on_load(target, context):
    session = object_session(target)
    loaded_instances = session.info.setdefault("loaded_instances", WeakSet())
    loaded_instances.add(target)


@event.listens_for(Session, "before_flush")
def track_instances_before_flush(session, context, instances):
    modified_instances = session.info.setdefault("modified_instances", WeakSet())
    for obj in chain(session.new, session.dirty):
        if session.is_modified(obj) and isinstance(obj, User):
            modified_instances.add(obj)


@event.listens_for(Session, "before_commit")
def set_pending_changes_before_commit(session):
    session.flush()  # IMPORTANT
    initial_state = session.info.get("initial_state", )
    modified_instances = session.info.get("modified_instances", set())
    del session.info["modified_instances"]
    pending_changes = session.info["pending_changes"] = []
    for obj in modified_instances:
        initial = initial_state.get(obj)
        current = obj.birthday
        pending_changes.append(
            "initial": initial,
            "current": current,
        )
        initial_state[obj] = current


@event.listens_for(Session, "after_commit")
def after_commit(session):
    pending_changes = session.info.get("pending_changes", )
    del session.info["pending_changes"]
    for changes in pending_changes:
        print(changes)  # this is where you would fire your custom event

    loaded_instances = session.info["loaded_instances"] = WeakSet()
    for v in session.identity_map.values():
        if isinstance(v, User):
            loaded_instances.add(v)


def main():
    engine = create_engine("sqlite://", echo=False)
    Base.metadata.create_all(bind=engine)
    session = Session(bind=engine)

    user = User(birthday="foo")
    session.add(user)
    user.birthday = "bar"
    session.flush()
    user.birthday = "baz"
    session.commit()  # prints: "initial": None, "current": "baz"
    user.birthday = "foobar"
    session.commit()  # prints: "initial": "baz", "current": "foobar"

    session.close()


if __name__ == "__main__":
    main()

如您所见,它有点复杂,而且不太符合人体工程学。如果将它集成到 ORM 中会更好,但我也理解不这样做可能是有原因的。

【讨论】:

感谢您的全面回答!我仍在试图理解为什么我们需要在 5 个步骤中链接 3 种类型的 SA 事件挂钩。想以更简单的方式详细说明吗?另外,对于您示例中的步骤 3-5, Session 指的是同一个全局会话工厂,监听工厂而不是特定会话实例是否重要?我对 Session 事件应该挂钩的事件目标的正确性感到困惑。 @Devy 简而言之,它们每个都为您提供了您需要的一条信息,以便了解实例上的状态更改已提交,而不是回滚,或者实例是瞬态的首先,或者通过一系列其他方式,一个实例可以触发您不关心的这些事件之一。它是 Session 而不是单个会话的原因是因为您希望它为 any 会话触发,对,而不是单个会话。 (否则,您需要为每个单独的会话设置监听器。) 是的,我想为 any 会话而不是特定会话触发。但我正在使用这样的会话:session = SQLAlchemySessions().sessiongithub.com/mitsuhiko/flask-sqlalchemy/issues/… 那么我该如何定位呢? @Devy 在您的特定情况下,您可以附加到所有db.s_* 实例,因为它们是scoped_sessions;这将影响db.s_* 代理创建的所有会话。或者,您可以附加到全局 sqlalchemy.orm.session.Session 类,这将影响 所有 个会话。

以上是关于属性持久化的 SQLAlchemy ORM 事件挂钩的主要内容,如果未能解决你的问题,请参考以下文章

Python SQLalchemy的学习与使用

ORM框架之SQLAlchemy

ORM框架SQLAlchemy的使用

ORM框架SQLAlchemy学习(未整理完)

Django ORM和SQLAlchemy类比

插入到自动映射生成的 ORM 时出现 SQLAlchemy InvalidRequestError