Celery add_periodic_task 阻止 Django 在 uwsgi 环境中运行

Posted

技术标签:

【中文标题】Celery add_periodic_task 阻止 Django 在 uwsgi 环境中运行【英文标题】:Celery add_periodic_task blocks Django running in uwsgi environment 【发布时间】:2018-04-26 01:50:37 【问题描述】:

我编写了一个模块,该模块根据项目设置中的字典列表(通过django.conf.settings 导入)动态添加定期 celery 任务。 我使用函数add_tasks 来执行此操作,该函数计划使用设置中给出的特定uuid 调用函数:

def add_tasks(celery):
    for new_task in settings.NEW_TASKS:
        celery.add_periodic_task(
            new_task['interval'],
            my_task.s(new_task['uuid']),
            name='My Task %s' % new_task['uuid'],
        )

像建议的here 一样,我使用on_after_configure.connect 信号来调用我的celery.py 中的函数:

app = Celery('my_app')

@app.on_after_configure.connect
def setup_periodic_tasks(celery, **kwargs):
    from add_tasks_module import add_tasks
    add_tasks(celery)

此设置对celery beatcelery worker 都适用,但会破坏我使用uwsgi 为我的django 应用程序提供服务的设置。 Uwsgi 运行平稳,直到视图代码第一次使用 celery 的 .delay() 方法发送任务。那时,celery 似乎在uwsgi 中初始化,但在上面的代码中永远阻塞。如果我从命令行手动运行它,然后在它阻塞时中断,我会得到以下(缩短的)堆栈跟踪:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'tasks'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'data'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'tasks'

During handling of the above exception, another exception occurred:
Traceback (most recent call last):

  (SHORTENED HERE. Just contained the trace from the console through my call to this function)

  File "/opt/my_app/add_tasks_module/__init__.py", line 42, in add_tasks
    my_task.s(new_task['uuid']),
  File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 146, in __getattr__
    return getattr(self._get_current_object(), name)
  File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 109, in _get_current_object
    return loc(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python3.6/site-packages/celery/app/__init__.py", line 72, in task_by_cons
    return app.tasks[
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 44, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 1228, in tasks
    self.finalize(auto=True)
  File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 507, in finalize
    with self._finalize_mutex:

似乎获取互斥锁有问题。

目前我正在使用一种解决方法来检测sys.argv[0] 是否包含uwsgi,然后不添加定期任务,因为只有beat 需要这些任务,但我想了解这里出了什么问题来解决问题更持久。

这个问题是否与使用 uwsgi 多线程或多进程(其中一个线程/进程持有其他需要的互斥锁)有关?

如果有任何可以帮助我解决问题的提示,我将不胜感激。谢谢。

我正在使用:Django 1.11.7 和 Celery 4.1.0

编辑 1

我已经为这个问题创建了一个最小的设置:

celery.py:

import os
from celery import Celery
from django.conf import settings
from myapp.tasks import my_task

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')

app = Celery('my_app')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(
        60,
        my_task.s(),
        name='Testtask'
    )

app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

tasks.py:

from celery import shared_task
@shared_task()
def my_task():
    print('ran')

确保 CELERY_TASK_ALWAYS_EAGER=False 并且您有一个有效的消息队列。

运行:

./manage.py shell -c 'from myapp.tasks import my_task; my_task.delay()'

在中断之前等待大约 10 秒,以查看上述错误。

【问题讨论】:

你为什么要在 Django 进程中初始化它而不是有一个专门的进程来做呢? 我确实有一个专用的celery beat 进程,但我需要从 django 进程中调用.delay()。这就是代码阻塞的地方。 那么 add_tasks 没有从 Django 进程中被调用? @app.on_after_configure.connect 信号中调用。这似乎在每个进程中都会被触发,在 Django 进程中也是如此。 我建议您不要从 Django 代码中运行它。如果您仍然想像这样进行初始化,那么您应该有另一个 celery 任务并使用 Django 代码中的.delay 调用它 【参考方案1】:

所以,我发现 @shared_task 装饰器造成了问题。当我在信号调用的函数中声明任务时,我可以绕过这个问题,如下所示:

def add_tasks(celery):
    @celery.task
    def my_task(uuid):
        print(uuid)

    for new_task in settings.NEW_TASKS:
        celery.add_periodic_task(
            new_task['interval'],
            my_task.s(new_task['uuid']),
            name='My Task %s' % new_task['uuid'],
        )

这个解决方案实际上对我有用,但我还有一个问题:我在一个可插入的应用程序中使用这个代码,所以我不能在信号处理程序之外直接访问 celery 应用程序,但我也想成为能够从其他代码中调用my_task 函数。通过在函数中定义它,它在函数之外是不可用的,所以我不能在其他任何地方导入它。

我可以通过在信号函数之外定义任务函数来解决这个问题,并在此处和tasks.py 中将其与不同的装饰器一起使用。我想知道除了@shared_task 装饰器之外是否还有一个装饰器,我可以在tasks.py 中使用它不会产生问题。

目前的最佳解决方案可能是:

task_app.__init__.py:

def my_task(uuid):
    # do stuff
    print(uuid)

def add_tasks(celery):
    celery_my_task = celery.task(my_task)
    for new_task in settings.NEW_TASKS:
        celery.add_periodic_task(
            new_task['interval'],
            celery_my_task(new_task['uuid']),
            name='My Task %s' % new_task['uuid'],
        )

task_app.tasks.py:

from celery import shared_task
from task_app import my_task
shared_my_task = shared_task(my_task)

myapp.celery.py:

import os
from celery import Celery
from django.conf import settings


# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')

app = Celery('my_app')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    from task_app import add_tasks
    add_tasks(sender)


app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

【讨论】:

【参考方案2】:

你可以试试那个信号@app.on_after_finalize.connect

一些来自工作项目celery==4.1.0Django==2.0django-celery-beat==1.1.0django-celery-results==1.0.1 的快速sn-p

@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    """ setup of periodic task :py:func:shopify_data_fetcher.celery.fetch_shopify
    based on the schedule defined in: settings.CELERY_BEAT_SCHEDULE
    """
    for task_name, task_config in settings.CELERY_BEAT_SCHEDULE.items():
        sender.add_periodic_task(
            task_config['schedule'],
            fetch_shopify.s(**task_config['kwargs']['resource_name']),
            name=task_name
        )

CELERY_BEAT_SCHEDULE

CELERY_BEAT_SCHEDULE = 
    'fetch_shopify_orders': 
        'task': 'shopify.tasks.fetch_shopify',
        'schedule': crontab(hour="*/3", minute=0),
        'kwargs': 
            'resource_name': shopify_constants.SHOPIFY_API_RESOURCES_ORDERS
        
    

【讨论】:

感谢您的回答。我尝试了这个并没有帮助,但它为我指明了正确的方向:似乎@shared_task 装饰器的使用是问题所在。我将编辑我的问题。 你用什么装饰器来定义你的fetch_shopify celery 任务? 因为只剩下一小时了,我将接受我的回答,但将赏金奖励给你,因为你为我指明了正确的方向。

以上是关于Celery add_periodic_task 阻止 Django 在 uwsgi 环境中运行的主要内容,如果未能解决你的问题,请参考以下文章

通过 celery 向 django-channels 发送消息

celery:celery介绍架构基本使用,celery执行异步任务延迟任务定时任务,django中使用celery。

celery介绍

celery简单使用

celery 未处理的 celery 任务

celery 设置多少时间后运行