如何将 SqlAlchemy 结果序列化为 JSON?

Posted

技术标签:

【中文标题】如何将 SqlAlchemy 结果序列化为 JSON?【英文标题】:How to serialize SqlAlchemy result to JSON? 【发布时间】:2011-06-28 16:02:24 【问题描述】:

Django 有一些很好的自动序列化从 DB 返回到 JSON 格式的 ORM 模型。

如何将 SQLAlchemy 查询结果序列化为 JSON 格式?

我试过jsonpickle.encode,但它对查询对象本身进行编码。 我试过json.dumps(items) 但它返回了

TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable

将 SQLAlchemy ORM 对象序列化为 JSON /XML 真的那么难吗?它没有任何默认的序列化程序吗?现在序列化 ORM 查询结果是很常见的任务。

我需要的只是返回 SQLAlchemy 查询结果的 JSON 或 XML 数据表示。

需要在javascript datagird中使用JSON/XML格式的SQLAlchemy对象查询结果(JQGrid http://www.trirand.com/blog/)

【问题讨论】:

这是一个适合我的解决方法。 enter link description here 我必须警告你,序列化许多 sqlalchemy 模型(例如它们的列表)会非常慢。如果您关心性能,请选择字典。 ***.com/a/58660606/2782670 在表模型上创建一个包装器并在其中定义 to_dict 方法并写入要序列化的列数据并使用此包装器从数据库中获取数据 【参考方案1】:

这不是那么直截了当。我写了一些代码来做到这一点。我仍在研究它,它使用 MochiKit 框架。它基本上使用代理和注册的 JSON 转换器在 Python 和 Javascript 之间转换复合对象。

数据库对象的浏览器端是db.js 它需要proxy.js中的基本Python代理源。

在 Python 端有基础proxy module。 最后是webserver.py 中的 SqlAlchemy 对象编码器。 它还依赖于models.py 文件中的元数据提取器。

【讨论】:

乍一看相当复杂...我需要的是获取 JSON/XML 格式的 SQLAlchemy 对象查询结果,以便在 javascript datagird (JQGrid trirand.com/blog) 中使用它 有时问题比你乍一看更复杂...这处理作为外键返回的对象,并试图避免深度嵌套关系发生的无限递归。但是,您可能会编写一些仅返回基本类型的自定义查询,并直接使用 simplejson 序列化这些查询。 好吧,也许我真的会使用 SQLAlchemy 来查询字典,并且会利用 ORM 的好处只执行保存/更新操作。【参考方案2】:

您可以像这样将 RowProxy 转换为字典:

 d = dict(row.items())

然后将其序列化为 JSON(您必须为 datetime 值之类的内容指定编码器) 如果您只想要一个记录(而不是相关记录的完整层次结构),这并不难。

json.dumps([(dict(row.items())) for row in rs])

【讨论】:

这适用于我的自定义 sql 查询,其中 db.engine.connect() 作为 con: rs = con.execute(sql) 这更简单且有效。这个答案和接受的答案有什么区别?【参考方案3】:

扁平化实现

你可以这样使用:

from sqlalchemy.ext.declarative import DeclarativeMeta

class AlchemyEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            # an SQLAlchemy class
            fields = 
            for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                data = obj.__getattribute__(field)
                try:
                    json.dumps(data) # this will fail on non-encodable values, like other classes
                    fields[field] = data
                except TypeError:
                    fields[field] = None
            # a json-encodable dict
            return fields

        return json.JSONEncoder.default(self, obj)

然后使用以下方法转换为 JSON:

c = YourAlchemyClass()
print json.dumps(c, cls=AlchemyEncoder)

它将忽略不可编码的字段(将它们设置为“无”)。

它不会自动扩展关系(因为这可能导致自引用,并永远循环)。

递归的非循环实现

但是,如果您希望永远循环,您可以使用:

from sqlalchemy.ext.declarative import DeclarativeMeta

def new_alchemy_encoder():
    _visited_objs = []

    class AlchemyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                # don't re-visit self
                if obj in _visited_objs:
                    return None
                _visited_objs.append(obj)

                # an SQLAlchemy class
                fields = 
                for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                    fields[field] = obj.__getattribute__(field)
                # a json-encodable dict
                return fields

            return json.JSONEncoder.default(self, obj)

    return AlchemyEncoder

然后使用以下代码对对象进行编码:

print json.dumps(e, cls=new_alchemy_encoder(), check_circular=False)

这将编码所有孩子,以及他们所有的孩子,以及他们所有的孩子......基本上,可能对您的整个数据库进行编码。当它到达之前编码的东西时,它会将其编码为“无”。

一种递归的、可能是循环的、选择性的实现

另一种可能更好的选择是能够指定要扩展的字段:

def new_alchemy_encoder(revisit_self = False, fields_to_expand = []):
    _visited_objs = []

    class AlchemyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                # don't re-visit self
                if revisit_self:
                    if obj in _visited_objs:
                        return None
                    _visited_objs.append(obj)

                # go through each field in this SQLalchemy class
                fields = 
                for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                    val = obj.__getattribute__(field)

                    # is this field another SQLalchemy object, or a list of SQLalchemy objects?
                    if isinstance(val.__class__, DeclarativeMeta) or (isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
                        # unless we're expanding this field, stop here
                        if field not in fields_to_expand:
                            # not expanding this field: set it to None and continue
                            fields[field] = None
                            continue

                    fields[field] = val
                # a json-encodable dict
                return fields

            return json.JSONEncoder.default(self, obj)

    return AlchemyEncoder

你现在可以调用它:

print json.dumps(e, cls=new_alchemy_encoder(False, ['parents']), check_circular=False)

例如,仅扩展名为“父母”的 SQLAlchemy 字段。

【讨论】:

这是一个很好的回应,但是当它与非平面方法建立关系时,我得到一个“无法编码“BaseQuery”,有什么想法吗? @SashaB 如何更精细地针对重复关系的情况进行定位?例如,如果我有online_orderaddress,它们都与user 有关系,但online_order 也与address 有关系。如果我想序列化所有这些,我必须在fields_to_expand 中包含address,但我不想冗余序列化address,因为它与useronline_order 都有关系。 @BenKilah 让我猜猜,您使用的是 Flask-SqlAlchemy,并且您的模型继承自 db.Model,而不是 Base。如果是这种情况,请修改for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:,使其显示为for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata' and not x.startswith('query')]:。请记住,此解决方案将阻止您与名称“查询”建立属性/关系 和我一样,但要复杂得多。 ***.com/questions/7102754/… 你可以使用我的解决方案github.com/n0nSmoker/SQLAlchemy-serializer【参考方案4】:

您可以将对象输出为字典:

class User:
   def as_dict(self):
       return c.name: getattr(self, c.name) for c in self.__table__.columns

然后你使用User.as_dict() 序列化你的对象。

如Convert sqlalchemy row object to python dict中所述

【讨论】:

@charlax,我如何修复 DateTime?通过使用这个我得到'datetime.datetime(2013, 3, 22, 16, 50, 11) is not JSON serializable'当我做json.dumps 这是JSONEncoder 对象的职责。您可以对它进行子类化,为某些对象定义自己的编码器,包括日期时间。请注意,例如 Flask 支持开箱即用的 JSON 格式日期时间编码(使用最新版本)。 如果您使用 sqlalchemy 的“声明性”方法,您可以将类似的内容添加到自定义基类 - 这非常方便,因为您可以在您拥有的任何 ORM 对象上调用 my_orm_object.toDict()。同样,您可以定义一个 .toJSON() 方法,该方法使用您的 toDict 方法和一个自定义编码器来处理日期、blob 等 也支持日期时间:return c.name: unicode(getattr(self, c.name)) for c in self.__table__.columns 对于 Python 3 用户,@Shoham 的回答需要稍作改动:return c.name: str(getattr(self, c.name)) for c in self.__table__.columns【参考方案5】:

我建议使用marshmallow。它允许您创建序列化程序来表示您的模型实例,并支持关系和嵌套对象。

这是他们文档中的一个截断示例。取ORM模型,Author

class Author(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    first = db.Column(db.String(80))
    last = db.Column(db.String(80))

该类的棉花糖模式是这样构造的:

class AuthorSchema(Schema):
    id = fields.Int(dump_only=True)
    first = fields.Str()
    last = fields.Str()
    formatted_name = fields.Method("format_name", dump_only=True)

    def format_name(self, author):
        return ", ".format(author.last, author.first)

...并像这样使用:

author_schema = AuthorSchema()
author_schema.dump(Author.query.first())

...会产生这样的输出:


        "first": "Tim",
        "formatted_name": "Peters, Tim",
        "id": 1,
        "last": "Peters"

看看他们的完整Flask-SQLAlchemy Example。

一个名为marshmallow-sqlalchemy 的库专门集成了SQLAlchemy 和marshmallow。在该库中,上述Author 模型的架构如下所示:

class AuthorSchema(ModelSchema):
    class Meta:
        model = Author

集成允许从 SQLAlchemy Column 类型推断字段类型。

marshmallow-sqlalchemy here.

【讨论】:

我还发现了marshmallow-sqlalchemy.readthedocs.io/en/latest,它简化了模式生成【参考方案6】:

Flask-JsonTools 包为您的模型实现了JsonSerializableBase 基类。

用法:

from sqlalchemy.ext.declarative import declarative_base
from flask.ext.jsontools import JsonSerializableBase

Base = declarative_base(cls=(JsonSerializableBase,))

class User(Base):
    #...

现在User 模型可以神奇地序列化了。

如果你的框架不是Flask,你可以grab the code

【讨论】:

这只解决了一半的问题,因为它只序列化了一行。如何序列化整个查询结果? @SteveBennett 使用 jsontools 的 jsonapi 对响应进行编码。这将自动编码返回对象 我有一个非常简单的 sqlalchemy 模型,我得到:TypeError: is not JSON serializable 它最终通过在我的模型对象上显式调用 __json__() 来工作:return my_object.__json__() 该库不适用于 Flask 1.0 及更高版本,因为 Flask 1.0 不再支持 import flask.ext.whatever【参考方案7】:

出于安全原因,您永远不应返回模型的所有字段。我更喜欢选择性地选择它们。

Flask 的 json 编码现在支持 UUID、日期时间和关系(并为 flask_sqlalchemy db.Model 类添加了 queryquery_class)。我已将编码器更新如下:

app/json_encoder.py

    from sqlalchemy.ext.declarative import DeclarativeMeta
    from flask import json


    class AlchemyEncoder(json.JSONEncoder):
        def default(self, o):
            if isinstance(o.__class__, DeclarativeMeta):
                data = 
                fields = o.__json__() if hasattr(o, '__json__') else dir(o)
                for field in [f for f in fields if not f.startswith('_') and f not in ['metadata', 'query', 'query_class']]:
                    value = o.__getattribute__(field)
                    try:
                        json.dumps(value)
                        data[field] = value
                    except TypeError:
                        data[field] = None
                return data
            return json.JSONEncoder.default(self, o)

app/__init__.py

# json encoding
from app.json_encoder import AlchemyEncoder
app.json_encoder = AlchemyEncoder

有了这个,我可以选择添加一个__json__ 属性,它返回我希望编码的字段列表:

app/models.py

class Queue(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    song_id = db.Column(db.Integer, db.ForeignKey('song.id'), unique=True, nullable=False)
    song = db.relationship('Song', lazy='joined')
    type = db.Column(db.String(20), server_default=u'audio/mpeg')
    src = db.Column(db.String(255), nullable=False)
    created_at = db.Column(db.DateTime, server_default=db.func.now())
    updated_at = db.Column(db.DateTime, server_default=db.func.now(), onupdate=db.func.now())

    def __init__(self, song):
        self.song = song
        self.src = song.full_path

    def __json__(self):
        return ['song', 'src', 'type', 'created_at']

我将@jsonapi 添加到我的视图中,返回结果列表,然后我的输出如下:

[



    "created_at": "Thu, 23 Jul 2015 11:36:53 GMT",
    "song": 

        
            "full_path": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
            "id": 2,
            "path_name": "Audioslave/Audioslave [2002]/1 Cochise.mp3"
        ,
    "src": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
    "type": "audio/mpeg"


]

【讨论】:

漂亮!再次证明,有时您不需要为每项愚蠢的小任务使用胖包——学习 DSL 可能比以“硬”方式完成它更难。在登陆这里之前,我查看了许多 JSON 和 REST 包。没错,这仍然需要一个包,flask_jsontools(在 views.py 等中将 @jsonapi 添加到 @app.route),但我喜欢它的简单性。我认为这是便宜的 Flask 添加了 datetime 但不是 date 所以我自己将它添加到 json_encoder.pyvalue=...^if isinstance(value, date):^data[field] = datetime.combine(value, time.min).isoformat()^else:^try:...【参考方案8】:

自定义序列化和反序列化。

“from_json”(类方法)基于json数据构建Model对象。

"deserialize" 只能在实例上调用,并将所有来自 json 的数据合并到 Model 实例中。

"serialize" - 递归序列化

__write_only__ 属性用于定义只写属性(例如“password_hash”)。

class Serializable(object):
    __exclude__ = ('id',)
    __include__ = ()
    __write_only__ = ()

    @classmethod
    def from_json(cls, json, selfObj=None):
        if selfObj is None:
            self = cls()
        else:
            self = selfObj
        exclude = (cls.__exclude__ or ()) + Serializable.__exclude__
        include = cls.__include__ or ()
        if json:
            for prop, value in json.iteritems():
                # ignore all non user data, e.g. only
                if (not (prop in exclude) | (prop in include)) and isinstance(
                        getattr(cls, prop, None), QueryableAttribute):
                    setattr(self, prop, value)
        return self

    def deserialize(self, json):
        if not json:
            return None
        return self.__class__.from_json(json, selfObj=self)

    @classmethod
    def serialize_list(cls, object_list=[]):
        output = []
        for li in object_list:
            if isinstance(li, Serializable):
                output.append(li.serialize())
            else:
                output.append(li)
        return output

    def serialize(self, **kwargs):

        # init write only props
        if len(getattr(self.__class__, '__write_only__', ())) == 0:
            self.__class__.__write_only__ = ()
        dictionary = 
        expand = kwargs.get('expand', ()) or ()
        prop = 'props'
        if expand:
            # expand all the fields
            for key in expand:
                getattr(self, key)
        iterable = self.__dict__.items()
        is_custom_property_set = False
        # include only properties passed as parameter
        if (prop in kwargs) and (kwargs.get(prop, None) is not None):
            is_custom_property_set = True
            iterable = kwargs.get(prop, None)
        # loop trough all accessible properties
        for key in iterable:
            accessor = key
            if isinstance(key, tuple):
                accessor = key[0]
            if not (accessor in self.__class__.__write_only__) and not accessor.startswith('_'):
                # force select from db to be able get relationships
                if is_custom_property_set:
                    getattr(self, accessor, None)
                if isinstance(self.__dict__.get(accessor), list):
                    dictionary[accessor] = self.__class__.serialize_list(object_list=self.__dict__.get(accessor))
                # check if those properties are read only
                elif isinstance(self.__dict__.get(accessor), Serializable):
                    dictionary[accessor] = self.__dict__.get(accessor).serialize()
                else:
                    dictionary[accessor] = self.__dict__.get(accessor)
        return dictionary

【讨论】:

【参考方案9】:
def alc2json(row):
    return dict([(col, str(getattr(row,col))) for col in row.__table__.columns.keys()])

我想我会用这个来打一点代码高尔夫。

仅供参考:我正在使用automap_base,因为我们有一个根据业务需求单独设计的架构。我今天刚开始使用 SQLAlchemy,但文档指出 automap_base 是 declarative_base 的扩展,这似乎是 SQLAlchemy ORM 中的典型范例,所以我相信这应该可行。

根据Tjorriemorrie 的解决方案,它并不喜欢跟随外键,但它只是将列与值匹配并通过 str()-ing 列值来处理 Python 类型。我们的值由 Python datetime.time 和 decimal.Decimal 类类型结果组成,因此它可以完成工作。

希望对路人有帮助!

【讨论】:

【参考方案10】:

我知道这是一篇相当老的帖子。我采用了@SashaB 给出的解决方案,并根据我的需要进行了修改。

我添加了以下内容:

    字段忽略列表:序列化时要忽略的字段列表 字段替换列表:包含在序列化时由值替换的字段名称的字典。 删除的方法和 BaseQuery 序列化

我的代码如下:

def alchemy_json_encoder(revisit_self = False, fields_to_expand = [], fields_to_ignore = [], fields_to_replace = ):
   """
   Serialize SQLAlchemy result into JSon
   :param revisit_self: True / False
   :param fields_to_expand: Fields which are to be expanded for including their children and all
   :param fields_to_ignore: Fields to be ignored while encoding
   :param fields_to_replace: Field keys to be replaced by values assigned in dictionary
   :return: Json serialized SQLAlchemy object
   """
   _visited_objs = []
   class AlchemyEncoder(json.JSONEncoder):
      def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            # don't re-visit self
            if revisit_self:
                if obj in _visited_objs:
                    return None
                _visited_objs.append(obj)

            # go through each field in this SQLalchemy class
            fields = 
            for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata' and x not in fields_to_ignore]:
                val = obj.__getattribute__(field)
                # is this field method defination, or an SQLalchemy object
                if not hasattr(val, "__call__") and not isinstance(val, BaseQuery):
                    field_name = fields_to_replace[field] if field in fields_to_replace else field
                    # is this field another SQLalchemy object, or a list of SQLalchemy objects?
                    if isinstance(val.__class__, DeclarativeMeta) or \
                            (isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
                        # unless we're expanding this field, stop here
                        if field not in fields_to_expand:
                            # not expanding this field: set it to None and continue
                            fields[field_name] = None
                            continue

                    fields[field_name] = val
            # a json-encodable dict
            return fields

        return json.JSONEncoder.default(self, obj)
   return AlchemyEncoder

希望它对某人有所帮助!

【讨论】:

【参考方案11】:

我使用(太多?)字典:

def serialize(_query):
    #d = dictionary written to per row
    #D = dictionary d is written to each time, then reset
    #Master = dictionary of dictionaries; the id Key (int, unique from database) 
    from D is used as the Key for the dictionary D entry in Master
    Master = 
    D = 
    x = 0
    for u in _query:
        d = u.__dict__
        D = 
        for n in d.keys():
           if n != '_sa_instance_state':
                    D[n] = d[n]
        x = d['id']
        Master[x] = D
    return Master

使用 flask(包括 jsonify)和 flask_sqlalchemy 运行以将输出打印为 JSON。

使用 jsonify(serialize()) 调用函数。

适用于我迄今为止尝试过的所有 SQLAlchemy 查询(运行 SQLite3)

【讨论】:

【参考方案12】:

在 SQLAlchemy 中使用built-in serializer:

from sqlalchemy.ext.serializer import loads, dumps
obj = MyAlchemyObject()
# serialize object
serialized_obj = dumps(obj)

# deserialize object
obj = loads(serialized_obj)

如果您在会话之间传输对象,请记住使用session.expunge(obj) 将对象从当前会话中分离出来。 要再次附加它,只需执行session.add(obj)

【讨论】:

漂亮,但不会转换为 JSON。 对于 JSON“序列化”,请查看 marshmallow-sqlalchemy。当您向客户公开对象时,绝对是最佳解决方案。 marshmallow-sqlalchemy.readthedocs.io 序列化模块只适用于查询结构。它不需要:用户定义类的实例。这些在典型情况下不包含对引擎、会话或表达式结构的引用,可以直接序列化。【参考方案13】:

这里有一个解决方案,可让您选择要包含在输出中的关系,其深度可以达到您想要的深度。 注意:这是一个完整的重写,将 dict/str 作为 arg 而不是列表。修复了一些东西..

def deep_dict(self, relations=):
    """Output a dict of an SA object recursing as deep as you want.

    Takes one argument, relations which is a dictionary of relations we'd
    like to pull out. The relations dict items can be a single relation
    name or deeper relation names connected by sub dicts

    Example:
        Say we have a Person object with a family relationship
            person.deep_dict(relations='family':None)
        Say the family object has homes as a relation then we can do
            person.deep_dict(relations='family':'homes':None)
            OR
            person.deep_dict(relations='family':'homes')
        Say homes has a relation like rooms you can do
            person.deep_dict(relations='family':'homes':'rooms')
            and so on...
    """
    mydict =  dict((c, str(a)) for c, a in
                    self.__dict__.items() if c != '_sa_instance_state')
    if not relations:
        # just return ourselves
        return mydict

    # otherwise we need to go deeper
    if not isinstance(relations, dict) and not isinstance(relations, str):
        raise Exception("relations should be a dict, it is of type ".format(type(relations)))

    # got here so check and handle if we were passed a dict
    if isinstance(relations, dict):
        # we were passed deeper info
        for left, right in relations.items():
            myrel = getattr(self, left)
            if isinstance(myrel, list):
                mydict[left] = [rel.deep_dict(relations=right) for rel in myrel]
            else:
                mydict[left] = myrel.deep_dict(relations=right)
    # if we get here check and handle if we were passed a string
    elif isinstance(relations, str):
        # passed a single item
        myrel = getattr(self, relations)
        left = relations
        if isinstance(myrel, list):
            mydict[left] = [rel.deep_dict(relations=None)
                                 for rel in myrel]
        else:
            mydict[left] = myrel.deep_dict(relations=None)

    return mydict

所以对于一个使用 person/family/homes/rooms 的例子...把它变成 json 你所需要的就是

json.dumps(person.deep_dict(relations='family':'homes':'rooms'))

【讨论】:

这很好,我认为只需放入您的基类,以便所有对象都拥有它。我会把 json 编码留给你... 请注意,此版本将获取所有列表关系,因此请谨慎提供与大量项目的关系...【参考方案14】:

你可以像这样使用 SqlAlchemy 的自省:

mysql = SQLAlchemy()
from sqlalchemy import inspect

class Contacts(mysql.Model):  
    __tablename__ = 'CONTACTS'
    id = mysql.Column(mysql.Integer, primary_key=True)
    first_name = mysql.Column(mysql.String(128), nullable=False)
    last_name = mysql.Column(mysql.String(128), nullable=False)
    phone = mysql.Column(mysql.String(128), nullable=False)
    email = mysql.Column(mysql.String(128), nullable=False)
    street = mysql.Column(mysql.String(128), nullable=False)
    zip_code = mysql.Column(mysql.String(128), nullable=False)
    city = mysql.Column(mysql.String(128), nullable=False)
    def toDict(self):
        return  c.key: getattr(self, c.key) for c in inspect(self).mapper.column_attrs 

@app.route('/contacts',methods=['GET'])
def getContacts():
    contacts = Contacts.query.all()
    contactsArr = []
    for contact in contacts:
        contactsArr.append(contact.toDict()) 
    return jsonify(contactsArr)

@app.route('/contacts/<int:id>',methods=['GET'])
def getContact(id):
    contact = Contacts.query.get(id)
    return jsonify(contact.toDict())

从这里的答案中获得灵感: Convert sqlalchemy row object to python dict

【讨论】:

在最小的解决方案方面,我将上面的 toDict() 与 tom 的 CustomJSONEncoder(注意稍作修改)结合起来,将日期时间转换为 ISO 格式。啊堆栈溢出的威力! ^^ 完全同意楼上说的!!如此简约而优雅。令人难以置信的工作!【参考方案15】:

在 Flask 下,它可以工作并处理数据时间字段,将类型为'time': datetime.datetime(2018, 3, 22, 15, 40) 的字段转换为"time": "2018-03-22 15:40:00":

obj = c.name: str(getattr(self, c.name)) for c in self.__table__.columns

# This to get the JSON body
return json.dumps(obj)

# Or this to get a response object
return jsonify(obj)

【讨论】:

【参考方案16】:

更详细的解释。 在您的模型中,添加:

def as_dict(self):
       return c.name: str(getattr(self, c.name)) for c in self.__table__.columns

str() 适用于 python 3,因此如果使用 python 2,请使用 unicode()。它应该有助于反序列化日期。如果不处理这些,您可以将其删除。

您现在可以像这样查询数据库

some_result = User.query.filter_by(id=current_user.id).first().as_dict()

需要First() 以避免奇怪的错误。 as_dict() 现在将反序列化结果。反序列化后准备转json

jsonify(some_result)

【讨论】:

【参考方案17】:

以下代码会将 sqlalchemy 结果序列化为 json。

import json
from collections import OrderedDict


def asdict(self):
    result = OrderedDict()
    for key in self.__mapper__.c.keys():
        if getattr(self, key) is not None:
            result[key] = str(getattr(self, key))
        else:
            result[key] = getattr(self, key)
    return result


def to_array(all_vendors):
    v = [ ven.asdict() for ven in all_vendors ]
    return json.dumps(v) 

打电话很有趣,

def all_products():
    all_products = Products.query.all()
    return to_array(all_products)

【讨论】:

【参考方案18】:

虽然最初的问题可以追溯到一段时间,但这里的答案数量(以及我自己的经验)表明这是一个不平凡的问题,有许多不同的方法,具有不同的复杂性和不同的权衡。

这就是我构建 SQLAthanor 库的原因,该库扩展了 SQLAlchemy 的声明式 ORM,并提供了您可能想要查看的可配置序列化/反序列化支持。

该库支持:

Python 2.7、3.4、3.5 和 3.6。 SQLAlchemy 0.9 及更高版本 序列化/反序列化到/从 JSON、CSV、YAML 和 Python dict 列/属性、关系、混合属性和关联代理的序列化/反序列化 启用和禁用特定格式和列/关系/属性的序列化(例如,您希望支持 inbound password 值,但永远不要包含 outbound 值) 序列化前和反序列化后值处理(用于验证或类型强制) 一种非常简单的语法,既符合 Python 风格,又与 SQLAlchemy 自己的方法无缝一致

您可以在此处查看(我希望!)综合文档:https://sqlathanor.readthedocs.io/en/latest

希望这会有所帮助!

【讨论】:

【参考方案19】:

带有 utf-8 的内置串行器扼流圈无法解码某些输入的无效起始字节。相反,我选择了:

def row_to_dict(row):
    temp = row.__dict__
    temp.pop('_sa_instance_state', None)
    return temp


def rows_to_list(rows):
    ret_rows = []
    for row in rows:
        ret_rows.append(row_to_dict(row))
    return ret_rows


@website_blueprint.route('/api/v1/some/endpoint', methods=['GET'])
def some_api():
    '''
    /some_endpoint
    '''
    rows = rows_to_list(SomeModel.query.all())
    response = app.response_class(
        response=jsonplus.dumps(rows),
        status=200,
        mimetype='application/json'
    )
    return response

【讨论】:

【参考方案20】:

AlchemyEncoder 非常棒,但有时会因 Decimal 值而失败。这是解决小数问题的改进编码器 -

class AlchemyEncoder(json.JSONEncoder):
# To serialize SQLalchemy objects 
def default(self, obj):
    if isinstance(obj.__class__, DeclarativeMeta):
        model_fields = 
        for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
            data = obj.__getattribute__(field)
            print data
            try:
                json.dumps(data)  # this will fail on non-encodable values, like other classes
                model_fields[field] = data
            except TypeError:
                model_fields[field] = None
        return model_fields
    if isinstance(obj, Decimal):
        return float(obj)
    return json.JSONEncoder.default(self, obj)

【讨论】:

【参考方案21】:

Python 3.7+ 和 Flask 1.1+ 可以使用内置的dataclasses 包

from dataclasses import dataclass
from datetime import datetime
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy


app = Flask(__name__)
db = SQLAlchemy(app)


@dataclass
class User(db.Model):
  id: int
  email: str

  id = db.Column(db.Integer, primary_key=True, auto_increment=True)
  email = db.Column(db.String(200), unique=True)


@app.route('/users/')
def users():
  users = User.query.all()
  return jsonify(users)  


if __name__ == "__main__":
  users = User(email="user1@gmail.com"), User(email="user2@gmail.com")
  db.create_all()
  db.session.add_all(users)
  db.session.commit()
  app.run()

/users/ 路由现在将返回用户列表。

[
  "email": "user1@gmail.com", "id": 1,
  "email": "user2@gmail.com", "id": 2
]

自动序列化相关模型

@dataclass
class Account(db.Model):
  id: int
  users: User

  id = db.Column(db.Integer)
  users = db.relationship(User)  # User model would need a db.ForeignKey field

jsonify(account) 的回复是这样的。

  
   "id":1,
   "users":[  
        
         "email":"user1@gmail.com",
         "id":1
      ,
        
         "email":"user2@gmail.com",
         "id":2
      
   ]

覆盖默认的 JSON 编码器

from flask.json import JSONEncoder


class CustomJSONEncoder(JSONEncoder):
  "Add support for serializing timedeltas"

  def default(o):
    if type(o) == datetime.timedelta:
      return str(o)
    elif type(o) == datetime.datetime:
      return o.isoformat()
    else:
      return super().default(o)

app.json_encoder = CustomJSONEncoder      

【讨论】:

这看起来很简单。它也适用于反序列化吗? 这是我一直在寻找的,而且,它似乎不适用于dump = json.dumps(query_result),但没关系,我使用return make_response(jsonify(query_result), 200) 请注意,id: int = Column 可以工作,但id = Column 不会,看来您必须为 json 声明静态类型才能序列化该字段,否则您会得到一个空的 对象. 这对我有用,为什么这不是公认的答案?我已经在 app_context 上玩了几个小时来让它与 Flask-Marshmallow 一起使用。 也为我工作。请注意,如果您使用的是 Python 3.6,则只需安装包:pipenv install dataclasses。然后它就可以正常工作了。【参考方案22】:

当使用 sqlalchemy 连接到数据库时,这是一个高度可配置的简单解决方案。使用熊猫。

import pandas as pd
import sqlalchemy

#sqlalchemy engine configuration
engine = sqlalchemy.create_engine....

def my_function():
  #read in from sql directly into a pandas dataframe
  #check the pandas documentation for additional config options
  sql_DF = pd.read_sql_table("table_name", con=engine)

  # "orient" is optional here but allows you to specify the json formatting you require
  sql_json = sql_DF.to_json(orient="index")

  return sql_json

【讨论】:

【参考方案23】:

也许你可以使用这样的类

from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy import Table


class Custom:
    """Some custom logic here!"""

    __table__: Table  # def for mypy

    @declared_attr
    def __tablename__(cls):  # pylint: disable=no-self-argument
        return cls.__name__  # pylint: disable= no-member

    def to_dict(self) -> Dict[str, Any]:
        """Serializes only column data."""
        return c.name: getattr(self, c.name) for c in self.__table__.columns

Base = declarative_base(cls=Custom)

class MyOwnTable(Base):
    #COLUMNS!

所有对象都有to_dict方法

【讨论】:

【参考方案24】:

在使用一些原始 sql 和未定义的对象时,使用 cursor.description 似乎得到了我想要的东西:

with connection.cursor() as cur:
    print(query)
    cur.execute(query)
    for item in cur.fetchall():
        row = column.name: item[i] for i, column in enumerate(cur.description)
        print(row)

【讨论】:

【参考方案25】:
step1:
class CNAME:
   ...
   def as_dict(self):
       return item.name: getattr(self, item.name) for item in self.__table__.columns

step2:
list = []
for data in session.query(CNAME).all():
    list.append(data.as_dict())

step3:
return jsonify(list)

【讨论】:

没有任何解释的代码转储很少有帮助。 Stack Overflow 是关于学习的,而不是提供 sn-ps 来盲目复制和粘贴。请edit您的问题并解释它如何比OP提供的更好。【参考方案26】:

虽然是老帖子了,也许我没有回答上面的问题,但是我想谈谈我的连载,至少它对我有用。

我使用 FastAPI、SqlAlchemy 和 MySQL,但我不使用 orm 模型;

# from sqlalchemy import create_engine
# from sqlalchemy.orm import sessionmaker
# engine = create_engine(config.SQLALCHEMY_DATABASE_URL, pool_pre_ping=True)
# SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

序列化代码



import decimal
import datetime


def alchemy_encoder(obj):
    """JSON encoder function for SQLAlchemy special classes."""
    if isinstance(obj, datetime.date):
        return obj.strftime("%Y-%m-%d %H:%M:%S")
    elif isinstance(obj, decimal.Decimal):
        return float(obj)

import json
from sqlalchemy import text

# db is SessionLocal() object 

app_sql = 'SELECT * FROM app_info ORDER BY app_id LIMIT :page,:page_size'

# The next two are the parameters passed in
page = 1
page_size = 10

# execute sql and return a <class 'sqlalchemy.engine.result.ResultProxy'> object
app_list = db.execute(text(app_sql), 'page': page, 'page_size': page_size)

# serialize
res = json.loads(json.dumps([dict(r) for r in app_list], default=alchemy_encoder))

如果不起作用,请忽略我的回答。我这里参考一下

https://codeandlife.com/2014/12/07/sqlalchemy-results-to-json-the-easy-way/

【讨论】:

【参考方案27】:

安装 simplejson pip install simplejson 和创建一个类

class Serialise(object):

    def _asdict(self):
        """
        Serialization logic for converting entities using flask's jsonify

        :return: An ordered dictionary
        :rtype: :class:`collections.OrderedDict`
        """

        result = OrderedDict()
        # Get the columns
        for key in self.__mapper__.c.keys():
            if isinstance(getattr(self, key), datetime):
                result["x"] = getattr(self, key).timestamp() * 1000
                result["timestamp"] = result["x"]
            else:
                result[key] = getattr(self, key)

        return result

并将这个类继承到每个 orm 类,以便这个 _asdict 函数注册到每个 ORM 类和繁荣。 并在任何地方使用 jsonify

【讨论】:

【参考方案28】:

(对Sasha B's 的微小调整非常出色的答案)

这专门将日期时间对象转换为字符串,在原始答案中将转换为None

# Standard library imports
from datetime import datetime
import json

# 3rd party imports
from sqlalchemy.ext.declarative import DeclarativeMeta

class JsonEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            dict = 

            # Remove invalid fields and just get the column attributes
            columns = [x for x in dir(obj) if not x.startswith("_") and x != "metadata"]

            for column in columns:
                value = obj.__getattribute__(column)

                try:
                    json.dumps(value)
                    dict[column] = value
                except TypeError:
                    if isinstance(value, datetime):
                        dict[column] = value.__str__()
                    else:
                        dict[column] = None
            return dict

        return json.JSONEncoder.default(self, obj)

【讨论】:

【参考方案29】:

这是一个JSONEncoder 版本,它保留模型列顺序并且只保留递归定义的列和关系字段。它还格式化大多数 JSON 不可序列化类型:

import json
from datetime import datetime
from decimal import Decimal

import arrow
from sqlalchemy.ext.declarative import DeclarativeMeta

class SQLAlchemyJSONEncoder(json.JSONEncoder):
    """
    SQLAlchemy ORM JSON Encoder
    If you have a "backref" relationship defined in your SQLAlchemy model,
    this encoder raises a ValueError to stop an infinite loop.
    """

    def default(self, obj):
        if isinstance(obj, datetime):
            return arrow.get(obj).isoformat()
        elif isinstance(obj, Decimal):
            return float(obj)
        elif isinstance(obj, set):
            return sorted(obj)
        elif isinstance(obj.__class__, DeclarativeMeta):
            for attribute, relationship in obj.__mapper__.relationships.items():
                if isinstance(relationship.__getattribute__("backref"), tuple):
                    raise ValueError(
                        f'obj.__class__ object has a "backref" relationship '
                        "that would cause an infinite loop!"
                    )
            dictionary = 
            column_names = [column.name for column in obj.__table__.columns]
            for key in column_names:
                value = obj.__getattribute__(key)
                if isinstance(value, datetime):
                    value = arrow.get(value).isoformat()
                elif isinstance(value, Decimal):
                    value = float(value)
                elif isinstance(value, set):
                    value = sorted(value)
                dictionary[key] = value
            for key in [
                attribute
                for attribute in dir(obj)
                if not attribute.startswith("_")
                and attribute != "metadata"
                and attribute not in column_names
            ]:
                value = obj.__getattribute__(key)
                dictionary[key] = value
            return dictionary

        return super().default(obj)

【讨论】:

【参考方案30】:
class SqlToDict:
    def __init__(self, data) -> None:
        self.data = data

    def to_timestamp(self, date):
        if isinstance(date, datetime):
            return int(datetime.timestamp(date))
        else:
            return date

    def to_dict(self) -> List:
        arr = []
        for i in self.data:
            keys = [*i.keys()]
            values = [*i]
            values = [self.to_timestamp(d) for d in values]
            arr.append(dict(zip(keys, values)))
        return arr

例如:

SqlToDict(data).to_dict()

【讨论】:

以上是关于如何将 SqlAlchemy 结果序列化为 JSON?的主要内容,如果未能解决你的问题,请参考以下文章

如何根据列的值过滤 SQLAlchemy 结果?

python 将SQLAlchemy Model序列化为字典(用于JSON输出)并从字典属性更新Model。

如何将 TimeSpan 序列化为 XML

Json序列化与反序列化

Json序列化与反序列化

使用 select_related 将查询中的数据结果序列化为 json