sqlalchemy:停止长时间运行的查询

Posted

技术标签:

【中文标题】sqlalchemy:停止长时间运行的查询【英文标题】:sqlalchemy: stopping a long-running query 【发布时间】:2012-03-15 07:14:08 【问题描述】:

我有一个看似直截了当的情况,但找不到直截了当的解决方案。

我正在使用 sqlalchemy 来查询 postgres。如果发生客户端超时,我想停止/取消来自另一个线程的长时间运行的 postgres 查询。线程有权访问 Session 或 Connection 对象。

此时我已经尝试过:

session.bind.raw_connection().close()

session.connection().close()

session.close

session.transaction.close()

但无论我尝试什么,postgres 查询仍然会继续直到结束。我从顶部的 pg 中知道这一点。这不应该很容易做到吗?我错过了什么?如果不获取pid并直接发送停止信号,这是不可能的吗?

【问题讨论】:

即使你完全杀死你的客户端进程,数据库也可能需要很长时间来回滚它的查询,这取决于它是什么以及表中有多少数据。努力优化您的查询,而不是试图弄清楚如何中止它们。 是的,优化是最重要的,但由于我们项目的性质,超时将会发生,我们希望确保覆盖所有基础。感谢您的评论。 访问原始套接字并关闭它的方法将隐藏在 psycopg2 驱动程序的某个位置。您必须查看它的来源才能找到它。为 sqlalchemy 层的无法解释的行为和严重崩溃做好充分准备,从您实际强制关闭其中一个套接字的那一刻开始。 感谢 wberry,在您的帮助下我找到了可行的解决方案。 【参考方案1】:

到目前为止,这似乎运作良好:

def test_close_connection(self):
    import threading
    from psycopg2.extensions import QueryCanceledError
    from sqlalchemy.exc import DBAPIError

    session = Session()
    conn = session.connection()
    sql = self.get_raw_sql_for_long_query()

    seconds = 5
    t = threading.Timer(seconds, conn.connection.cancel)
    t.start()

    try:
        conn.execute(sql)
    except DBAPIError, e:
        if type(e.orig) == QueryCanceledError:
            print 'Long running query was cancelled.'
    t.cancel()

source

【讨论】:

【参考方案2】:

对于那些可能在这里结束的 mysql 人员,this 答案的修改版本可以从第二个连接中终止查询。本质上如下,假设 pymysql 在引擎盖下:

thread_id = conn1.connection.thread_id()
t = threading.Timer(seconds, lambda: conn2.execute("kill ".format(thread_id)))

原来的连接会引发 pymysql.err.OperationalError。请参阅 this 其他答案,了解创建长时间运行查询以进行测试的简洁方法。

【讨论】:

【参考方案3】:

在MYSQL上发现可以指定查询优化器提示。 其中一个提示是MAX_EXECUTION_TIME,用于指定查询在终止前应执行多长时间。

你可以在你的 app.py 中添加这个

@event.listens_for(engine, 'before_execute', retval=True)
def intercept(conn, clauseelement, multiparams, params):
  from sqlalchemy.sql.selectable import Select

  # check if it's select statement
  if isinstance(clauseelement, Select):
    # 'froms' represents list of tables that statement is querying
    table = clauseelement.froms[0]
    '''Update the timeout here in ms (1s = 1000ms)'''
    timeout_ms = 4000
    # adding filter in clause
    clauseelement = clauseelement.prefix_with(f"/*+ MAX_EXECUTION_TIME(timeout_ms) */", dialect="mysql")

  return clauseelement, multiparams, params

SQLAlchemy query API not working correctly with hints 和 MYSQL reference

【讨论】:

以上是关于sqlalchemy:停止长时间运行的查询的主要内容,如果未能解决你的问题,请参考以下文章

在 JDBC 中停止或终止长时间运行的查询

如果需要太长时间,如何停止 MySQL 查询?

在 qthread 中停止长时间运行的进程

长时间运行的 py.test 在第一次失败时停止

ANT / JMeter - 长时间运行的测试 - 停止写入摘要结果

JVM如何通知长时间运行的线程“停止”[重复]