Connection problems with SQLAlchemy and multiple processes

Posted: 2017-05-07 20:15:09

Question:

I'm using PostgreSQL and SQLAlchemy in a project that consists of a main process which launches child processes. All of these processes access the database via SQLAlchemy.

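I'm experiencing repeatable connection failures: the first few child processes work correctly, but after a while a connection error is raised. Here's an MWCE: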

import multiprocessing

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import sessionmaker

DB_URL = 'postgresql://user:password@localhost/database'

Base = declarative_base()

class Dummy(Base):
    __tablename__ = 'dummies'
    id = Column(Integer, primary_key=True)
    value = Column(Integer)

engine = None
Session = None
session = None

def init():
    global engine, Session, session
    engine = create_engine(DB_URL)
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()

def cleanup():
    session.close()
    engine.dispose()

def target(id):
    init()
    try:
        dummy = session.query(Dummy).get(id)
        dummy.value += 1
        session.add(dummy)
        session.commit()
    finally:
        cleanup()

def main():
    init()
    try:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        p = multiprocessing.Process(target=target, args=(dummy.id,))
        p.start()
        p.join()
        session.refresh(dummy)
        assert dummy.value == 2
    finally:
        cleanup()

if __name__ == '__main__':
    i = 1
    while True:
        print(i)
        main()
        i += 1

On my system (PostgreSQL 9.6, SQLAlchemy 1.1.4, psycopg2 2.6.2, Python 2.7, Ubuntu 14.04), this yields

1
2
3
4
5
6
7
8
9
10
11
Traceback (most recent call last):
  File "./fork_test.py", line 64, in <module>
    main()
  File "./fork_test.py", line 55, in main
    session.refresh(dummy)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1422, in refresh
    only_load_props=attribute_names) is None:
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 223, in load_on_ident
    return q.one()
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2756, in one
    ret = self.one_or_none()
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2726, in one_or_none
    ret = list(self)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2797, in __iter__
    return self._execute_and_instances(context)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2820, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 945, in execute
    return meth(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
    exc_info
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 469, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.
 [SQL: 'SELECT dummies.id AS dummies_id, dummies.value AS dummies_value \nFROM dummies \nWHERE dummies.id = %(param_1)s'] [parameters: {'param_1': 11074}]

This is repeatable and always crashes at the same iteration.

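I'm creating a new engine and session after the fork, as recommended by the SQLAlchemy documentation and elsewhere. Interestingly, the following slightly different approach does not crash: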

import contextlib
import multiprocessing

import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import sessionmaker

DB_URL = 'postgresql://user:password@localhost/database'

Base = declarative_base()

class Dummy(Base):
    __tablename__ = 'dummies'
    id = Column(Integer, primary_key=True)
    value = Column(Integer)

@contextlib.contextmanager
def get_session():
    engine = sqlalchemy.create_engine(DB_URL)
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        yield session
    finally:
        session.close()
        engine.dispose()

def target(id):
    with get_session() as session:
        dummy = session.query(Dummy).get(id)
        dummy.value += 1
        session.add(dummy)
        session.commit()

def main():
    with get_session() as session:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        p = multiprocessing.Process(target=target, args=(dummy.id,))
        p.start()
        p.join()
        session.refresh(dummy)
        assert dummy.value == 2

if __name__ == '__main__':
    i = 1
    while True:
        print(i)
        main()
        i += 1

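Since the original code is more complex and cannot simply be switched over to the latter version, I'd like to understand why one of these works and the other doesn't.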

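The only obvious difference is that the crashing code uses global variables for the engine and the session. These are shared with the child processes via copy-on-write. However, since I reset them directly after the fork, I don't understand how that could be a problem.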

Update

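I re-ran both snippets with the latest SQLAlchemy (1.1.5) on both Python 2.7 and Python 3.4. The results for both are essentially as described above. However, on Python 2.7 the crash of the first snippet now happens in the 13th iteration (reproducibly), while on 3.4 it already happens in the third iteration (also reproducibly). The second snippet runs without problems on both versions. Here is the traceback from 3.4: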

1
2
3
Traceback (most recent call last):
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
    cursor.execute(statement, parameters)
psycopg2.OperationalError: server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "fork_test.py", line 64, in <module>
    main()
  File "fork_test.py", line 55, in main
    session.refresh(dummy)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 1424, in refresh
    only_load_props=attribute_names) is None:
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/loading.py", line 223, in load_on_ident
    return q.one()
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2749, in one
    ret = self.one_or_none()
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2719, in one_or_none
    ret = list(self)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2790, in __iter__
    return self._execute_and_instances(context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2813, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 945, in execute
    return meth(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
    exc_info
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
    raise value.with_traceback(tb)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.
 [SQL: 'SELECT dummies.id AS dummies_id, dummies.value AS dummies_value \nFROM dummies \nWHERE dummies.id = %(param_1)s'] [parameters: {'param_1': 3397}]

Here is the PostgreSQL log (identical for 2.7 and 3.4):

2017-01-18 10:59:36 UTC [22429-1] LOG:  database system was shut down at 2017-01-18 10:59:35 UTC
2017-01-18 10:59:36 UTC [22429-2] LOG:  MultiXact member wraparound protections are now enabled
2017-01-18 10:59:36 UTC [22428-1] LOG:  database system is ready to accept connections
2017-01-18 10:59:36 UTC [22433-1] LOG:  autovacuum launcher started
2017-01-18 10:59:36 UTC [22435-1] [unknown]@[unknown] LOG:  incomplete startup packet
2017-01-18 11:00:10 UTC [22466-1] user@db LOG:  SSL error: decryption failed or bad record mac
2017-01-18 11:00:10 UTC [22466-2] user@db LOG:  could not receive data from client: Connection reset by peer

(Note that the message about the incomplete startup packet is harmless.)

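Comments:

Which version of python are you using?
@ThomasMoreau: "PostgreSQL 9.6, SQLAlchemy 1.1.4, psycopg2 2.6.2, Python 2.7, Ubuntu 14.04" ;)
Are you using the same connection parameters as shown in the example? Do you have a custom connection pool implementation in your real code?
It runs fine on my system... Do you have the logs from your postgresql server? Does it also crash in python3?
Bingo: sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL error: decryption failed or bad record mac.

Answer 1: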

Quoting "How do I use engines / connections / sessions with Python multiprocessing, or os.fork()?" with added emphasis:

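The SQLAlchemy Engine object refers to a connection pool of existing database connections. So when this object is replicated to a child process, the goal is to ensure that no database connections are carried over.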

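However, for the case of a transaction-active Session or Connection being shared, there is no automatic fix for this; an application needs to ensure that a new child process only initiates new Connection objects and transactions, as well as ORM Session objects.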

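The issue stems from the forked child process inheriting the live global session, which is holding on to a Connection. When target calls init, it overwrites the global references to engine and session, thus decrementing their refcounts to 0 in the child and forcing them to finalize. If you, for example, create another reference to the inherited session in the child one way or another, you prevent it from being cleaned up, but don't do that. After main has joined and returns to business as usual, it tries to use the now potentially finalized, or otherwise out of sync, connection. As to why this causes an error only after a certain number of iterations, I'm not sure.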
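The mechanism described above can be reproduced without a database. The following is a minimal sketch, assuming CPython's reference counting and a POSIX system with os.fork. FakeConnection, child_reinit and run_demo are hypothetical stand-ins, not SQLAlchemy or psycopg2 APIs, and the single byte sent on finalization only illustrates that cleanup in the child writes to the socket the parent is still using.

```python
import os
import socket


class FakeConnection:
    """Hypothetical stand-in for a DBAPI connection that talks on finalization."""

    def __init__(self, sock):
        self.sock = sock

    def __del__(self):
        # In CPython this runs as soon as the last reference is dropped.
        try:
            self.sock.sendall(b"X")  # pretend "terminate" message
        except OSError:
            pass


conn = None  # module-level global, like `session` in the question


def child_reinit():
    """Mimics what init() does in the child: rebind the inherited global."""
    global conn
    conn = None  # refcount of the inherited object drops to 0 -> __del__ runs


def run_demo():
    """Return the bytes the 'server' side receives because of the child."""
    global conn
    client, server = socket.socketpair()
    conn = FakeConnection(client)

    pid = os.fork()
    if pid == 0:
        # Child: owns a copy of `conn` wrapping the *same* underlying socket.
        child_reinit()
        os._exit(0)

    os.waitpid(pid, 0)
    server.settimeout(2.0)
    received = server.recv(16)  # data sent by the child's finalizer

    # Parent cleanup (its own finalizer runs here, after we have read).
    conn = None
    client.close()
    server.close()
    return received


if __name__ == "__main__":
    print(run_demo())
```

The parent never sends anything before the read, so whatever arrives on the server side was written by the child while it was discarding its inherited copy.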

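The only way to handle this situation while using globals the way you do is to

    1. close all sessions
    2. call engine.dispose()

before forking. This will prevent connections from leaking to the child. For example: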

def main():
    global session
    init()
    try:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        dummy_id = dummy.id
        # Return the Connection to the pool
        session.close()
        # Dispose of it!
        engine.dispose()
        # ...or call your cleanup() function, which does the same
        p = multiprocessing.Process(target=target, args=(dummy_id,))
        p.start()
        p.join()
        # Start a new session
        session = Session()
        dummy = session.query(Dummy).get(dummy_id)
        assert dummy.value == 2
    finally:
        cleanup()
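The ordering used above (release first, fork second) can be captured in a small helper. This is only a sketch under stated assumptions: run_in_child, release, child_task and open_connections are hypothetical names, the list stands in for pooled connections so the example runs without a database, and the "fork" start method is requested explicitly, which requires a POSIX system.

```python
import multiprocessing
import sys


def run_in_child(release_connections, target, *args):
    """Release the parent's resources, then run target in a forked child."""
    # In the question's code this would be session.close() followed by
    # engine.dispose(), i.e. the existing cleanup() function.
    release_connections()
    ctx = multiprocessing.get_context("fork")
    process = ctx.Process(target=target, args=args)
    process.start()
    process.join()
    return process.exitcode


# Stand-in state so the sketch is runnable without PostgreSQL.
open_connections = ["parent-connection"]


def release():
    open_connections.clear()


def child_task(expected_count):
    # Exit code 0 only if the child inherited the expected number of
    # open connections (0 when the parent released them before forking).
    sys.exit(0 if len(open_connections) == expected_count else 1)


if __name__ == "__main__":
    print(run_in_child(release, child_task, 0))
```

After the helper returns, the parent has to start a new session before touching the database again, as the example above does with session = Session().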

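Your second example does not trigger finalization in the child, and so it only seems to work, though it might be as broken as the first, since it still inherits a copy of the session and its connection defined locally in main.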

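Comments:

Shouldn't scoped_session help here? docs.sqlalchemy.org/en/latest/orm/contextual.html
@lucas03 No, a scoped session is a thread-local registry of sessions.
Oh, right, we actually call engine.dispose in gunicorn post_fork and celery on_worker_process_init.
In some cases I have found it necessary to call gc.collect() immediately after dispose(); otherwise the forked process may try to collect dead garbage related to the engine, causing the same OperationalError as reported above.
@Lucas03 @Ilja Everilä Couldn't we use the scopefunc of scoped_session like this: str(os.getpid()) + str(threading.local().__hash__())? docs.sqlalchemy.org/en/13/orm/… Is there a case where this would break a single-process, multi-threaded application?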
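Regarding the last comment: threading.local() creates a new object on every call, so its hash is not tied to the calling thread. A key built from the process id and the thread identifier is one way to express the same idea. The sketch below assumes the hypothetical name scope_key and only shows how such a key behaves; it does not by itself dispose of pooled connections inherited across a fork.

```python
import os
import threading


def scope_key():
    """Return a key that is stable per (process, thread) pair."""
    # threading.get_ident() is unique among threads that are alive
    # at the same time within one process.
    return (os.getpid(), threading.get_ident())


if __name__ == "__main__":
    print(scope_key() == scope_key())
```

A function like this could be passed as scopefunc to scoped_session, though whether that is sufficient for a given application is not something this sketch establishes.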
