SQLAlchemy 和多个进程的连接问题
Posted
技术标签:
【中文标题】SQLAlchemy 和多个进程的连接问题【英文标题】:Connection problems with SQLAlchemy and multiple processes 【发布时间】:2017-05-07 20:15:09 【问题描述】:我在一个项目中使用 PostgreSQL 和 SQLAlchemy,该项目由一个启动子进程的主进程组成。所有这些进程都通过 SQLAlchemy 访问数据库。
我遇到了可重复的连接失败:前几个子进程正常工作,但一段时间后出现连接错误。这是一个 MWCE:
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import sessionmaker
DB_URL = 'postgresql://user:password@localhost/database'
Base = declarative_base()
class Dummy(Base):
__tablename__ = 'dummies'
id = Column(Integer, primary_key=True)
value = Column(Integer)
engine = None
Session = None
session = None
def init():
global engine, Session, session
engine = create_engine(DB_URL)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
def cleanup():
session.close()
engine.dispose()
def target(id):
init()
try:
dummy = session.query(Dummy).get(id)
dummy.value += 1
session.add(dummy)
session.commit()
finally:
cleanup()
def main():
init()
try:
dummy = Dummy(value=1)
session.add(dummy)
session.commit()
p = multiprocessing.Process(target=target, args=(dummy.id,))
p.start()
p.join()
session.refresh(dummy)
assert dummy.value == 2
finally:
cleanup()
if __name__ == '__main__':
i = 1
while True:
print(i)
main()
i += 1
在我的系统(PostgreSQL 9.6、SQLAlchemy 1.1.4、psycopg2 2.6.2、Python 2.7、Ubuntu 14.04)上,这会产生
1
2
3
4
5
6
7
8
9
10
11
Traceback (most recent call last):
File "./fork_test.py", line 64, in <module>
main()
File "./fork_test.py", line 55, in main
session.refresh(dummy)
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1422, in refresh
only_load_props=attribute_names) is None:
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 223, in load_on_ident
return q.one()
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2756, in one
ret = self.one_or_none()
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2726, in one_or_none
ret = list(self)
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2797, in __iter__
return self._execute_and_instances(context)
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2820, in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 945, in execute
return meth(self, multiparams, params)
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
compiled_sql, distilled_params
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
context)
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
exc_info
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
context)
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 469, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
[SQL: 'SELECT dummies.id AS dummies_id, dummies.value AS dummies_value \nFROM dummies \nWHERE dummies.id = %(param_1)s'] [parameters: 'param_1': 11074]
这是可重复的,并且总是在同一迭代中崩溃。
我正在按照SQLAlchemy documentation 和elsewhere 的建议在分叉之后创建一个新引擎和会话。有趣的是,以下略有不同的方法不会崩溃:
import contextlib
import multiprocessing
import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import sessionmaker
DB_URL = 'postgresql://user:password@localhost/database'
Base = declarative_base()
class Dummy(Base):
__tablename__ = 'dummies'
id = Column(Integer, primary_key=True)
value = Column(Integer)
@contextlib.contextmanager
def get_session():
engine = sqlalchemy.create_engine(DB_URL)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
try:
yield session
finally:
session.close()
engine.dispose()
def target(id):
with get_session() as session:
dummy = session.query(Dummy).get(id)
dummy.value += 1
session.add(dummy)
session.commit()
def main():
with get_session() as session:
dummy = Dummy(value=1)
session.add(dummy)
session.commit()
p = multiprocessing.Process(target=target, args=(dummy.id,))
p.start()
p.join()
session.refresh(dummy)
assert dummy.value == 2
if __name__ == '__main__':
i = 1
while True:
print(i)
main()
i += 1
由于原始代码更复杂,不能简单地切换到后一个版本,我想了解为什么其中一个有效而另一个无效。
唯一明显的区别是崩溃代码使用引擎和会话的全局变量——这些是通过写时复制与子进程共享的。但是,由于我在分叉后直接重置它们,我不明白这怎么可能是个问题。
更新
我使用 Python 2.7 和 Python 3.4 使用最新的 SQLAlchemy (1.1.5) 重新运行了这两个代码段。两者的结果基本如前所述。然而,在 Python 2.7 上,第一个代码片段的崩溃现在发生在第 13 次迭代中(可重现),而在 3.4 上,它已经发生在第三次迭代中(也可重现)。第二个代码段在两个版本上运行都没有问题。这是 3.4 的回溯:
1
2
3
Traceback (most recent call last):
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
context)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
cursor.execute(statement, parameters)
psycopg2.OperationalError: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "fork_test.py", line 64, in <module>
main()
File "fork_test.py", line 55, in main
session.refresh(dummy)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 1424, in refresh
only_load_props=attribute_names) is None:
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/loading.py", line 223, in load_on_ident
return q.one()
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2749, in one
ret = self.one_or_none()
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2719, in one_or_none
ret = list(self)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2790, in __iter__
return self._execute_and_instances(context)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2813, in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 945, in execute
return meth(self, multiparams, params)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
compiled_sql, distilled_params
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
context)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
exc_info
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
raise value.with_traceback(tb)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
context)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
[SQL: 'SELECT dummies.id AS dummies_id, dummies.value AS dummies_value \nFROM dummies \nWHERE dummies.id = %(param_1)s'] [parameters: 'param_1': 3397]
这是 PostgreSQL 日志(2.7 和 3.4 相同):
2017-01-18 10:59:36 UTC [22429-1] LOG: database system was shut down at 2017-01-18 10:59:35 UTC
2017-01-18 10:59:36 UTC [22429-2] LOG: MultiXact member wraparound protections are now enabled
2017-01-18 10:59:36 UTC [22428-1] LOG: database system is ready to accept connections
2017-01-18 10:59:36 UTC [22433-1] LOG: autovacuum launcher started
2017-01-18 10:59:36 UTC [22435-1] [unknown]@[unknown] LOG: incomplete startup packet
2017-01-18 11:00:10 UTC [22466-1] user@db LOG: SSL error: decryption failed or bad record mac
2017-01-18 11:00:10 UTC [22466-2] user@db LOG: could not receive data from client: Connection reset by peer
(注意关于启动包不完整的消息is harmless)
【问题讨论】:
您使用的是哪个版本的 python? @ThomasMoreau:“PostgreSQL 9.6、SQLAlchemy 1.1.4、psycopg2 2.6.2、Python 2.7、Ubuntu 14.04”;) 您是否使用与示例中显示的相同的连接参数?您的真实代码中是否有自定义连接池实现? 它在我的系统上运行得很好......你有来自你的 postgresql 服务器的日志吗?它在 python3 中也会崩溃吗? 宾果游戏:sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL error: decryption failed or bad record mac
.
【参考方案1】:
引用 "How do I use engines / connections / sessions with Python multiprocessing, or os.fork()?" 并强调:
SQLAlchemy Engine 对象指的是现有数据库连接的连接池。所以当这个对象被复制到子进程时,目标是确保没有数据库连接被继承。
和
但是,对于共享事务活动会话或连接的情况,没有自动修复此问题;应用程序需要确保新的子进程只启动新的 Connection 对象和事务,以及 ORM Session 对象。
问题源于继承了实时全局session
的分叉子进程,该进程保留了Connection
。当target
调用init
时,它会覆盖对engine
和session
的全局引用,从而将它们在子进程中的引用计数减少到0,迫使它们完成。例如,如果您以一种或另一种方式在子项中创建对继承会话的另一个引用,您会阻止它被清理——但不要这样做。在main
加入并照常恢复业务后,它正在尝试使用现在可能已完成或不同步的连接。至于为什么这仅在经过一定数量的迭代后才会导致错误,我不确定。
使用全局变量处理这种情况的唯一方法是
-
关闭所有会话
致电
engine.dispose()
在分叉之前。这将防止连接泄漏给孩子。例如:
def main():
global session
init()
try:
dummy = Dummy(value=1)
session.add(dummy)
session.commit()
dummy_id = dummy.id
# Return the Connection to the pool
session.close()
# Dispose of it!
engine.dispose()
# ...or call your cleanup() function, which does the same
p = multiprocessing.Process(target=target, args=(dummy_id,))
p.start()
p.join()
# Start a new session
session = Session()
dummy = session.query(Dummy).get(dummy_id)
assert dummy.value == 2
finally:
cleanup()
您的第二个示例不会触发子节点的最终确定,因此它似乎只能工作,尽管它可能与第一个示例一样损坏,因为它仍在继承会话的副本及其在 @987654333 中本地定义的连接@。
【讨论】:
不应该scoped_session
在这里帮忙吗? docs.sqlalchemy.org/en/latest/orm/contextual.html
@lucas03 不,作用域会话是一个线程本地会话注册表。
哦,对了,我们实际上在 gunicorn post_fork
和 celery on_worker_process_init
中调用 engine.dispose
。
在某些情况下,我发现有必要在dispose()
之后立即调用gc.collect()
;否则,分叉的进程可能会尝试收集与引擎相关的死垃圾,导致上面报告的 OperationalError
相同。
@Lucas03 @Ilja Everilä 我们不能像这样str(os.getpid()) + str(threading.local().__hash__())
使用scoped_session
和scopefunc
吗? docs.sqlalchemy.org/en/13/orm/… 是否存在这种情况会破坏单进程、多线程应用程序?以上是关于SQLAlchemy 和多个进程的连接问题的主要内容,如果未能解决你的问题,请参考以下文章