Celery 为每个任务创建一个新连接
Posted
技术标签:
【中文标题】Celery 为每个任务创建一个新连接【英文标题】:Celery creating a new connection for each task 【发布时间】:2012-08-14 08:10:30 【问题描述】:我使用 Celery 和 Redis 来运行一些后台任务,但是每次调用任务时,它都会创建一个到 Redis 的新连接。我在 Heroku 上,我的 Redis to Go 计划允许 10 个连接。我很快就达到了这个限制并收到“达到最大客户数”的错误。
如何确保 Celery 在单个连接上对任务进行排队,而不是每次都打开一个新连接?
EDIT - 包括完整的回溯
File "/app/.heroku/venv/lib/python2.7/site-packages/django/core/handlers/base.py", line 111, in get_response
response = callback(request, *callback_args, **callback_kwargs)
File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/object_wrapper.py", line 166, in __call__
self._nr_instance, args, kwargs)
File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/hooks/framework_django.py", line 447, in wrapper
return wrapped(*args, **kwargs)
File "/app/.heroku/venv/lib/python2.7/site-packages/django/views/decorators/csrf.py", line 77, in wrapped_view
return view_func(*args, **kwargs)
File "/app/feedback/views.py", line 264, in zencoder_webhook_handler
tasks.process_zencoder_notification.delay(webhook)
File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/task.py", line 343, in delay
return self.apply_async(args, kwargs)
File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/task.py", line 458, in apply_async
with app.producer_or_acquire(producer) as P:
File "/usr/local/lib/python2.7/contextlib.py", line 17, in __enter__
return self.gen.next()
File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/base.py", line 247, in producer_or_acquire
with self.amqp.producer_pool.acquire(block=True) as producer:
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 705, in acquire
R = self.prepare(R)
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/pools.py", line 54, in prepare
p = p()
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/pools.py", line 45, in <lambda>
return lambda: self.create_producer()
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/pools.py", line 42, in create_producer
return self.Producer(self._acquire_connection())
File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 160, in __init__
super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/messaging.py", line 83, in __init__
self.revive(self.channel)
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/messaging.py", line 174, in revive
channel = self.channel = maybe_channel(channel)
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 879, in maybe_channel
return channel.default_channel
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 617, in default_channel
self.connection
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 610, in connection
self._connection = self._establish_connection()
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 569, in _establish_connection
conn = self.transport.establish_connection()
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/virtual/__init__.py", line 722, in establish_connection
self._avail_channels.append(self.create_channel(self))
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/virtual/__init__.py", line 705, in create_channel
channel = self.Channel(connection)
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/redis.py", line 271, in __init__
self.client.info()
File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/object_wrapper.py", line 166, in __call__
self._nr_instance, args, kwargs)
File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/function_trace.py", line 81, in literal_wrapper
return wrapped(*args, **kwargs)
File "/app/.heroku/venv/lib/python2.7/site-packages/redis/client.py", line 344, in info
return self.execute_command('INFO')
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/redis.py", line 536, in execute_command
conn.send_command(*args)
File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 273, in send_command
self.send_packed_command(self.pack_command(*args))
File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 256, in send_packed_command
self.connect()
File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/object_wrapper.py", line 166, in __call__
self._nr_instance, args, kwargs)
File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/function_trace.py", line 81, in literal_wrapper
return wrapped(*args, **kwargs)
File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 207, in connect
self.on_connect()
File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 233, in on_connect
if self.read_response() != 'OK':
File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 283, in read_response
raise response
ResponseError: max number of clients reached
【问题讨论】:
BROKER_POOL_LIMIT 设置用于限制可以同时使用的连接数。 我将它设置为 5。我只是尝试降低到 2,但它似乎并没有改变任何东西。我仍然得到错误,redis“客户列表”迅速跳到10。 池限制仅限制 Kombu Connection 对象,它不是全局限制。每个Connection可能同时使用多个redis连接(特别是worker可能使用多个,如果你使用redis结果后端那么每个子进程也使用一个连接) 顺便说一句,redis-py库也有一个连接池,也许你也可以限制连接,我之前没试过。 @asksol 看起来 ConnectionPool 有连接计数限制,但它没有并发打开连接限制处理 【参考方案1】:我在使用 CloudAMQP 的 Heroku 上遇到了同样的问题。我不知道为什么,但是在将低整数分配给 BROKER_POOL_LIMIT
设置时我没有运气。
最终,我发现通过设置BROKER_POOL_LIMIT=None
或BROKER_POOL_LIMIT=0
我的问题得到了缓解。根据 Celery 文档,这会禁用连接池。到目前为止,这对我来说不是一个明显的问题,但我不确定它是否适合你。
相关信息链接:http://celery.readthedocs.org/en/latest/configuration.html#broker-pool-limit
【讨论】:
【参考方案2】:我希望我使用的是 Redis,因为有一个特定的选项可以限制连接数:CELERY_REDIS_MAX_CONNECTIONS
。
MongoDB
有类似的后端设置。
鉴于这些后端设置,我不知道 BROKER_POOL_LIMIT
实际做了什么。希望CELERY_REDIS_MAX_CONNECTIONS
能解决您的问题。
我是使用 CloudAMQP 的人之一,而 AMQP 后端没有自己的连接限制参数。
【讨论】:
顺便说一句,BROKER_POOL_LIMIT=0
解决方法解决了我的问题。我赞成该解决方案。【参考方案3】:
尝试这些设置:
CELERY_IGNORE_RESULT = True
CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True
【讨论】:
【参考方案4】:我遇到了类似的问题,涉及连接数和 Celery。它不在 Heroku 上,但它是 Mongo 而不是 Redis。
我在任务模块级别的任务功能定义之外启动了连接。至少对于 Mongo 来说,这允许任务共享连接。
希望对您有所帮助。
https://github.com/instituteofdesign/wander/blob/master/wander/tasks.py
mongoengine.connect('stored_messages')
@celery.task(default_retry_delay = 61)
def pull(settings, google_settings, user, folder, messageid):
'''
Pulls a message from zimbra and stores it in Mongo
'''
try:
imap = imap_connect(settings, user)
imap.select(folder, True)
.......
【讨论】:
以上是关于Celery 为每个任务创建一个新连接的主要内容,如果未能解决你的问题,请参考以下文章
使用 websockets 时,我应该为每个不同的任务打开一个新的 websocket 连接吗?还是我应该在一个连接中完成所有事情?
如何在 Django 模型中使用 celery beat 为每个对象创建单独的任务
在所有 celery worker 之间共享一个到 mongo db 的连接