SQLAlchemy 事件可以用于更新非规范化数据缓存吗?

Posted

技术标签:

【中文标题】SQLAlchemy 事件可以用于更新非规范化数据缓存吗?【英文标题】:Can SQLAlchemy events be used to update a denormalized data cache? 【发布时间】:2012-11-21 13:05:55 【问题描述】:

出于性能原因,我有一个非规范化数据库,其中一些表包含从其他表中的许多行聚合而来的数据。我想使用SQLAlchemy events 来维护这个非规范化的数据缓存。例如,假设我正在编写论坛软件,并希望每个 Thread 都有一个列来跟踪线程中所有 cmets 的组合字数,以便有效地显示该信息:

class Thread(Base):
    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    title = Column(UnicodeText(), nullable=False)
    word_count = Column(Integer, nullable=False, default=0)

class Comment(Base):
    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    thread_id = Column(UUID, ForeignKey('thread.id', ondelete='CASCADE'), nullable=False)
    thread = relationship('Thread', backref='comments')
    message = Column(UnicodeText(), nullable=False)
    
    @property
    def word_count(self):
        return len(self.message.split())

所以每次插入评论时(为了简单起见,我们假设 cmets 永远不会被编辑或删除),我们想要更新关联的 Thread 对象上的 word_count 属性。所以我想做类似的事情

def after_insert(mapper, connection, target):
    thread = target.thread
    thread.word_count = sum(c.word_count for c in thread.comments)
    print("updated cached word count to", thread.word_count)

event.listen(Comment, "after_insert", after_insert)

因此,当我插入 Comment 时,我可以看到事件触发并看到它已正确计算字数,但该更改未保存到数据库中的 Thread 行。我在after_insert documentation 中没有看到关于更新其他表格的任何警告,但我确实在其他一些表格中看到了一些警告,例如after_delete。

那么有没有一种支持的方式来使用 SQLAlchemy 事件来做到这一点?我已经将 SQLAlchemy 事件用于许多其他事情,所以我想以这种方式做所有事情,而不必编写数据库触发器。

【问题讨论】:

我认为有一次我打算这样做,我必须创建一个辅助 SQLAlchemy 数据库连接并以这种方式创建新对象。它从未进入生产数据库,所以我不知道它是否有任何副作用或问题。不过你可以试试。 before_insert 和同时提交插入和更新怎么样? 好吧,这不是一个好主意。文档说处理程序不应更改会话的状态,包括向其中添加新对象,因此您可能应该使用多个会话,如 Rachel 所述。 【参考方案1】:

after_insert() 事件是执行此操作的一种方法,您可能会注意到它传递了一个 SQLAlchemy Connection 对象,而不是像其他刷新相关事件那样传递一个 Session。映射器级别的刷新事件通常用于在给定的Connection 上直接调用 SQL:

@event.listens_for(Comment, "after_insert")
def after_insert(mapper, connection, target):
    thread_table = Thread.__table__
    thread = target.thread
    connection.execute(
            thread_table.update().
             where(thread_table.c.id==thread.id).
             values(word_count=sum(c.word_count for c in thread.comments))
    )
    print "updated cached word count to", thread.word_count

这里值得注意的是,直接调用 UPDATE 语句也比在整个工作单元过程中再次运行该属性更改的性能要高得多。

然而,这里并不真正需要像 after_insert() 这样的事件,因为我们甚至在刷新发生之前就知道“word_count”的值。我们实际上知道它是因为 Comment 和 Thread 对象是相互关联的,我们也可以使用属性事件在内存中始终保持 Thread.word_count 完全新鲜:

def _word_count(msg):
    return len(msg.split())

@event.listens_for(Comment.message, "set")
def set(target, value, oldvalue, initiator):
    if target.thread is not None:
        target.thread.word_count += (_word_count(value) - _word_count(oldvalue))

@event.listens_for(Comment.thread, "set")
def set(target, value, oldvalue, initiator):
    # the new Thread, if any
    if value is not None:
        value.word_count += _word_count(target.message)

    # the old Thread, if any
    if oldvalue is not None:
        oldvalue.word_count -= _word_count(target.message)

此方法的最大优点是也无需遍历 thread.cmets,这对于未加载的集合意味着发出另一个 SELECT。

还有一种方法是在 before_flush() 中进行。下面是一个快速而肮脏的版本,可以对其进行细化以更仔细地分析发生了什么变化,以确定是否需要更新 word_count:

@event.listens_for(Session, "before_flush")
def before_flush(session, flush_context, instances):
    for obj in session.new | session.dirty:
        if isinstance(obj, Thread):
            obj.word_count = sum(c.word_count for c in obj.comments)
        elif isinstance(obj, Comment):
            obj.thread.word_count = sum(c.word_count for c in obj.comments)

我会使用属性事件方法,因为它是性能最高且最新的。

【讨论】:

在我看来after_insert 方法还具有防止竞争条件的优势。如果是这种情况,在数据始终保持一致对业务至关重要的情况下,我更喜欢他们将事件归因于事件是否正确? 如果您真的希望 UPDATE 是“原子的”,这样即使没有可序列化的隔离也不会发生竞争条件,那么您需要针对所有线程 cmets 上的子查询运行它,而不是在 thread.cmets 的内存中迭代,这可能不是完整的集合。 如何创建新连接。因为我在烧瓶中使用应用程序工厂? db = SQLAlchemy(), db.init_app(app)? 您不必创建新连接。连接对象作为第二个参数传递给after_insert 事件处理程序。【参考方案2】:

您可以使用 SQLAlchemy-Utils aggregated 列做到这一点:http://sqlalchemy-utils.readthedocs.org/en/latest/aggregates.html

【讨论】:

它现在已经转了一圈。该函数的文档将此堆栈溢出答案指定为灵感

以上是关于SQLAlchemy 事件可以用于更新非规范化数据缓存吗?的主要内容,如果未能解决你的问题,请参考以下文章

更新 Cassandra 中的非规范化数据

属性持久化的 SQLAlchemy ORM 事件挂钩

Logstash -> Elasticsearch - 更新非规范化数据

在 SQL Server 中自动更新冗余/非规范化数据

可以将 SQLAlchemy 配置为非阻塞吗?

更新Cassandra中的大型非规范化数据