如何分析 SQLAlchemy 支持的应用程序?

Posted

技术标签:

【中文标题】如何分析 SQLAlchemy 支持的应用程序?【英文标题】:How can I profile a SQLAlchemy powered application? 【发布时间】:2010-11-13 08:35:54 【问题描述】:

有人有分析 Python/SQLAlchemy 应用程序的经验吗?找出瓶颈和设计缺陷的最佳方法是什么?

我们有一个 Python 应用程序,其中数据库层由 SQLAlchemy 处理。该应用程序使用批处理设计,因此许多数据库请求是在有限的时间内按顺序完成的。目前运行时间有点太长,因此需要进行一些优化。我们不使用 ORM 功能,数据库是 PostgreSQL。

【问题讨论】:

【参考方案1】:

如果您只想分析查询时间,您可以使用上下文管理器记录在特定上下文中执行的所有查询:

"""SQLAlchemy Query profiler and logger."""
import logging
import time
import traceback

import sqlalchemy


class QueryProfiler:
    """Log query duration and SQL as a context manager."""

    def __init__(self,
                engine: sqlalchemy.engine.Engine,
                logger: logging.Logger,
                path: str):
        """
        Initialize for an engine and logger and filepath.
        engine: The sqlalchemy engine for which events should be logged.
                You can pass the class `sqlalchemy.engine.Engine` to capture all engines
        logger: The logger that should capture the query
        path: Only log the stacktrace for files in this path, use `'/'` to log all files
        """
        self.engine = engine
        self.logger = logger
        self.path = path

    def _before_cursor_execute(self, conn, cursor, statement, parameters, context, executemany):
        """Set the time on the connection to measure query duration."""
        conn._sqla_query_start_time = time.time()

    def _after_cursor_execute(self, conn, cursor, statement, parameters, context, executemany):
        """Listen for the 'after_cursor_execute' event and log sqlstatement and time."""
        end_time = time.time()
        start_time = getattr(conn, '_sqla_query_start_time', end_time)
        elapsed_time = round((end_time-start_time) * 1000)
        # only include the files in self.path in the stacktrace to reduce the noise
        stack = [frame for frame in traceback.extract_stack()[:-1] if frame.filename.startswith(self.path)]
        self.logger.debug('Query `%s` took %s ms. Stack: %s', statement, elapsed_time, traceback.format_list(stack))

    def __enter__(self, *args, **kwargs):
        """Context manager."""
        if isinstance(self.engine, sqlalchemy.engine.Engine):
            sqlalchemy.event.listen(self.engine, "before_cursor_execute", self._before_cursor_execute)
            sqlalchemy.event.listen(self.engine, "after_cursor_execute", self._after_cursor_execute)
        return self

    def __exit__(self, *args, **kwargs) -> None:
        """Context manager."""
        if isinstance(self.engine, sqlalchemy.engine.Engine):
            sqlalchemy.event.remove(self.engine, "before_cursor_execute", self._before_cursor_execute)
            sqlalchemy.event.remove(self.engine, "after_cursor_execute", self._after_cursor_execute)

使用与测试:

"""Test SQLAlchemy Query profiler and logger."""
import logging
import os

import sqlalchemy

from .sqlaprofiler import QueryProfiler

def test_sqlite_query(caplog):
    """Create logger and sqllite engine and profile the queries."""
    logging.basicConfig()
    logger = logging.getLogger(f'__name__')
    logger.setLevel(logging.DEBUG)
    caplog.set_level(logging.DEBUG, logger=f'__name__')
    path = os.path.dirname(os.path.realpath(__file__))
    engine = sqlalchemy.create_engine('sqlite://')
    metadata = sqlalchemy.MetaData(engine)
    table1 = sqlalchemy.Table(
            "sometable", metadata,
            sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
            sqlalchemy.Column("data", sqlalchemy.String(255), nullable=False),
        )
    conn = engine.connect()
    metadata.create_all(conn)

    with QueryProfiler(engine, logger, path):
        conn.execute(
            table1.insert(),
            ["data": f"entry i" for i in range(100000)]
        )

        conn.execute(
            table1.select()
            .where(table1.c.data.between("entry 25", "entry 7800"))
            .order_by(sqlalchemy.desc(table1.c.data))
        )

    assert caplog.messages[0].startswith('Query `INSERT INTO sometable (data) VALUES (?)` took')
    assert caplog.messages[1].startswith('Query `SELECT sometable.id, sometable.data \n'
                                        'FROM sometable \n'
                                        'WHERE sometable.data BETWEEN ? AND ? '
                                        'ORDER BY sometable.data DESC` took ')

【讨论】:

【参考方案2】:

我刚刚发现了库 sqltap (https://github.com/inconshreveable/sqltap)。它生成样式精美的 html 页面,有助于检查和分析 SQLAlchemy 生成的 SQL 查询。

示例用法:

profiler = sqltap.start()
run_some_queries()
statistics = profiler.collect()
sqltap.report(statistics, "report.html")

这个库已经 2 年没有更新了,但是,当我今天早些时候用我的应用程序测试它时,它似乎工作得很好。

【讨论】:

【参考方案3】:

SQLAlchemy wiki 上有一个非常有用的分析方法

稍作修改,

from sqlalchemy import event
from sqlalchemy.engine import Engine
import time
import logging

logging.basicConfig()
logger = logging.getLogger("myapp.sqltime")
logger.setLevel(logging.DEBUG)

@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, 
                        parameters, context, executemany):
    context._query_start_time = time.time()
    logger.debug("Start Query:\n%s" % statement)
    # Modification for *** answer:
    # Show parameters, which might be too verbose, depending on usage..
    logger.debug("Parameters:\n%r" % (parameters,))


@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, 
                        parameters, context, executemany):
    total = time.time() - context._query_start_time
    logger.debug("Query Complete!")

    # Modification for ***: times in milliseconds
    logger.debug("Total Time: %.02fms" % (total*1000))

if __name__ == '__main__':
    from sqlalchemy import *

    engine = create_engine('sqlite://')

    m1 = MetaData(engine)
    t1 = Table("sometable", m1, 
            Column("id", Integer, primary_key=True),
            Column("data", String(255), nullable=False),
        )

    conn = engine.connect()
    m1.create_all(conn)

    conn.execute(
        t1.insert(), 
        ["data":"entry %d" % x for x in xrange(100000)]
    )

    conn.execute(
        t1.select().where(t1.c.data.between("entry 25", "entry 7800")).order_by(desc(t1.c.data))
    )

输出类似于:

DEBUG:myapp.sqltime:Start Query:
SELECT sometable.id, sometable.data 
FROM sometable 
WHERE sometable.data BETWEEN ? AND ? ORDER BY sometable.data DESC
DEBUG:myapp.sqltime:Parameters:
('entry 25', 'entry 7800')
DEBUG:myapp.sqltime:Query Complete!
DEBUG:myapp.sqltime:Total Time: 410.46ms

然后,如果您发现一个奇怪的慢查询,您可以获取查询字符串,在参数中格式化(至少可以使用 % 字符串格式化运算符,对于 psycopg2),在其前面加上“EXPLAIN ANALYZE”和将查询计划输出推送到http://explain.depesz.com/(通过this good article on PostgreSQL performance 找到)

【讨论】:

我相信这将包括坐在回调队列(gevent/eventlet/etc)中的查询【参考方案4】:

有时只是简单的 SQL 日志记录(通过 python 的日志记录模块或通过create_engine() 上的echo=True 参数启用)可以让您了解事情需要多长时间。例如,如果您在 SQL 操作之后立即记录某些内容,您会在日志中看到类似这样的内容:

17:37:48,325 INFO  [sqlalchemy.engine.base.Engine.0x...048c] SELECT ...
17:37:48,326 INFO  [sqlalchemy.engine.base.Engine.0x...048c] <params>
17:37:48,660 DEBUG [myapp.somemessage] 

如果您在操作后立即登录myapp.somemessage,您知道完成 SQL 部分需要 334 毫秒。

记录 SQL 还将说明是否正在发出数十/数百个查询,这些查询可以通过连接更好地组织成更少的查询。使用 SQLAlchemy ORM 时,提供“急切加载”功能以部分 (contains_eager()) 或完全 (eagerload(), eagerload_all()) 自动执行此活动,但如果没有 ORM,它仅意味着使用连接,以便结果跨随着深度的增加,可以在一个结果集中加载多个表,而不是增加查询数量(即r + r*r2 + r*r2*r3 ...)

如果日志记录显示单个查询花费的时间过长,您需要详细说明在数据库中处理查询、通过网络发送结果、由 DBAPI 处理以及最终由SQLAlchemy 的结果集和/或 ORM 层。根据具体情况,每个阶段都可能出现各自的瓶颈。

为此,您需要使用分析,例如 cProfile 或 hotshot。这是我使用的装饰器:

import cProfile as profiler
import gc, pstats, time

def profile(fn):
    def wrapper(*args, **kw):
        elapsed, stat_loader, result = _profile("foo.txt", fn, *args, **kw)
        stats = stat_loader()
        stats.sort_stats('cumulative')
        stats.print_stats()
        # uncomment this to see who's calling what
        # stats.print_callers()
        return result
    return wrapper

def _profile(filename, fn, *args, **kw):
    load_stats = lambda: pstats.Stats(filename)
    gc.collect()

    began = time.time()
    profiler.runctx('result = fn(*args, **kw)', globals(), locals(),
                    filename=filename)
    ended = time.time()

    return ended - began, load_stats, locals()['result']

要分析一段代码,请将其放在带有装饰器的函数中:

@profile
def go():
    return Session.query(FooClass).filter(FooClass.somevalue==8).all()
myfoos = go()

分析的输出可用于了解时间花费在何处。例如,如果您看到所有时间都花在cursor.execute() 中,那就是对数据库的低级 DBAPI 调用,这意味着您的查询应该通过添加索引或重组查询和/或底层架构来优化。对于该任务,我建议使用 pgadmin 及其图形 EXPLAIN 实用程序来查看查询正在做什么。

如果您看到数以千计的与获取行相关的调用,这可能意味着您的查询返回的行数比预期的多 - 由于连接不完整而导致的笛卡尔积可能会导致此问题。另一个问题是花在类型处理上的时间 - 诸如 Unicode 之类的 SQLAlchemy 类型将对绑定参数和结果列执行字符串编码/解码,这可能并非在所有情况下都需要。

配置文件的输出可能有点令人生畏,但经过一些练习后,它们很容易阅读。曾经有人在邮件列表中声称速度很慢,在让他发布配置文件的结果后,我能够证明速度问题是由于网络延迟 - 在 cursor.execute() 以及所有 Python 中花费的时间方法非常快,而大部分时间都花在了 socket.receive() 上。

如果您有雄心壮志,那么在 SQLAlchemy 单元测试中还有一个更复杂的 SQLAlchemy 分析示例,如果您四处寻找 http://www.sqlalchemy.org/trac/browser/sqlalchemy/trunk/test/aaa_profiling。在那里,我们使用装饰器进行测试,这些装饰器断言用于特定操作的方法调用的最大数量,因此如果签入了一些低效的东西,测试将揭示它(重要的是要注意,在 Python 中,函数调用具有最高的任何操作的开销,并且调用次数通常与花费的时间几乎成正比)。值得注意的是“zoomark”测试使用了一种奇特的“SQL 捕获”方案,该方案从等式中减少了 DBAPI 的开销——尽管该技术对于普通的分析并不是真正必要的。

【讨论】:

如果您使用的是 Flask-SQLAlchemy,请将 SQLALCHEMY_ECHO = True 添加到您的应用配置中。【参考方案5】:

我在使用 cprofile 并在 runsnakerun 中查看结果方面取得了一些成功。这至少告诉我哪些函数和调用需要很长时间以及数据库是否是问题所在。 文档是here。你需要 wxpython。上面的presentation 可以帮助您入门。 就这么简单

import cProfile
command = """foo.run()"""
cProfile.runctx( command, globals(), locals(), filename="output.profile" )

然后

python runsnake.py output.profile

如果您希望优化查询,您需要postgrsql profiling。

登录记录查询也是值得的,但据我所知没有解析器来获取长时间运行的查询(它对并发请求没有用处)。

sqlhandler = logging.FileHandler("sql.log")
sqllogger = logging.getLogger('sqlalchemy.engine')
sqllogger.setLevel(logging.info)
sqllogger.addHandler(sqlhandler)

并确保您的创建引擎语句具有 echo = True。

当我这样做时,实际上主要问题是我的代码,所以 cprofile 的事情有所帮助。

【讨论】:

以上是关于如何分析 SQLAlchemy 支持的应用程序?的主要内容,如果未能解决你的问题,请参考以下文章

active_alchemy 不支持某些 SQLAlchemy 参数

如何使用 SQLAlchemy 创建 SQL 视图?

如何使用 SQLAlchemy 声明性语法指定关系?

在 SQLAlchemy 中以特定格式按日期分组

如何使用 SQLAlchemy 创建一个不是主键的标识列?

SQLAlchemy 是不是支持缓存?