如何处理 ProcessPool 中的 SQLAlchemy 连接?
Posted
技术标签:
【中文标题】如何处理 ProcessPool 中的 SQLAlchemy 连接?【英文标题】:How to handle SQLAlchemy Connections in ProcessPool? 【发布时间】:2017-01-29 12:32:15 【问题描述】:我有一个反应器,它从 RabbitMQ 代理获取消息并触发工作方法以在进程池中处理这些消息,如下所示:
这是使用 python asyncio
、loop.run_in_executor()
和 concurrent.futures.ProcessPoolExecutor
实现的。
现在我想使用 SQLAlchemy 在工作方法中访问数据库。大多数情况下,处理将是非常简单和快速的 CRUD 操作。
reactor 开始时每秒会处理 10-50 条消息,因此不能为每个请求都打开一个新的数据库连接。相反,我想为每个进程维护一个持久连接。
我的问题是:我该怎么做?我可以将它们存储在全局变量中吗? SQA 连接池会为我处理这个问题吗?反应堆停止时如何清理?
[更新]
数据库是带有 InnoDB 的 mysql。为什么选择这种带有进程池的模式?
当前实现使用不同的模式,每个消费者在自己的线程中运行。不知何故,这不是很好。已经有大约 200 个消费者在各自的线程中运行,并且系统正在快速增长。为了更好地扩展,想法是分离关注点并在 I/O 循环中使用消息并将处理委托给池。当然,整个系统的性能主要受 I/O 限制。但是,在处理大型结果集时,CPU 是一个问题。
另一个原因是“易于使用”。虽然消息的连接处理和消费是异步实现的,但worker中的代码可以是同步的和简单的。
很快就发现,通过工作人员内部的持久网络连接访问远程系统是一个问题。这就是 CommunicationChannels 的用途:在 worker 内部,我可以通过这些通道向消息总线授予请求。
我目前的一个想法是以类似的方式处理数据库访问:将语句通过队列传递到事件循环,然后将它们发送到数据库。但是,我不知道如何使用 SQLAlchemy 做到这一点。
切入点在哪里?
对象在通过队列时需要为pickled
。如何从 SQA 查询中获取这样的对象?
与数据库的通信必须异步进行,以免阻塞事件循环。我可以使用例如aiomysql 作为 SQA 的数据库驱动程序?
【问题讨论】:
那么每个worker都是自己的进程?那时不能共享连接,所以也许你应该实例化每个(本地)SQA 池,最大 1 或 2 个连接限制。然后观察,也许通过数据库(哪个数据库?)正在产生/杀死哪些连接。在这件事上已经被严重烧毁了 - 你不想想要做的是在 SQA 之上实现你自己的幼稚 conn 池。或者尝试确定 SQA 连接是否已关闭。 @JLPeyret:我用您要求的信息更新了问题。不...我不打算实现自己的连接池。 所以,我想我记得连接不能跨进程(在操作系统的意义上,与线程区分开来)。而且我知道连接根本不能很好地腌制。您应该能够向“死”(字符串)sql 语句发送消息,但我相信您将很难传递 db conns,我认为可能包括 SQA 结果。我的猜测,但在一定程度上使用奇怪的 SQA 来证明它的合理性。 【参考方案1】:如果您在如何实例化 session
时注意一下,假设您在工作进程中使用 orm,那么您对每个进程池进程一个数据库连接 的要求很容易满足进程。
一个简单的解决方案是拥有一个全局session,您可以在请求之间重复使用它:
# db.py
engine = create_engine("connection_uri", pool_size=1, max_overflow=0)
DBSession = scoped_session(sessionmaker(bind=engine))
关于工人任务:
# task.py
from db import engine, DBSession
def task():
DBSession.begin() # each task will get its own transaction over the global connection
...
DBSession.query(...)
...
DBSession.close() # cleanup on task end
参数pool_size
和max_overflow
customize create_engine 使用的默认QueuePool。pool_size
将确保您的进程仅在进程池中为每个进程保持 1 个连接。
如果您希望它重新连接,您可以使用DBSession.remove()
,它将从注册表中删除会话,并使其在下次使用 DBSession 时重新连接。您还可以使用Pool 的recycle
参数使连接在指定时间后重新连接。
在开发/调试期间,您可以使用AssertionPool,如果从池中检出多个连接,则会引发异常,请参阅switching pool implementations 了解如何执行此操作。
【讨论】:
所以您基本上建议我不要担心,因为 SQA 池会立即处理该问题?这会很好!我将在接下来的几天内将我们拥有 +200 个消费者和 +20000 行代码的主应用程序迁移到新的软件架构中,看看它是否有效。 @roman 祝您重构顺利,如果您有任何问题,请随时在此处发表评论,如果您认为我涵盖了您的问题,最好将其标记为已接受: ) . 到目前为止似乎工作正常! :) 我认为应该提到文档中的这一部分 docs.sqlalchemy.org/en/rel_1_1/core/…。必须特别注意多处理。【参考方案2】:对我非常有用的一种方法是使用网络服务器来处理和扩展进程池。即使在默认状态下,flask-sqlalchemy 也会保留一个连接池,并且不会在每个请求响应周期中关闭每个连接。
asyncio 执行器可以调用 url 端点来执行你的函数。额外的好处是,因为所有执行工作的进程都在一个 url 后面,所以您可以在多台机器上轻松扩展工作池,通过 gunicorn 或其他许多方法之一添加更多进程来扩展简单的 wsgi 服务器。另外,您还可以获得所有容错功能。
缺点是您可能会通过网络传递更多信息。但是,正如您所说,问题是 CPU 受限,您可能会在数据库中传入和传出更多数据。
【讨论】:
当我说 CPU 是一个问题时,我并不是说主要工作负载受 CPU 限制!不是......与上面的其他方法一样,我在这里看到了事务处理的严重问题。在业务逻辑和持久层之间建立无状态网络连接听起来很可怕。【参考方案3】:@roman:你有很好的挑战。
我之前也遇到过类似的情况,所以这是我的 2 美分:除非这个消费者只有 “read” 和 “write”消息,无需对其进行任何实际处理,您可以重新设计此消费者作为将消费消息的消费者/生产者,它将处理消息,然后会将结果放入另一个队列,该队列(例如处理过的消息)可以被 1..N 个非池化异步进程读取,这些进程将在其自己的整个生命周期中打开数据库连接。
我可以扩展我的答案,但我不知道这种方法是否适合您的需求,如果适合,我可以为您提供有关扩展设计的更多详细信息。
【讨论】:
我正在考虑这种方法,但我认为很难正确处理事务。我想我不想尝试构建自己的分布式事务管理器。以上是关于如何处理 ProcessPool 中的 SQLAlchemy 连接?的主要内容,如果未能解决你的问题,请参考以下文章