在 SQLAlchemy 列类型和 python 数据类型之间轻松转换?

Posted

技术标签:

【中文标题】在 SQLAlchemy 列类型和 python 数据类型之间轻松转换?【英文标题】:Easy convert betwen SQLAlchemy column types and python data types? 【发布时间】:2010-11-12 13:35:17 【问题描述】:

我正在寻找一种简单的 python 方法来比较从 SQLAlchemy 到基本类型的列类型。例如,如果我的列类型是任意长度的 VARCHAR,我想将其读取为字符串。

我可以读取列类型,但我不确定验证它是基本类型的简单方法......如果我可以使用类似“if isinstance(mycolumn, int)”之类的东西会很好 - 但我我是 python 新手,不知道它是如何工作的。

这是我目前所拥有的:

from sqlalchemy import MetaData
from sqlalchemy import create_engine, Column, Table
engine = create_engine('mysql+mysqldb://user:pass@localhost:3306/mydb', pool_recycle=3600)
meta = MetaData()
meta.bind = engine
meta.reflect()
datatable = meta.tables['my_data_table']
[c.type for c in datatable.columns]

输出:

[INTEGER(display_width=11), DATE(), VARCHAR(length=127), DOUBLE(precision=None, scale=None, asdecimal=True)]

我的最终目的是双重的,首先是因为我想在将输出加载到我的 jQuery jqGrid 时根据类型格式化输出。第二,我正在慢慢地将非规范化数据表转换为规范化结构,并希望确保我的类型保持一致 - (以确保我在前一个表中的数字存储为数字而不是字符串...... )

【问题讨论】:

【参考方案1】:

只需使用所有 AQLAlchemy 类型中可用的 python_type 属性:

[c.type.python_type for c in datatable.columns]

【讨论】:

【参考方案2】:

Python 类型到 SQL 类型:

我在使用默认 sql-types 即时创建 SQL 表的问题上苦苦挣扎。我最终得到了以下方便的函数,以满足我所有的 python 类型到 sql 类型的转换需求。从 sql 类型到 python 类型是微不足道的,这将在下一节中解释。

import sqlalchemy
import numpy as np

import datetime
import decimal

_type_py2sql_dict = 
 int: sqlalchemy.sql.sqltypes.BigInteger,
 str: sqlalchemy.sql.sqltypes.Unicode,
 float: sqlalchemy.sql.sqltypes.Float,
 decimal.Decimal: sqlalchemy.sql.sqltypes.Numeric,
 datetime.datetime: sqlalchemy.sql.sqltypes.DateTime,
 bytes: sqlalchemy.sql.sqltypes.LargeBinary,
 bool: sqlalchemy.sql.sqltypes.Boolean,
 datetime.date: sqlalchemy.sql.sqltypes.Date,
 datetime.time: sqlalchemy.sql.sqltypes.Time,
 datetime.timedelta: sqlalchemy.sql.sqltypes.Interval,
 list: sqlalchemy.sql.sqltypes.ARRAY,
 dict: sqlalchemy.sql.sqltypes.JSON


def type_py2sql(pytype):
    '''Return the closest sql type for a given python type'''
    if pytype in _type_py2sql_dict:
        return _type_py2sql_dict[pytype]
    else:
        raise NotImplementedError(
            "You may add custom `sqltype` to `"+str(pytype)+"` assignment in `_type_py2sql_dict`.")

def type_np2py(dtype=None, arr=None):
    '''Return the closest python type for a given numpy dtype'''

    if ((dtype is None and arr is None) or
        (dtype is not None and arr is not None)):
        raise ValueError(
            "Provide either keyword argument `dtype` or `arr`: a numpy dtype or a numpy array.")

    if dtype is None:
        dtype = arr.dtype

    #1) Make a single-entry numpy array of the same dtype
    #2) force the array into a python 'object' dtype
    #3) the array entry should now be the closest python type
    single_entry = np.empty([1], dtype=dtype).astype(object)

    return type(single_entry[0])

def type_np2sql(dtype=None, arr=None):
    '''Return the closest sql type for a given numpy dtype'''
    return type_py2sql(type_np2py(dtype=dtype, arr=arr))

一些用例:

>>> sqlalchemy.Column(type_py2sql(int))
Column(None, BigInteger(), table=None)

>>> type_py2sql(type('hello'))
sqlalchemy.sql.sqltypes.Unicode

>>> type_np2sql(arr=np.array([1.,2.,3.]))
sqlalchemy.sql.sqltypes.Float

我如何选择转换集:

我所做的是将所有 sql 类型映射到它们等效的 python 类型。然后我打印出哪个 python 类型对应于哪个 sql-types,并为每种 python 类型选择了最好的 sql-type。这是我用来生成此映射的代码:

#********** SQL to Python: one to one **********
type_sql2py_dict = 
for key in sqlalchemy.types.__dict__['__all__']:
    sqltype = getattr(sqlalchemy.types, key)

    if 'python_type' in dir(sqltype) and not sqltype.__name__.startswith('Type'):
        try:
            typeinst = sqltype()
        except TypeError as e: #List/array wants inner-type
            typeinst = sqltype(None)

        try:
            type_sql2py_dict[sqltype] = typeinst.python_type
        except NotImplementedError:
            pass

#********** Python to SQL: one to many **********
type_py2sql_dict = 
for key, val in type_sql2py_dict.items():
    if not val in type_py2sql_dict:
        type_py2sql_dict[val] = [key]
    else:
        type_py2sql_dict[val].append(key)

这是type_py2sql_dict在sqlalchemy 1.3.5版本下的输出:

int: [sqlalchemy.sql.sqltypes.INTEGER,
  sqlalchemy.sql.sqltypes.BIGINT,
  sqlalchemy.sql.sqltypes.SMALLINT,
  sqlalchemy.sql.sqltypes.Integer,
  sqlalchemy.sql.sqltypes.SmallInteger,
  sqlalchemy.sql.sqltypes.BigInteger],
 str: [sqlalchemy.sql.sqltypes.CHAR,
  sqlalchemy.sql.sqltypes.VARCHAR,
  sqlalchemy.sql.sqltypes.NCHAR,
  sqlalchemy.sql.sqltypes.NVARCHAR,
  sqlalchemy.sql.sqltypes.TEXT,
  sqlalchemy.sql.sqltypes.Text,
  sqlalchemy.sql.sqltypes.CLOB,
  sqlalchemy.sql.sqltypes.String,
  sqlalchemy.sql.sqltypes.Unicode,
  sqlalchemy.sql.sqltypes.UnicodeText,
  sqlalchemy.sql.sqltypes.Enum],
 float: [sqlalchemy.sql.sqltypes.FLOAT,
  sqlalchemy.sql.sqltypes.REAL,
  sqlalchemy.sql.sqltypes.Float],
 decimal.Decimal: [sqlalchemy.sql.sqltypes.NUMERIC,
  sqlalchemy.sql.sqltypes.DECIMAL,
  sqlalchemy.sql.sqltypes.Numeric],
 datetime.datetime: [sqlalchemy.sql.sqltypes.TIMESTAMP,
  sqlalchemy.sql.sqltypes.DATETIME,
  sqlalchemy.sql.sqltypes.DateTime],
 bytes: [sqlalchemy.sql.sqltypes.BLOB,
  sqlalchemy.sql.sqltypes.BINARY,
  sqlalchemy.sql.sqltypes.VARBINARY,
  sqlalchemy.sql.sqltypes.LargeBinary,
  sqlalchemy.sql.sqltypes.Binary],
 bool: [sqlalchemy.sql.sqltypes.BOOLEAN, sqlalchemy.sql.sqltypes.Boolean],
 datetime.date: [sqlalchemy.sql.sqltypes.DATE, sqlalchemy.sql.sqltypes.Date],
 datetime.time: [sqlalchemy.sql.sqltypes.TIME, sqlalchemy.sql.sqltypes.Time],
 datetime.timedelta: [sqlalchemy.sql.sqltypes.Interval],
 list: [sqlalchemy.sql.sqltypes.ARRAY],
 dict: [sqlalchemy.sql.sqltypes.JSON]

【讨论】:

【参考方案3】:

一种解决方案是手动进行转换 - 例如,这样可以:

def convert(self, saType):
    type = "Unknown"
    if isinstance(saType,sqlalchemy.types.INTEGER):
        type = "Integer"
    elif isinstance(saType,sqlalchemy.types.VARCHAR):
        type = "String"
    elif isinstance(saType,sqlalchemy.types.DATE):
        type = "Date"
    elif isinstance(saType,sqlalchemy.dialects.mysql.base._FloatType):
        type = "Double"
    return type

不确定这是否是正常的 python 做事方式...我仍然像 java 程序员一样思考。

【讨论】:

【参考方案4】:

你可以做一个 str(column.type) 这会给你类型作为一个字符串。 在你的代码中

    from sqlalchemy import MetaData
    from sqlalchemy import create_engine, Column, Table
    engine = create_engine('mysql+mysqldb://user:pass@localhost:3306/mydb', pool_recycle=3600)
    meta = MetaData()
    meta.bind = engine
    meta.reflect()
    datatable = meta.tables['my_data_table']
    [str(c.type) for c in datatable.columns]

你会得到一个包含数据类型的列表。希望这对你有帮助

【讨论】:

以上是关于在 SQLAlchemy 列类型和 python 数据类型之间轻松转换?的主要内容,如果未能解决你的问题,请参考以下文章

SQLAlchemy(常用的SQLAlchemy列选项)

sqlalchemy基本类型

Postgres中'money'和'OID'的sqlalchemy等效列类型是啥?

在 postgres 中使用 sqlalchemy 访问复合数据类型

SQLalchemy 字段类型

基础入门_Python-模块和包.深入SQLAlchemy之列级别约束与表级别约束?