sqlalchemy:停止长时间运行的查询
Posted
技术标签:
【中文标题】sqlalchemy:停止长时间运行的查询【英文标题】:sqlalchemy: stopping a long-running query 【发布时间】:2012-03-15 07:14:08 【问题描述】:我有一个看似直截了当的情况,但找不到直截了当的解决方案。
我正在使用 sqlalchemy 来查询 postgres。如果发生客户端超时,我想停止/取消来自另一个线程的长时间运行的 postgres 查询。线程有权访问 Session 或 Connection 对象。
此时我已经尝试过:
session.bind.raw_connection().close()
和
session.connection().close()
和
session.close
和
session.transaction.close()
但无论我尝试什么,postgres 查询仍然会继续直到结束。我从顶部的 pg 中知道这一点。这不应该很容易做到吗?我错过了什么?如果不获取pid并直接发送停止信号,这是不可能的吗?
【问题讨论】:
即使你完全杀死你的客户端进程,数据库也可能需要很长时间来回滚它的查询,这取决于它是什么以及表中有多少数据。努力优化您的查询,而不是试图弄清楚如何中止它们。 是的,优化是最重要的,但由于我们项目的性质,超时将会发生,我们希望确保覆盖所有基础。感谢您的评论。 访问原始套接字并关闭它的方法将隐藏在 psycopg2 驱动程序的某个位置。您必须查看它的来源才能找到它。为 sqlalchemy 层的无法解释的行为和严重崩溃做好充分准备,从您实际强制关闭其中一个套接字的那一刻开始。 感谢 wberry,在您的帮助下我找到了可行的解决方案。 【参考方案1】:到目前为止,这似乎运作良好:
def test_close_connection(self):
import threading
from psycopg2.extensions import QueryCanceledError
from sqlalchemy.exc import DBAPIError
session = Session()
conn = session.connection()
sql = self.get_raw_sql_for_long_query()
seconds = 5
t = threading.Timer(seconds, conn.connection.cancel)
t.start()
try:
conn.execute(sql)
except DBAPIError, e:
if type(e.orig) == QueryCanceledError:
print 'Long running query was cancelled.'
t.cancel()
source
【讨论】:
【参考方案2】:对于那些可能在这里结束的 mysql 人员,this 答案的修改版本可以从第二个连接中终止查询。本质上如下,假设 pymysql 在引擎盖下:
thread_id = conn1.connection.thread_id()
t = threading.Timer(seconds, lambda: conn2.execute("kill ".format(thread_id)))
原来的连接会引发 pymysql.err.OperationalError。请参阅 this 其他答案,了解创建长时间运行查询以进行测试的简洁方法。
【讨论】:
【参考方案3】:在MYSQL上发现可以指定查询优化器提示。
其中一个提示是MAX_EXECUTION_TIME
,用于指定查询在终止前应执行多长时间。
你可以在你的 app.py 中添加这个
@event.listens_for(engine, 'before_execute', retval=True)
def intercept(conn, clauseelement, multiparams, params):
from sqlalchemy.sql.selectable import Select
# check if it's select statement
if isinstance(clauseelement, Select):
# 'froms' represents list of tables that statement is querying
table = clauseelement.froms[0]
'''Update the timeout here in ms (1s = 1000ms)'''
timeout_ms = 4000
# adding filter in clause
clauseelement = clauseelement.prefix_with(f"/*+ MAX_EXECUTION_TIME(timeout_ms) */", dialect="mysql")
return clauseelement, multiparams, params
SQLAlchemy query API not working correctly with hints 和 MYSQL reference
【讨论】:
以上是关于sqlalchemy:停止长时间运行的查询的主要内容,如果未能解决你的问题,请参考以下文章