Celery Worker 数据库连接池
Posted
技术标签:
【中文标题】Celery Worker 数据库连接池【英文标题】:Celery Worker Database Connection Pooling 【发布时间】:2013-01-09 16:17:58 【问题描述】:我正在使用 Celery 独立(不在 Django 中)。我计划在多台物理机器上运行一种工作任务类型。该任务执行以下操作
-
接受 XML 文档。
转换它。
进行多次数据库读取和写入。
我使用的是 PostgreSQL,但这同样适用于使用连接的其他存储类型。过去,我使用数据库连接池来避免在每个请求上创建新的数据库连接或避免连接打开时间过长。但是,由于每个 Celery 工作人员都在一个单独的进程中运行,我不确定他们实际上如何能够共享池。我错过了什么吗?我知道 Celery 允许您保留从 Celery 工作人员返回的结果,但这不是我在这里想要做的。根据处理的数据,每个任务可以执行多个不同的更新或插入。
从 Celery worker 中访问数据库的正确方法是什么?
是否可以在多个工作人员/任务之间共享一个池,或者是否有其他方法可以做到这一点?
【问题讨论】:
你解决了吗?我会对解决方案感兴趣。 我为每个工作人员设置了一个数据库连接。 @oneself 如果你接受答案就好了 嗨,您是如何为每个工作人员获得一个数据库连接的。我会对解决方案感兴趣 【参考方案1】:我喜欢tigeronk2 的每个worker 一个连接的想法。正如他所说,Celery 维护自己的工作池,因此实际上不需要单独的数据库连接池。 Celery Signal docs 解释了如何在创建 worker 时进行自定义初始化,因此我将以下代码添加到我的 tasks.py 中,它似乎完全按照您的预期工作。当工作人员关闭时,我什至能够关闭连接:
from celery.signals import worker_process_init, worker_process_shutdown
db_conn = None
@worker_process_init.connect
def init_worker(**kwargs):
global db_conn
print('Initializing database connection for worker.')
db_conn = db.connect(DB_CONNECT_STRING)
@worker_process_shutdown.connect
def shutdown_worker(**kwargs):
global db_conn
if db_conn:
print('Closing database connectionn for worker.')
db_conn.close()
【讨论】:
我对这个有点困惑。这不会在模块级别创建一个连接,所以对于每个celery worker
?如果您从任务中访问全局 db_conn
,您将在多个进程或线程中重用相同的连接(取决于您的并发设置。)。它将启动许多连接,但只是覆盖与启动的每个新工作人员的连接。我在这里错过了什么吗?
是的,这就是重点:每个工作人员一个连接。这是一个工作进程池的解决方案,在这种情况下,全局 db_conn 仅在进程上下文中是全局的,因此一切都很好。已经有一段时间了,所以我什至不记得我是否费心用线程来测试它。
@ThatAintWorking 这对我来说很好用 pyodbc。
它的缺点是,如果你有固定数量的工人,空闲工人的数据库连接可能会变得陈旧和无效。【参考方案2】:
通过实施和监控来回馈我的发现。
欢迎反馈。
参考: 使用池化http://www.prschmid.com/2013/04/using-sqlalchemy-with-celery-tasks.html
每个工作进程(由 -c k 指定的 prefork 模式)将建立一个与 DB 的新连接,而无需池化或重用。 因此,如果使用池化,则只能在每个工作进程级别看到该池。所以池大小 > 1 没有用,但重用连接仍然可以保存打开和关闭的连接。
如果每个工作进程使用一个连接,则在初始化阶段为每个工作进程建立 1 个 DB 连接(prefork 模式 celery -A app worker -c k)。 它可以重复打开和关闭连接。
无论有多少工作线程(eventlet),每个工作线程(celery -A app worker -P eventlet)只建立一个与DB的连接,不进行池化或复用。 因此对于 eventlet,一个 celery 进程(celery -A app worker ...)上的所有工作线程(eventlet)在每个时刻都有 1 个 db 连接。
根据 celery 文档
但您需要确保您的任务不会执行阻塞调用,因为 这将停止工作人员中的所有其他操作,直到阻塞 调用返回。
可能是由于mysql DB连接方式阻塞调用。
【讨论】:
【参考方案3】:也许你可以使用pgbouncer。对于 celery,什么都不应该改变,并且连接池是在进程之外完成的。我有同样的issue。
('也许',因为我不确定是否会有任何副作用)
【讨论】:
SQLAlchemy 的每个进程默认的最大连接数会出现副作用docs.sqlalchemy.org/en/14/errors.html#error-3o7r【参考方案4】:每个工作进程有一个数据库连接。由于 celery 本身维护一个工作进程池,因此您的数据库连接将始终等于 celery 工作人员的数量。 另一方面,它会将数据库连接池绑定到 celery 工作进程管理。但这应该没问题,因为 GIL 在一个进程中一次只允许一个线程。
【讨论】:
"[..] 由于 celery 本身维护着一个工作进程池 [..]" 你有文档的链接吗? 抱歉,我的目标是“每个工作进程一个数据库连接”部分。但这似乎不是文档的一部分。很烦人,对我来说似乎很重要。 @user1252307 你做这种事情的地方在文档中有描述。看我的回答。【参考方案5】:也许,celery.concurrency.gevent 可以提供池共享并且不会加剧 GIL。但是,它的支持仍然是“实验性的”。
还有一个psycopg2.pool.SimpleConnectionPool 在greenlets(协程)之间共享,它们都将在单个进程/线程中运行。
关于该主题的其他stack 讨论。
【讨论】:
【参考方案6】:您可以覆盖默认行为,在 celery 配置中让线程工作人员而不是每个进程的工作人员:
CELERYD_POOL = "celery.concurrency.threads.TaskPool"
然后您可以将共享池实例存储在您的任务实例上,并从每个线程任务调用中引用它。
【讨论】:
人们通常尽量避免使用 Python 线程吗? 取决于您要执行的操作。线程在 python 中适用于 I/O 绑定进程。仅当您使用 CPU 密集型时,您可能会遇到 GAL 问题。以上是关于Celery Worker 数据库连接池的主要内容,如果未能解决你的问题,请参考以下文章
使用 gevent 执行池的 celery 任务的 SynchronousOnlyOperation