celery相关问题

Posted 小白的逆袭之旅

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了celery相关问题相关的知识,希望对你有一定的参考价值。

在使用celery的时候发现很多问题的坑,相当的恶心,让人难以察觉。今天总结两个在平时生产环境中遇到的问题

一、celery worker卡死

问题描述:在使用celery启动worker的时候,开始是没有事情的,但是长时间的运行celery就会出现卡死。worker进程在某些个时间点卡死,不再处理broker的任务。

问题原因1:如果确认是因为当前worker的并发是prefork(多进程),并且可能是由于死锁原因造成!

解决办法1:那么可以使用 CELERYD_FORCE = True ,这样可以有效防止死锁。即使不是这个原因造成的,也尽量加上。

注意事项1:这种处理方式在4.0之后的版本不支持了,使用时需检查版本,也可采用下面的方式

 

问题原因2:进程hanging,说白了就是进程执行混乱,导致卡死。

解决办法2:可以使用time_limit参数,设定任务的执行超时时间,当超过这个时间的话,就先生成新的进程,并通过信号将hanging的进程杀死。

注意事项2:如果配置中使用了act_late的参数数,需要配合broker_transport_options = {\'visiblity_timeout\': 10 *60 }使用,这样,在task经过超时时间之后如果还没被ack, 就会被发送到其他worker去执行。如果没设置ack_late,代表对执行结果并不关心,这个参数也就没必要设置了

 


celery详细配置参考博客:http://blog.csdn.net/woshiaotian/article/details/36422781

 

二、celery任务重发

问题描述:celery有时候会对同一个任务进行多次执行,使你收到多个结果

问题原因1:启动多worker的时候,因为是异步执行的,可能使得多个worker去执行了同一个任务,导致这个现象

解决办法1:Celery Once ,利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 QueueOnce 设置为 base

@task(base=QueueOnce, once={\'graceful\': True})

后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 False,那样 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则静默处理。

 

使用步骤:

  1.安装

pip install -U celery_once

  2.增加配置

复制代码
from celery import Celery
from celery_once import QueueOnce
from time import sleep

celery = Celery(\'tasks\', broker=\'amqp://guest@localhost//\')
celery.conf.ONCE = {
  \'backend\': \'celery_once.backends.Redis\',
  \'settings\': {
    \'url\': \'redis://localhost:6379/0\',
    \'default_timeout\': 60 * 60
  }
}
复制代码

  3.修改delay方法

example.delay(10)
# 修改为
result = example.apply_async(args=(10))

  4.修改task函数

@celery.task(base=QueueOnce, once={\'graceful\': True, keys\': [\'a\']})
def slow_add(a, b):
    sleep(30)
    return a + b

 

 

问题原因2:对于eta/countdown延迟任务,有超时时间,如果超过超时时间任务未被执行,会被丢到下一个worker去执行,造成循环执行。当我们设置一个ETA时间比visibility_timeout长的任务时,每过一次 visibility_timeout 时间,celery就会认为这个任务没被worker执行成功,重新分配给其它worker再执行

解决办法2:设置超时等待时间

app.conf.broker_transport_options = {\'visibility_timeout\': 86400}

------------恢复内容开始------------

在使用celery的时候发现很多问题的坑,相当的恶心,让人难以察觉。今天总结两个在平时生产环境中遇到的问题

一、celery worker卡死

问题描述:在使用celery启动worker的时候,开始是没有事情的,但是长时间的运行celery就会出现卡死。worker进程在某些个时间点卡死,不再处理broker的任务。

问题原因1:如果确认是因为当前worker的并发是prefork(多进程),并且可能是由于死锁原因造成!

解决办法1:那么可以使用 CELERYD_FORCE = True ,这样可以有效防止死锁。即使不是这个原因造成的,也尽量加上。

注意事项1:这种处理方式在4.0之后的版本不支持了,使用时需检查版本,也可采用下面的方式

 

问题原因2:进程hanging,说白了就是进程执行混乱,导致卡死。

解决办法2:可以使用time_limit参数,设定任务的执行超时时间,当超过这个时间的话,就先生成新的进程,并通过信号将hanging的进程杀死。

注意事项2:如果配置中使用了act_late的参数数,需要配合broker_transport_options = {\'visiblity_timeout\': 10 *60 }使用,这样,在task经过超时时间之后如果还没被ack, 就会被发送到其他worker去执行。如果没设置ack_late,代表对执行结果并不关心,这个参数也就没必要设置了

 


celery详细配置参考博客:http://blog.csdn.net/woshiaotian/article/details/36422781

 

二、celery任务重发

问题描述:celery有时候会对同一个任务进行多次执行,使你收到多个结果

问题原因1:启动多worker的时候,因为是异步执行的,可能使得多个worker去执行了同一个任务,导致这个现象

解决办法1:Celery Once ,利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 QueueOnce 设置为 base

@task(base=QueueOnce, once={\'graceful\': True})

后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 False,那样 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则静默处理。

 

使用步骤:

  1.安装

pip install -U celery_once

  2.增加配置

复制代码
from celery import Celery
from celery_once import QueueOnce
from time import sleep

celery = Celery(\'tasks\', broker=\'amqp://guest@localhost//\')
celery.conf.ONCE = {
  \'backend\': \'celery_once.backends.Redis\',
  \'settings\': {
    \'url\': \'redis://localhost:6379/0\',
    \'default_timeout\': 60 * 60
  }
}
复制代码

  3.修改delay方法

example.delay(10)
# 修改为
result = example.apply_async(args=(10))

  4.修改task函数

@celery.task(base=QueueOnce, once={\'graceful\': True, keys\': [\'a\']})
def slow_add(a, b):
    sleep(30)
    return a + b

 

 

问题原因2:对于eta/countdown延迟任务,有超时时间,如果超过超时时间任务未被执行,会被丢到下一个worker去执行,造成循环执行。当我们设置一个ETA时间比visibility_timeout长的任务时,每过一次 visibility_timeout 时间,celery就会认为这个任务没被worker执行成功,重新分配给其它worker再执行

解决办法2:设置超时等待时间

app.conf.broker_transport_options = {\'visibility_timeout\': 86400}

------------恢复内容结束------------

------------恢复内容开始------------

在使用celery的时候发现很多问题的坑,相当的恶心,让人难以察觉。今天总结两个在平时生产环境中遇到的问题

一、celery worker卡死

问题描述:在使用celery启动worker的时候,开始是没有事情的,但是长时间的运行celery就会出现卡死。worker进程在某些个时间点卡死,不再处理broker的任务。

问题原因1:如果确认是因为当前worker的并发是prefork(多进程),并且可能是由于死锁原因造成!

解决办法1:那么可以使用 CELERYD_FORCE = True ,这样可以有效防止死锁。即使不是这个原因造成的,也尽量加上。

注意事项1:这种处理方式在4.0之后的版本不支持了,使用时需检查版本,也可采用下面的方式

 

问题原因2:进程hanging,说白了就是进程执行混乱,导致卡死。

解决办法2:可以使用time_limit参数,设定任务的执行超时时间,当超过这个时间的话,就先生成新的进程,并通过信号将hanging的进程杀死。

注意事项2:如果配置中使用了act_late的参数数,需要配合broker_transport_options = {\'visiblity_timeout\': 10 *60 }使用,这样,在task经过超时时间之后如果还没被ack, 就会被发送到其他worker去执行。如果没设置ack_late,代表对执行结果并不关心,这个参数也就没必要设置了

 


celery详细配置参考博客:http://blog.csdn.net/woshiaotian/article/details/36422781

 

二、celery任务重发

问题描述:celery有时候会对同一个任务进行多次执行,使你收到多个结果

问题原因1:启动多worker的时候,因为是异步执行的,可能使得多个worker去执行了同一个任务,导致这个现象

解决办法1:Celery Once ,利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 QueueOnce 设置为 base

@task(base=QueueOnce, once={\'graceful\': True})

后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 False,那样 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则静默处理。

 

使用步骤:

  1.安装

pip install -U celery_once

  2.增加配置

复制代码
from celery import Celery
from celery_once import QueueOnce
from time import sleep

celery = Celery(\'tasks\', broker=\'amqp://guest@localhost//\')
celery.conf.ONCE = {
  \'backend\': \'celery_once.backends.Redis\',
  \'settings\': {
    \'url\': \'redis://localhost:6379/0\',
    \'default_timeout\': 60 * 60
  }
}
复制代码

  3.修改delay方法

example.delay(10)
# 修改为
result = example.apply_async(args=(10))

  4.修改task函数

@celery.task(base=QueueOnce, once={\'graceful\': True, keys\': [\'a\']})
def slow_add(a, b):
    sleep(30)
    return a + b

 

 

问题原因2:对于eta/countdown延迟任务,有超时时间,如果超过超时时间任务未被执行,会被丢到下一个worker去执行,造成循环执行。当我们设置一个ETA时间比visibility_timeout长的任务时,每过一次 visibility_timeout 时间,celery就会认为这个任务没被worker执行成功,重新分配给其它worker再执行

解决办法2:设置超时等待时间

app.conf.broker_transport_options = {\'visibility_timeout\': 86400}

------------恢复内容开始------------

在使用celery的时候发现很多问题的坑,相当的恶心,让人难以察觉。今天总结两个在平时生产环境中遇到的问题

一、celery worker卡死

问题描述:在使用celery启动worker的时候,开始是没有事情的,但是长时间的运行celery就会出现卡死。worker进程在某些个时间点卡死,不再处理broker的任务。

问题原因1:如果确认是因为当前worker的并发是prefork(多进程),并且可能是由于死锁原因造成!

解决办法1:那么可以使用 CELERYD_FORCE = True ,这样可以有效防止死锁。即使不是这个原因造成的,也尽量加上。

注意事项1:这种处理方式在4.0之后的版本不支持了,使用时需检查版本,也可采用下面的方式

 

问题原因2:进程hanging,说白了就是进程执行混乱,导致卡死。

解决办法2:可以使用time_limit参数,设定任务的执行超时时间,当超过这个时间的话,就先生成新的进程,并通过信号将hanging的进程杀死。

注意事项2:如果配置中使用了act_late的参数数,需要配合broker_transport_options = {\'visiblity_timeout\': 10 *60 }使用,这样,在task经过超时时间之后如果还没被ack, 就会被发送到其他worker去执行。如果没设置ack_late,代表对执行结果并不关心,这个参数也就没必要设置了

 


celery详细配置参考博客:http://blog.csdn.net/woshiaotian/article/details/36422781

 

二、celery任务重发

问题描述:celery有时候会对同一个任务进行多次执行,使你收到多个结果

问题原因1:启动多worker的时候,因为是异步执行的,可能使得多个worker去执行了同一个任务,导致这个现象

解决办法1:Celery Once ,利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 QueueOnce 设置为 base

@task(base=QueueOnce, once={\'graceful\': True})

后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 False,那样 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则静默处理。

 

使用步骤:

  1.安装

pip install -U celery_once

  2.增加配置

复制代码
from celery import Celery
from celery_once import QueueOnce
from time import sleep

celery = Celery(\'tasks\', broker=\'amqp://guest@localhost//\')
celery.conf.ONCE = {
  \'backend\': \'celery_once.backends.Redis\',
  \'settings\': {
    \'url\': \'redis://localhost:6379/0\',
    \'default_timeout\': 60 * 60
  }
}
复制代码

  3.修改delay方法

example.delay(10)
# 修改为
result = example.apply_async(args=(10))

  4.修改task函数

@celery.task(base=QueueOnce, once={\'graceful\': True, keys\': [\'a\']})
def slow_add(a, b):
    sleep(30)
    return a + b

 

 

问题原因2:对于eta/countdown延迟任务,有超时时间,如果超过超时时间任务未被执行,会被丢到下一个worker去执行,造成循环执行。当我们设置一个ETA时间比visibility_timeout长的任务时,每过一次 visibility_timeout 时间,celery就会认为这个任务没被worker执行成功,重新分配给其它worker再执行

解决办法2:设置超时等待时间

app.conf.broker_transport_options = {\'visibility_timeout\': 86400}

------------恢复内容结束------------

------------恢复内容结束------------

以上是关于celery相关问题的主要内容,如果未能解决你的问题,请参考以下文章

使用 django-celery 时如何创建单个类对象?

尝试使用 celery beat 在 django 中调度一个函数但给出错误

Wagtail - 在页面上呈现带有相关片段和标签的数据时遇到问题

Jekyll 偏移代码片段高亮的初始行

Celery知识点总结

Celery详解