如何将一个类映射到sqlalchemy orm中的多个数据库

Posted

技术标签:

【中文标题】如何将一个类映射到sqlalchemy orm中的多个数据库【英文标题】:How to map a Class to multiple databases in sqlalchemy orm 【发布时间】:2015-08-19 20:26:10 【问题描述】:

我通过 sqlalchemy 使用 sqlite 数据库作为我的应用程序文件。我有一个单独的配置文件。

有些类的信息保存在我的应用程序文件中,我想复制到我的配置文件中。问题是我会根据可用性从一个或另一个来源加载它。

我在文档中看到了这一点,但我认为它并不直接适用,因为辅助映射不会保留信息。此外,哪个是主要的概念也很模糊。两个数据库将携带相同的信息,但可能不在同一个版本上。 http://sqlalchemy.readthedocs.org/en/rel_1_0/orm/nonstandard_mappings.html#multiple-mappers-for-one-class

我会试着用一个例子来说明清楚:

我有一个代表多字段用户输入的 A 类。我将其保存在我的应用程序文件中。

我的应用程序文件文件上的A类B也是由A类的一个实例组成的。

来自 A 类的同一个实例可能会组成几个合适的 B 类实例。这些都存储在我的应用程序文件中。

我的问题是,在另一个会话中,我可能希望使用全新的配置文件重用该 A 类实例。我不能只在应用程序文件上使用它,因为如果它得到更新,它将与所有使用它的应用程序文件相关。

另一方面,它不能只在配置文件中,因为用户可能会与另一个用户共享他的应用程序文件,而后者可能没有合适的配置,必须手动进行。

我需要在两个地方都有它,能够在运行时选择哪个数据库作为源,并让所有更改同时在两个数据库上保持。

可以在sqlalchemy+sqlite中完成吗?这是个好主意吗?有没有经典的解决方案?

编辑: 我想我正在描述一些看起来像缓存的东西,而 sqlalchemy 没有这样做。想到其他方法了吗? sqlalchemy 是否允许我在创建实例时将实例映射到数据库?这将允许同一类的两个实例映射到不同的数据库。然后我会通过 sqlalchemy 监听更新事件并将相同的 sql 发送到另一个数据库。我也不知道该怎么做。

另一种选择:将我的班级映射到联合查询。 Sqlalchemy 可能允许任意选择,但是存在持久性问题。

另一种选择:向引擎添加一个层,以便它同时连接到两个数据库,向两者发出相同的命令进行读取和写入。我可以处理重复的退货。

【问题讨论】:

【参考方案1】:

我想出了下面的 mixin。我不处理删除或回滚,因为我不在我的应用程序中使用它们,也不知道如何处理它们。 看起来它正在工作。我将继续扩展它以处理集合。

import os
from sqlalchemy import Column, Float, String, Enum, Integer, event
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import orm
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

class ReplicateMixin:
    @classmethod
    def get_or_create(cls,prime_session, sessoes = None, **kwargs):
        if sessoes is None:
            sessoes = []
        if not isinstance(sessoes, list):
            sessoes = [sessoes]
        sessoes = [prime_session] + sessoes #They are passed separatelly just to make explicit that the first might receive diferent treatment
        replicas = []

        for sessao in sessoes: #Gets a result or creates a new instance from each database
            instance = sessao.query(Datum).filter_by(**kwargs).first()
            if instance is None: 
                instance = cls(**kwargs)
                setattr(instance, "__new", True)
            sessao.add(instance) 
            instance.sessao = sessao
            replicas.append(instance)

        fittest = cls.__select_fittest(replicas) #Selects the instance whose data will prevail 
        prime = replicas.pop(0) #Instance from the session we will be issuing commits to. The others must simply follow.
        cls.__copy_data(fittest, prime, ReplicateMixin.__get_primary_keys(prime))

        setattr(prime, "__replicas", replicas) #The object will carry references to its copies
        return prime

    @staticmethod
    def __select_fittest(instances):
        """This method should contain logic for choosing the instance that has
        the most relevant information. It may be altered by child classes"""
        if getattr(instances[0], "__new", False):
            return instances[1]
        else:
            return instances[0] 

    @staticmethod
    def __copy_data(source, dest, primary_keys = None):
        primary_keys = [] if primary_keys is None else primary_keys
        for prop in orm.class_mapper(type(source)).iterate_properties:
            if (isinstance(prop, orm.ColumnProperty)
                and prop.key not in primary_keys):
                setattr(dest, prop.key,
                        getattr(source, prop.key))

    @staticmethod
    def __replicate(mapper, connection, original_obj):
        replicants = getattr(original_obj, "__replicas", []) #if it IS a replicant it will not have a __replicas attribute
        primary_keys = ReplicateMixin.__get_primary_keys(original_obj)
        for objeto in replicants:
            ReplicateMixin.__copy_data(original_obj, objeto, primary_keys)
            objeto.sessao.commit()

    @staticmethod
    def __replicate_del(mapper, conection, original_obj):
        replicants = getattr(original_obj, "__replicas", []) #if it IS a replicant it will not have a __replicas attribute
        for objeto in replicants:
            if objeto in objeto.sessao.new:
                objeto.sessao.expunge(objeto)
            else:
                objeto.sessao.delete(objeto)
            objeto.sessao.commit()

    @staticmethod
    def __get_primary_keys(mapped_object):
        return [key.name for key in orm.class_mapper(type(mapped_object)).primary_key]

    @classmethod
    def __declare_last__(cls):
        """Binds certain events to functions"""
        event.listen(cls, "before_insert", cls.__replicate)
        event.listen(cls, "before_update", cls.__replicate)
        event.listen(cls, "before_delete", cls.__replicate_del)
        #FIXME might not play well with rollback  

例子:

DeclarativeBase = declarative_base()
class Datum (ReplicateMixin, DeclarativeBase):
    __tablename__ = "xUnitTestData"
    Key = Column(Integer, primary_key=True)
    Value = Column(Float)
    nome = Column(String(10))
    def __repr__(self):
        return "; ; ".format(self.Key, self.Value, self.nome)




end_local = os.path.join(os.path.expanduser("~"), "Desktop", "local.bd")
end_remoto = os.path.join(os.path.expanduser("~"), "Desktop", "remoto.bd")
src_engine = create_engine('sqlite:///'+end_local, echo=False)
dst_engine = create_engine('sqlite:///'+end_remoto, echo=False)
DeclarativeBase.metadata.create_all(src_engine)
DeclarativeBase.metadata.create_all(dst_engine)
SessionSRC = sessionmaker(bind=src_engine)
SessionDST = sessionmaker(bind=dst_engine)


session1 = SessionSRC()
session2 = SessionDST()

item = Datum.pegar_ou_criar(session1, session2, Value = 0.5, nome = "terceiro")

item.Value = item.Value/2
print(item)
session1.delete(item)

session1.commit()
session1.close()

【讨论】:

以上是关于如何将一个类映射到sqlalchemy orm中的多个数据库的主要内容,如果未能解决你的问题,请参考以下文章

SQLAlchemy(一):SQLAlchemy去连接数据库ORM介绍将ORM模型映射到数据库中

ORM框架SQLAlchemy的使用

SQLAlchemy

SQLAlchemy的基础使用

SQLAlchemy框架用法详解

sqlalchemy和flask-sqlalchemy