将数据从 sqlalchemy 移动到 pandas DataFrame

Posted

技术标签:

【中文标题】将数据从 sqlalchemy 移动到 pandas DataFrame【英文标题】:Moving data from sqlalchemy to a pandas DataFrame 【发布时间】:2018-08-19 06:49:57 【问题描述】:

我正在尝试在 pandas DataFrame 中加载 SQLAlchemy。

当我这样做时:

df = pd.DataFrame(LPRRank.query.all())

我明白了

>>> df
0        <M. Misty || 1 || 18>
1        <P. Patch || 2 || 18>
...
...

但是,我希望数据库中的每一列都成为数据框中的一列:

0        M. Misty  1  18
1        P. Patch  2  18
...
...

当我尝试时:

dff = pd.read_sql_query(LPRRank.query.all(), db.session())

我得到一个属性错误:

AttributeError: 'SignallingSession' object has no attribute 'cursor'

dff = pd.read_sql_query(LPRRank.query.all(), db.session)

也报错:

AttributeError: 'scoped_session' object has no attribute 'cursor'

我用来生成对象列表的是:

app = Flask(__name__)
db = SQLAlchemy(app)

class LPRRank(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    candid = db.Column(db.String(40), index=True, unique=False)
    rank = db.Column(db.Integer, index=True, unique=False) 
    user_id = db.Column(db.Integer, db.ForeignKey('lprvote.id'))

    def __repr__(self):
        return '< ||  || >'.format(self.candid,
                                                 self.rank, self.user_id) 

这个问题: How to convert SQL Query result to PANDAS Data Structure? 没有错误,但将每一行作为一个对象,这不是我想要的。我可以访问返回对象中的各个列,但似乎有更好的方法。

如果您已经了解正在发生的事情并且只需要查看语法,那么 pandas.pydata.org 上的文档非常棒。 2016 年 4 月 20 日的文档(1319 页 pdf)将 pandas 连接标识为在 p.872 上仍处于试验阶段。

现在,SQLALCHEMY/PANDAS - SQLAlchemy reading column as CLOB for Pandas to_sql 是关于指定 SQL 类型的。我的是默认的 SQLAlchemy。

而且,sqlalchemy pandas to_sql OperationalError、Writing to mysql database with pandas using SQLAlchemy, to_sql 和 SQLAlchemy/pandas to_sql for SQLServer -- CREATE TABLE in master db 是关于写入 SQL 数据库的,这会产生操作错误、数据库错误和“创建表”错误,这些都不是我的问题。

这个SQLAlchemy Pandas read_sql from jsonb 想要一个jsonb 属性到列:不是我的杯子'o 茶。

上一个问题SQLAlchemy ORM conversion to pandas DataFrame 解决了我的问题,但解决方案:使用query.session.bind 不是我的解决方案。我正在使用 db.session.add() 和 db.session.commit() 打开/关闭会话,但是当我使用此处第二个答案中指定的 db.session.bind 时,我得到一个属性错误:

AttributeError: 'list' object has no attribute '_execute_on_connection'

【问题讨论】:

【参考方案1】:

只需在模型中添加一个__init__ 方法并在构建数据框之前调用 Class 对象。具体来说,下面会创建一个可迭代的元组,这些元组使用pandas.DataFrame() 绑定到列中。

class LPRRank(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    candid = db.Column(db.String(40), index=True, unique=False)
    rank = db.Column(db.Integer, index=True, unique=False) 
    user_id = db.Column(db.Integer, db.ForeignKey('lprvote.id'))

    def __init__(self, candid=None, rank=None, user_id=None):
        self.data = (candid, rank, user_id)

    def __repr__(self):
        return (self.candid, self.rank, self.user_id) 

data = db.session.query(LPRRank).all()
df = pd.DataFrame([(d.candid, d.rank, d.user_id) for d in data], 
                  columns=['candid', 'rank', 'user_id'])

或者,使用基于您定义的模型类的 SQLAlchemy ORM,LPRRank,运行read_sql

df = pd.read_sql(sql = db.session.query(LPRRank)\
                         .with_entities(LPRRank.candid,
                                        LPRRank.rank,
                                        LPRRank.user_id).statement, 
                 con = db.session.bind)

【讨论】:

感谢您指出格式。我直接从 Miguel Grinberg 的 Flask 教程中获取了它,这非常好。他的格式反映了 SQLAlchemy 文档,这与我在df = pd.DataFrame(LPRRank.query.all(), columns=['candid', 'rank', 'user_id']) 得到的错误一致: ...错误是ValueError: Shape of passed values is (1, 113), indices imply (3, 113)。它正在寻找一个字符串,但得到一个元组。关于第二种解决方案,我得到一个:sqlalchemy.exc.ObjectNotExecutableError: Not an executable object: &lt;flask_sqlalchemy.BaseQuery object at 0x1134dc860&gt; 在上面的调用中查看使用__init__ 的更新并将.statement 添加到下面调用的 sql arg 中。两者都经过测试,没有错误。 它们都有效。谢谢!我特别喜欢能够在不循环的情况下做到这一点,即使在列表理解中也是如此。但是,read_sql 包含 SQLAlchemy ID,从而创建了 DataFrame 索引的冗余。我可以通过操作DataFrame来消除冗余,但是可以在read_sql命令中保留它吗? 太棒了!实际上,您可以使用with_entities 选择特定列。见更新。或者使用read_sql 中的index_col='id' arg 选择主键字段作为索引。【参考方案2】:

Parfait 的答案很好,但可能会遇到问题:

    每个对象创建的效率意味着将数据复制到 DataFrame 中,因此创建 DataFrame 列表可能需要一些时间 不镜像包含行集合的数据框

因此,下面的示例提供了一个 parent 类,该类被同化为 DataFrame 表示形式和一个 child 类同化到给定数据帧的 row

下面的代码提供了两种获取dataframe的方法,DataFrame对象只在需要时创建,不浪费cpu和内存。

如果在创建时需要数据框,您只需添加构造函数 (def __init__(self, rows:List[MyDataFrameRow] = None)...) 并创建一个新属性并评估 self.data_frame 的结果。

from pandas import DataFrame, read_sql
from sqlalchemy import Column, Integer, String, Float, ForeignKey
from sqlalchemy.orm import relationship, Session

Base = declarative_base()

class MyDataFrame(Base):
    __tablename__ = 'my_data_frame'
    id = Column(Integer, primary_key=True)
    rows = relationship('MyDataFrameRow', cascade='all,delete')

    @property
    def data_frame(self) -> DataFrame:
        columns = GenomeCoverageRow.data_frame_columns()
        return DataFrame([[getattr(row, column) for column in columns] for row in self.rows],
                         columns=columns)

    @staticmethod
    def to_data_frame(identifier: int, session: Session) -> DataFrame:
        query = session.query(MyDataFrameRow).join(MyDataFrame).filter(MyDataFrame.id == identifier)
        return read_sql(query.statement, session.get_bind())


class MyDataFrameRow(Base):

    __tablename__ = 'my_data_row'
    id = Column(Integer, primary_key=True)
    name= Column(String)
    age= Column(Integer)
    number_of_children = Column(Integer)
    height= Column(Integer)
    parent_id = Column(Integer, ForeignKey('my_data_frame.id'))

    @staticmethod
    def data_frame_columns() -> Tuple[Any]:
        return tuple(column.name for column in GenomeCoverageRow.__table__.columns if len(column.foreign_keys) == 0
                     and column.primary_key is False)
...
session = Session(...)
df1 = MyDataFrame.to_data_frame(1,session)
my_table_obj = session.query(MyDataFrame).filter(MyDataFrame.id == 1).one()
df2 = my_table_obj.data_frame

【讨论】:

以上是关于将数据从 sqlalchemy 移动到 pandas DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

将 pandas 数据框行绑定到 sqlAlchemy 自定义查询

使用 sqlalchemy 从 PostgreSQL 查询返回 Pandas 数据框

想要使用 pandas 和 sqlalchemy 从查询是变量(不和谐用户名)的数据库中选择所有内容

使用 SQLAlchemy 和 Pandas 插入数据 - Python

将pandas的DataFrame数据写入MySQL数据库 + sqlalchemy

无法在 ETL 过程中使用 Pandas 和 SQLAlchemy 将列名从 CSV 更改为 SQL Server DB