celery相关问题
Posted 小白的逆袭之旅
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了celery相关问题相关的知识,希望对你有一定的参考价值。
在使用celery的时候发现很多问题的坑,相当的恶心,让人难以察觉。今天总结两个在平时生产环境中遇到的问题
一、celery worker卡死
问题描述:在使用celery启动worker的时候,开始是没有事情的,但是长时间的运行celery就会出现卡死。worker进程在某些个时间点卡死,不再处理broker的任务。
问题原因1:如果确认是因为当前worker的并发是prefork(多进程),并且可能是由于死锁原因造成!
解决办法1:那么可以使用 CELERYD_FORCE = True ,这样可以有效防止死锁。即使不是这个原因造成的,也尽量加上。
注意事项1:这种处理方式在4.0之后的版本不支持了,使用时需检查版本,也可采用下面的方式
问题原因2:进程hanging,说白了就是进程执行混乱,导致卡死。
解决办法2:可以使用time_limit参数,设定任务的执行超时时间,当超过这个时间的话,就先生成新的进程,并通过信号将hanging的进程杀死。
注意事项2:如果配置中使用了act_late的参数数,需要配合broker_transport_options = {\'visiblity_timeout\': 10 *60 }使用,这样,在task经过超时时间之后如果还没被ack, 就会被发送到其他worker去执行。如果没设置ack_late,代表对执行结果并不关心,这个参数也就没必要设置了
celery详细配置参考博客:http://blog.csdn.net/woshiaotian/article/details/36422781
二、celery任务重发
问题描述:celery有时候会对同一个任务进行多次执行,使你收到多个结果
问题原因1:启动多worker的时候,因为是异步执行的,可能使得多个worker去执行了同一个任务,导致这个现象
解决办法1:Celery Once ,利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 QueueOnce 设置为 base
@task(base=QueueOnce, once={\'graceful\': True})
后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 False,那样 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则静默处理。
使用步骤:
1.安装
pip install -U celery_once
2.增加配置
from celery import Celery from celery_once import QueueOnce from time import sleep celery = Celery(\'tasks\', broker=\'amqp://guest@localhost//\') celery.conf.ONCE = { \'backend\': \'celery_once.backends.Redis\', \'settings\': { \'url\': \'redis://localhost:6379/0\', \'default_timeout\': 60 * 60 } }
3.修改delay方法
example.delay(10) # 修改为 result = example.apply_async(args=(10))
4.修改task函数
@celery.task(base=QueueOnce, once={\'graceful\': True, keys\': [\'a\']}) def slow_add(a, b): sleep(30) return a + b
问题原因2:对于eta/countdown延迟任务,有超时时间,如果超过超时时间任务未被执行,会被丢到下一个worker去执行,造成循环执行。当我们设置一个ETA时间比visibility_timeout长的任务时,每过一次 visibility_timeout 时间,celery就会认为这个任务没被worker执行成功,重新分配给其它worker再执行
解决办法2:设置超时等待时间
app.conf.broker_transport_options = {\'visibility_timeout\': 86400}
------------恢复内容开始------------
在使用celery的时候发现很多问题的坑,相当的恶心,让人难以察觉。今天总结两个在平时生产环境中遇到的问题
一、celery worker卡死
问题描述:在使用celery启动worker的时候,开始是没有事情的,但是长时间的运行celery就会出现卡死。worker进程在某些个时间点卡死,不再处理broker的任务。
问题原因1:如果确认是因为当前worker的并发是prefork(多进程),并且可能是由于死锁原因造成!
解决办法1:那么可以使用 CELERYD_FORCE = True ,这样可以有效防止死锁。即使不是这个原因造成的,也尽量加上。
注意事项1:这种处理方式在4.0之后的版本不支持了,使用时需检查版本,也可采用下面的方式
问题原因2:进程hanging,说白了就是进程执行混乱,导致卡死。
解决办法2:可以使用time_limit参数,设定任务的执行超时时间,当超过这个时间的话,就先生成新的进程,并通过信号将hanging的进程杀死。
注意事项2:如果配置中使用了act_late的参数数,需要配合broker_transport_options = {\'visiblity_timeout\': 10 *60 }使用,这样,在task经过超时时间之后如果还没被ack, 就会被发送到其他worker去执行。如果没设置ack_late,代表对执行结果并不关心,这个参数也就没必要设置了
celery详细配置参考博客:http://blog.csdn.net/woshiaotian/article/details/36422781
二、celery任务重发
问题描述:celery有时候会对同一个任务进行多次执行,使你收到多个结果
问题原因1:启动多worker的时候,因为是异步执行的,可能使得多个worker去执行了同一个任务,导致这个现象
解决办法1:Celery Once ,利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 QueueOnce 设置为 base
@task(base=QueueOnce, once={\'graceful\': True})
后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 False,那样 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则静默处理。
使用步骤:
1.安装
pip install -U celery_once
2.增加配置
from celery import Celery from celery_once import QueueOnce from time import sleep celery = Celery(\'tasks\', broker=\'amqp://guest@localhost//\') celery.conf.ONCE = { \'backend\': \'celery_once.backends.Redis\', \'settings\': { \'url\': \'redis://localhost:6379/0\', \'default_timeout\': 60 * 60 } }
3.修改delay方法
example.delay(10) # 修改为 result = example.apply_async(args=(10))
4.修改task函数
@celery.task(base=QueueOnce, once={\'graceful\': True, keys\': [\'a\']}) def slow_add(a, b): sleep(30) return a + b
问题原因2:对于eta/countdown延迟任务,有超时时间,如果超过超时时间任务未被执行,会被丢到下一个worker去执行,造成循环执行。当我们设置一个ETA时间比visibility_timeout长的任务时,每过一次 visibility_timeout 时间,celery就会认为这个任务没被worker执行成功,重新分配给其它worker再执行
解决办法2:设置超时等待时间
app.conf.broker_transport_options = {\'visibility_timeout\': 86400}
------------恢复内容结束------------
------------恢复内容开始------------
在使用celery的时候发现很多问题的坑,相当的恶心,让人难以察觉。今天总结两个在平时生产环境中遇到的问题
一、celery worker卡死
问题描述:在使用celery启动worker的时候,开始是没有事情的,但是长时间的运行celery就会出现卡死。worker进程在某些个时间点卡死,不再处理broker的任务。
问题原因1:如果确认是因为当前worker的并发是prefork(多进程),并且可能是由于死锁原因造成!
解决办法1:那么可以使用 CELERYD_FORCE = True ,这样可以有效防止死锁。即使不是这个原因造成的,也尽量加上。
注意事项1:这种处理方式在4.0之后的版本不支持了,使用时需检查版本,也可采用下面的方式
问题原因2:进程hanging,说白了就是进程执行混乱,导致卡死。
解决办法2:可以使用time_limit参数,设定任务的执行超时时间,当超过这个时间的话,就先生成新的进程,并通过信号将hanging的进程杀死。
注意事项2:如果配置中使用了act_late的参数数,需要配合broker_transport_options = {\'visiblity_timeout\': 10 *60 }使用,这样,在task经过超时时间之后如果还没被ack, 就会被发送到其他worker去执行。如果没设置ack_late,代表对执行结果并不关心,这个参数也就没必要设置了
celery详细配置参考博客:http://blog.csdn.net/woshiaotian/article/details/36422781
二、celery任务重发
问题描述:celery有时候会对同一个任务进行多次执行,使你收到多个结果
问题原因1:启动多worker的时候,因为是异步执行的,可能使得多个worker去执行了同一个任务,导致这个现象
解决办法1:Celery Once ,利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 QueueOnce 设置为 base
@task(base=QueueOnce, once={\'graceful\': True})
后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 False,那样 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则静默处理。
使用步骤:
1.安装
pip install -U celery_once
2.增加配置
from celery import Celery from celery_once import QueueOnce from time import sleep celery = Celery(\'tasks\', broker=\'amqp://guest@localhost//\') celery.conf.ONCE = { \'backend\': \'celery_once.backends.Redis\', \'settings\': { \'url\': \'redis://localhost:6379/0\', \'default_timeout\': 60 * 60 } }
3.修改delay方法
example.delay(10) # 修改为 result = example.apply_async(args=(10))
4.修改task函数
@celery.task(base=QueueOnce, once={\'graceful\': True, keys\': [\'a\']}) def slow_add(a, b): sleep(30) return a + b
问题原因2:对于eta/countdown延迟任务,有超时时间,如果超过超时时间任务未被执行,会被丢到下一个worker去执行,造成循环执行。当我们设置一个ETA时间比visibility_timeout长的任务时,每过一次 visibility_timeout 时间,celery就会认为这个任务没被worker执行成功,重新分配给其它worker再执行
解决办法2:设置超时等待时间
app.conf.broker_transport_options = {\'visibility_timeout\': 86400}
------------恢复内容开始------------
在使用celery的时候发现很多问题的坑,相当的恶心,让人难以察觉。今天总结两个在平时生产环境中遇到的问题
一、celery worker卡死
问题描述:在使用celery启动worker的时候,开始是没有事情的,但是长时间的运行celery就会出现卡死。worker进程在某些个时间点卡死,不再处理broker的任务。
问题原因1:如果确认是因为当前worker的并发是prefork(多进程),并且可能是由于死锁原因造成!
解决办法1:那么可以使用 CELERYD_FORCE = True ,这样可以有效防止死锁。即使不是这个原因造成的,也尽量加上。
注意事项1:这种处理方式在4.0之后的版本不支持了,使用时需检查版本,也可采用下面的方式
问题原因2:进程hanging,说白了就是进程执行混乱,导致卡死。
解决办法2:可以使用time_limit参数,设定任务的执行超时时间,当超过这个时间的话,就先生成新的进程,并通过信号将hanging的进程杀死。
注意事项2:如果配置中使用了act_late的参数数,需要配合broker_transport_options = {\'visiblity_timeout\': 10 *60 }使用,这样,在task经过超时时间之后如果还没被ack, 就会被发送到其他worker去执行。如果没设置ack_late,代表对执行结果并不关心,这个参数也就没必要设置了
celery详细配置参考博客:http://blog.csdn.net/woshiaotian/article/details/36422781
二、celery任务重发
问题描述:celery有时候会对同一个任务进行多次执行,使你收到多个结果
问题原因1:启动多worker的时候,因为是异步执行的,可能使得多个worker去执行了同一个任务,导致这个现象
解决办法1:Celery Once ,利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 QueueOnce 设置为 base
@task(base=QueueOnce, once={\'graceful\': True})
后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 False,那样 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则静默处理。
使用步骤:
1.安装
pip install -U celery_once
2.增加配置
from celery import Celery from celery_once import QueueOnce from time import sleep celery = Celery(\'tasks\', broker=\'amqp://guest@localhost//\') celery.conf.ONCE = { \'backend\': \'celery_once.backends.Redis\', \'settings\': { \'url\': \'redis://localhost:6379/0\', \'default_timeout\': 60 * 60 } }
3.修改delay方法
example.delay(10) # 修改为 result = example.apply_async(args=(10))
4.修改task函数
@celery.task(base=QueueOnce, once={\'graceful\': True, keys\': [\'a\']}) def slow_add(a, b): sleep(30) return a + b
问题原因2:对于eta/countdown延迟任务,有超时时间,如果超过超时时间任务未被执行,会被丢到下一个worker去执行,造成循环执行。当我们设置一个ETA时间比visibility_timeout长的任务时,每过一次 visibility_timeout 时间,celery就会认为这个任务没被worker执行成功,重新分配给其它worker再执行
解决办法2:设置超时等待时间
app.conf.broker_transport_options = {\'visibility_timeout\': 86400}
------------恢复内容结束------------
------------恢复内容结束------------
以上是关于celery相关问题的主要内容,如果未能解决你的问题,请参考以下文章
尝试使用 celery beat 在 django 中调度一个函数但给出错误