Celery 周期任务运行一段时间后意外停止

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Celery 周期任务运行一段时间后意外停止相关的知识,希望对你有一定的参考价值。

参考技术A 用Python Django做了一个网站。后端有些周期抓数据的需求,分布式任务队列 Celery 派上了用场。

投入使用后,发现一个问题,运行一段时间后,周期更新的数据刷新时间停留在几天之前,Celery任务莫名其妙就不起作用了。查看日志,Celery beat日志是按周期在更新,但Celery worker日志停留在几天之前。查看进程,beat、worker进程均运行良好。一头雾水。每次碰到这种情况,只能重启。然后过一段时间又不起作用了,断断续续困扰大半年时间。

曾经也暗骂python轮子咋这样不靠谱,甚至也想转投java的怀抱,用spring boot搞一下。略一思考,转投java也有切换成本,换过去之后,也会碰到这样那样的问题。如果这个技术栈上碰到问题解决不了,换个技术栈碰到问题可能还是束手无策。换到java的好处可能是使用广泛,有问题都是别人已经趟过的坑,容易找到借鉴经验。小众的技术栈就没这么好的待遇了。

那么,想办法解决问题吧。

在google多番搜索,有一些线索可供参考。其中一个是说psycopg2、与postgres使用时可能会死锁。原因是postgres使用ssl时在一个callback中加了个锁,但是个callback是共用的,postgres自己unload时会释放这个锁,但是其他使用这个callback的并不知道,然后就死锁了。解决方案是把psycopg2升级到2.6版本以上。

具体可以参考Media上这篇文章。 https://medium.com/squad-engineering/two-years-with-celery-in-production-bug-fix-edition-22238669601d

但是我的版本已经是2.8了。所以这个解决方案并不能完全适用于我的问题。不过死锁还是给了我启发。可能celery worker执行某个任务时卡死了。

沿着这个线索继续探索吧。

OK,查死锁,下面进入debug阶段。

看得出来有2个任务是active状态,但是把timestamp转化一下,是2天之前了。这2个任务不可能要运行这么长时间的。那么肯定是卡住了。

Media文章里用的strace看卡哪了,嗯,可以效仿。一试,我的vps并没安装这个命令对应的库。好吗,毕竟买的廉价低配版,不增加负担。不就是要打印调用栈么,用cat /proc/pid/stack。

可以看得出来,一个卡在了tcp wait recv_msg上了。一个卡在了pipe_wait上了。2个任务都卡在了IO等待上。

这2个应该都不是死锁,抓取数据tcp请求不可能会死锁的,还是应该要设置超时时间。至于pipe,可能生产者意外退出,导致消费者拿不到数据而一直死等。

IO相关操作设置超时时间。

Django Celery 定期任务运行但 RabbitMQ 队列未被消耗

【中文标题】Django Celery 定期任务运行但 RabbitMQ 队列未被消耗【英文标题】:Django Celery Periodic Tasks Run But RabbitMQ Queues Aren't Consumed 【发布时间】:2012-10-27 20:56:56 【问题描述】:

问题

通过 celery 的周期性任务调度器运行任务后,beat,为什么我在 RabbitMQ 中还有这么多未使用的队列?

设置

在 Heroku 上运行的 Django Web 应用程序 通过 celery beat 安排的任务 任务通过 celery worker 运行 消息代理是来自 CouldAMQP 的 RabbitMQ

Procfile

web: gunicorn --workers=2 --worker-class=gevent --bind=0.0.0.0:$PORT project_name.wsgi:application
scheduler: python manage.py celery worker --loglevel=ERROR -B -E --maxtasksperchild=1000
worker: python manage.py celery worker -E --maxtasksperchild=1000 --loglevel=ERROR

settings.py

CELERYBEAT_SCHEDULE = 
    'do_some_task': 
        'task': 'project_name.apps.appname.tasks.some_task',
        'schedule': datetime.timedelta(seconds=60 * 15),
        'args': ''
    ,

tasks.py

@celery.task
def some_task()
    # Get some data from external resources
    # Save that data to the database
    # No return value specified

结果

每次任务运行时,我都会(通过 RabbitMQ Web 界面):

我的“已排队消息”下处于“就绪”状态的附加消息 一个附加队列,其中有一条消息处于“就绪”状态 此队列没有列出的消费者

【问题讨论】:

【参考方案1】:

它最终成为我对CELERY_RESULT_BACKEND 的设置。

以前是:

CELERY_RESULT_BACKEND = 'amqp'

将 RabbitMQ 更改为后,我不再有未使用的消息/队列:

CELERY_RESULT_BACKEND = 'database'

发生的事情似乎是,在执行任务后,celery 正在通过rabbitmq 发送有关该任务的信息,但是,没有任何设置来使用这些响应消息,因此一堆未读的消息结束了在队列中。

注意:这意味着 celery 将添加记录任务结果的数据库条目。为了防止我的数据库被无用的消息加载,我补充说:

# Delete result records ("tombstones") from database after 4 hours
# http://docs.celeryproject.org/en/latest/configuration.html#celery-task-result-expires
CELERY_TASK_RESULT_EXPIRES = 14400

Settings.py 中的相关部分

########## CELERY CONFIGURATION
import djcelery
# https://github.com/celery/django-celery/
djcelery.setup_loader()

INSTALLED_APPS = INSTALLED_APPS + (
    'djcelery',
)

# Compress all the messages using gzip
# http://celery.readthedocs.org/en/latest/userguide/calling.html#compression
CELERY_MESSAGE_COMPRESSION = 'gzip'

# See: http://docs.celeryproject.org/en/latest/configuration.html#broker-transport
BROKER_TRANSPORT = 'amqplib'

# Set this number to the amount of allowed concurrent connections on your AMQP
# provider, divided by the amount of active workers you have.
#
# For example, if you have the 'Little Lemur' CloudAMQP plan (their free tier),
# they allow 3 concurrent connections. So if you run a single worker, you'd
# want this number to be 3. If you had 3 workers running, you'd lower this
# number to 1, since 3 workers each maintaining one open connection = 3
# connections total.
#
# See: http://docs.celeryproject.org/en/latest/configuration.html#broker-pool-limit
BROKER_POOL_LIMIT = 3

# See: http://docs.celeryproject.org/en/latest/configuration.html#broker-connection-max-retries
BROKER_CONNECTION_MAX_RETRIES = 0

# See: http://docs.celeryproject.org/en/latest/configuration.html#broker-url
BROKER_URL = os.environ.get('CLOUDAMQP_URL')

# Previously, had this set to 'amqp', this resulted in many read / unconsumed
# queues and messages in RabbitMQ
# See: http://docs.celeryproject.org/en/latest/configuration.html#celery-result-backend
CELERY_RESULT_BACKEND = 'database'

# Delete result records ("tombstones") from database after 4 hours
# http://docs.celeryproject.org/en/latest/configuration.html#celery-task-result-expires
CELERY_TASK_RESULT_EXPIRES = 14400
########## END CELERY CONFIGURATION

【讨论】:

您好,我正在尝试在我的应用程序上实现相同的堆栈,但我无法做到这一点,您能否发布所有与 celery 和 rabbitmq 相关的设置?我会非常感激,它会帮助其他新手。 当然,已添加。它基于出色的 django-skel 推荐。【参考方案2】:

看起来您正在从已使用的任务中获得响应。

您可以通过以下方式避免这种情况:

@celery.task(ignore_result=True)

【讨论】:

这是正确的。我有同样的问题。在这里问答:***.com/questions/30327670/…

以上是关于Celery 周期任务运行一段时间后意外停止的主要内容,如果未能解决你的问题,请参考以下文章

Django 和 Celery 的示例:周期性任务

为啥使用 Celery 运行计划任务比使用 crontab 更可取?

如何从 django 模板暂停和停止 celery 任务

芹菜节拍不接周期性任务

celery异步,延时任务, 周期任务

Django使用Celery异步任务队列