中断键时使用 psycopg2 终止未完成的 SQL 提交查询
Posted
技术标签:
【中文标题】中断键时使用 psycopg2 终止未完成的 SQL 提交查询【英文标题】:Kill unfinished SQL commited query with psycopg2 when InterruptionKey 【发布时间】:2020-01-13 12:22:28 【问题描述】:我使用 Python 脚本,该脚本使用 psycopg2 库在 AWS Redshift 数据库上运行各种 SQL 自动提交的查询。该脚本是从我的本地工作站手动执行的。流程如下:
使用 psycopg2.connect() 创建数据库连接 使用 execute() 对数据库执行自动提交的查询 关闭连接。由于各种原因,数据库可能不可用(网络问题,许多查询已经在运行...),最好停止 Python 脚本。此时,我通过 SQL 客户端(SQL 工作台)通过检索与这些查询关联的 pid 来终止已提交(和未完成)的查询。当用户停止它时,我想直接在 Python 脚本中自动执行最后一步(ctrl+c)。流程是:
使用 psycopg2.connect() 创建数据库连接 使用 execute() 对数据库执行自动提交的查询 使用 info.backend_pid 连接属性存储与查询关联的当前 PID 如果收到 InterruptionKey 异常,则使用先前存储的 PID 终止正在运行的查询 关闭连接。我在 Notebook 上做了一些测试以检查是否可以检索 back_pid 信息:
log = logging.getLogger(__name__)
session = psycopg2.connect(
connection_factory=LoggingConnection,
host=host,
port=port,
dbname=database,
user=user,
password=password,
sslmode="require",
)
session.initialize(log)
session.set_session(autocommit=True)
query = """
CREATE OR REPLACE FUNCTION janky_sleep (x float) RETURNS bool IMMUTABLE as $$
from time import sleep
sleep(x)
return True
$$ LANGUAGE plpythonu;
"""
cur = session.cursor()
cur.execute(query)
cur.execute("select janky_sleep(60.0)")
我使用 sleep 函数来复制需要 60 秒才能完成的查询的行为。 获取backend_pid时如下:
session.info.backend_pid
问题是会话对象已被 execute() 方法(运行查询)使用,并且 backend_pid 信息仅在会话空闲时产生,即查询完成时。
我想过旋转一个并发 Python 进程来监控父进程。一旦父进程停止,子进程将通过第二个数据库连接获取 backend_pid,然后运行 kill 查询。然而,这种方法似乎有点矫枉过正。
处理这种情况的正确方法是什么?
谢谢
【问题讨论】:
【参考方案1】:我终于使用了从文档中找到的资源: http://initd.org/psycopg/docs/faq.html#faq-interrupt-query。它使 Psycopg2 能够获取 SIGINT 信号并终止后续查询。
>>> psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select)
>>> cnn = psycopg2.connect('')
>>> cur = cnn.cursor()
>>> cur.execute("select pg_sleep(10)")
^C
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
QueryCanceledError: canceling statement due to user request
>>> cnn.rollback()
>>> # You can use the connection and cursor again from here
【讨论】:
以上是关于中断键时使用 psycopg2 终止未完成的 SQL 提交查询的主要内容,如果未能解决你的问题,请参考以下文章