Celery 周期任务运行一段时间后意外停止
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Celery 周期任务运行一段时间后意外停止相关的知识,希望对你有一定的参考价值。
参考技术A 用Python Django做了一个网站。后端有些周期抓数据的需求,分布式任务队列 Celery 派上了用场。投入使用后,发现一个问题,运行一段时间后,周期更新的数据刷新时间停留在几天之前,Celery任务莫名其妙就不起作用了。查看日志,Celery beat日志是按周期在更新,但Celery worker日志停留在几天之前。查看进程,beat、worker进程均运行良好。一头雾水。每次碰到这种情况,只能重启。然后过一段时间又不起作用了,断断续续困扰大半年时间。
曾经也暗骂python轮子咋这样不靠谱,甚至也想转投java的怀抱,用spring boot搞一下。略一思考,转投java也有切换成本,换过去之后,也会碰到这样那样的问题。如果这个技术栈上碰到问题解决不了,换个技术栈碰到问题可能还是束手无策。换到java的好处可能是使用广泛,有问题都是别人已经趟过的坑,容易找到借鉴经验。小众的技术栈就没这么好的待遇了。
那么,想办法解决问题吧。
在google多番搜索,有一些线索可供参考。其中一个是说psycopg2、与postgres使用时可能会死锁。原因是postgres使用ssl时在一个callback中加了个锁,但是个callback是共用的,postgres自己unload时会释放这个锁,但是其他使用这个callback的并不知道,然后就死锁了。解决方案是把psycopg2升级到2.6版本以上。
具体可以参考Media上这篇文章。 https://medium.com/squad-engineering/two-years-with-celery-in-production-bug-fix-edition-22238669601d
但是我的版本已经是2.8了。所以这个解决方案并不能完全适用于我的问题。不过死锁还是给了我启发。可能celery worker执行某个任务时卡死了。
沿着这个线索继续探索吧。
OK,查死锁,下面进入debug阶段。
看得出来有2个任务是active状态,但是把timestamp转化一下,是2天之前了。这2个任务不可能要运行这么长时间的。那么肯定是卡住了。
Media文章里用的strace看卡哪了,嗯,可以效仿。一试,我的vps并没安装这个命令对应的库。好吗,毕竟买的廉价低配版,不增加负担。不就是要打印调用栈么,用cat /proc/pid/stack。
可以看得出来,一个卡在了tcp wait recv_msg上了。一个卡在了pipe_wait上了。2个任务都卡在了IO等待上。
这2个应该都不是死锁,抓取数据tcp请求不可能会死锁的,还是应该要设置超时时间。至于pipe,可能生产者意外退出,导致消费者拿不到数据而一直死等。
IO相关操作设置超时时间。
Django Celery 定期任务运行但 RabbitMQ 队列未被消耗
【中文标题】Django Celery 定期任务运行但 RabbitMQ 队列未被消耗【英文标题】:Django Celery Periodic Tasks Run But RabbitMQ Queues Aren't Consumed 【发布时间】:2012-10-27 20:56:56 【问题描述】:问题
通过 celery 的周期性任务调度器运行任务后,beat,为什么我在 RabbitMQ 中还有这么多未使用的队列?
设置
在 Heroku 上运行的 Django Web 应用程序 通过 celery beat 安排的任务 任务通过 celery worker 运行 消息代理是来自 CouldAMQP 的 RabbitMQProcfile
web: gunicorn --workers=2 --worker-class=gevent --bind=0.0.0.0:$PORT project_name.wsgi:application
scheduler: python manage.py celery worker --loglevel=ERROR -B -E --maxtasksperchild=1000
worker: python manage.py celery worker -E --maxtasksperchild=1000 --loglevel=ERROR
settings.py
CELERYBEAT_SCHEDULE =
'do_some_task':
'task': 'project_name.apps.appname.tasks.some_task',
'schedule': datetime.timedelta(seconds=60 * 15),
'args': ''
,
tasks.py
@celery.task
def some_task()
# Get some data from external resources
# Save that data to the database
# No return value specified
结果
每次任务运行时,我都会(通过 RabbitMQ Web 界面):
我的“已排队消息”下处于“就绪”状态的附加消息 一个附加队列,其中有一条消息处于“就绪”状态 此队列没有列出的消费者【问题讨论】:
【参考方案1】:它最终成为我对CELERY_RESULT_BACKEND
的设置。
以前是:
CELERY_RESULT_BACKEND = 'amqp'
将 RabbitMQ 更改为后,我不再有未使用的消息/队列:
CELERY_RESULT_BACKEND = 'database'
发生的事情似乎是,在执行任务后,celery 正在通过rabbitmq 发送有关该任务的信息,但是,没有任何设置来使用这些响应消息,因此一堆未读的消息结束了在队列中。
注意:这意味着 celery 将添加记录任务结果的数据库条目。为了防止我的数据库被无用的消息加载,我补充说:
# Delete result records ("tombstones") from database after 4 hours
# http://docs.celeryproject.org/en/latest/configuration.html#celery-task-result-expires
CELERY_TASK_RESULT_EXPIRES = 14400
Settings.py 中的相关部分
########## CELERY CONFIGURATION
import djcelery
# https://github.com/celery/django-celery/
djcelery.setup_loader()
INSTALLED_APPS = INSTALLED_APPS + (
'djcelery',
)
# Compress all the messages using gzip
# http://celery.readthedocs.org/en/latest/userguide/calling.html#compression
CELERY_MESSAGE_COMPRESSION = 'gzip'
# See: http://docs.celeryproject.org/en/latest/configuration.html#broker-transport
BROKER_TRANSPORT = 'amqplib'
# Set this number to the amount of allowed concurrent connections on your AMQP
# provider, divided by the amount of active workers you have.
#
# For example, if you have the 'Little Lemur' CloudAMQP plan (their free tier),
# they allow 3 concurrent connections. So if you run a single worker, you'd
# want this number to be 3. If you had 3 workers running, you'd lower this
# number to 1, since 3 workers each maintaining one open connection = 3
# connections total.
#
# See: http://docs.celeryproject.org/en/latest/configuration.html#broker-pool-limit
BROKER_POOL_LIMIT = 3
# See: http://docs.celeryproject.org/en/latest/configuration.html#broker-connection-max-retries
BROKER_CONNECTION_MAX_RETRIES = 0
# See: http://docs.celeryproject.org/en/latest/configuration.html#broker-url
BROKER_URL = os.environ.get('CLOUDAMQP_URL')
# Previously, had this set to 'amqp', this resulted in many read / unconsumed
# queues and messages in RabbitMQ
# See: http://docs.celeryproject.org/en/latest/configuration.html#celery-result-backend
CELERY_RESULT_BACKEND = 'database'
# Delete result records ("tombstones") from database after 4 hours
# http://docs.celeryproject.org/en/latest/configuration.html#celery-task-result-expires
CELERY_TASK_RESULT_EXPIRES = 14400
########## END CELERY CONFIGURATION
【讨论】:
您好,我正在尝试在我的应用程序上实现相同的堆栈,但我无法做到这一点,您能否发布所有与 celery 和 rabbitmq 相关的设置?我会非常感激,它会帮助其他新手。 当然,已添加。它基于出色的 django-skel 推荐。【参考方案2】:看起来您正在从已使用的任务中获得响应。
您可以通过以下方式避免这种情况:
@celery.task(ignore_result=True)
【讨论】:
这是正确的。我有同样的问题。在这里问答:***.com/questions/30327670/…以上是关于Celery 周期任务运行一段时间后意外停止的主要内容,如果未能解决你的问题,请参考以下文章