django-celery configuration

Posted jiuyang


1. Startup order:

  Start the project:

python manage.py runserver

  Start celery beat:

python manage.py celery beat

  Start the celery worker:

python manage.py celeryd -l info

  Start celery flower to monitor task execution:

celery flower --broker=redis://auth:[email protected]:6379
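The --broker value is an ordinary URL, so its parts can be checked with the standard library before it is handed to Flower or Celery. A minimal sketch, assuming a hypothetical URL: the password `s3cret`, the host `127.0.0.1` and the database number `0` are placeholders, not values from this project, and `describe_broker` is a helper invented for illustration.

```python
from urllib.parse import urlsplit


def describe_broker(url):
    """Split a broker URL into the pieces Celery and Flower care about."""
    parts = urlsplit(url)
    return {
        'transport': parts.scheme,                    # e.g. 'redis'
        'username': parts.username,
        'password': parts.password,
        'host': parts.hostname,
        'port': parts.port,
        'database': parts.path.lstrip('/') or None,   # None when no db is given
    }


# Hypothetical URL, for illustration only.
info = describe_broker('redis://auth:s3cret@127.0.0.1:6379/0')
print(info['transport'], info['host'], info['port'], info['database'])
```

Printing the parsed parts is a cheap way to spot a missing `@` or a port that ended up in the password field.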

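2. Configuring tasks dynamically with Django and celery

  1. Versions used in this project: Django==1.11.7   celery==3.1.18 django-celery==3.2.2

  Install celery and django-celery:

pip install celery==3.1.18
pip install django-celery==3.2.2

  2. Integrating celery with Django

  (1) Add a celery.py file to the project's initial folder (the project package, next to settings.py)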


celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

from django.conf import settings  # noqa

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DSPProject.settings')

app = Celery('DSPProject')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
app.config_from_object('django.conf:settings')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
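To make the autodiscovery step less opaque, here is a rough standard-library sketch of the idea: for every installed app, look for a submodule with a conventional name (`tasks` by default) and import it if it exists. This is not Celery's implementation; `discover_task_modules` is a hypothetical helper, and the package names in the usage line are stdlib packages chosen only because they are always importable.

```python
import importlib
import importlib.util


def discover_task_modules(packages, related_name='tasks'):
    """Import `<package>.<related_name>` for each package that has one.

    Returns the dotted names of the modules that were found and imported.
    """
    found = []
    for package in packages:
        dotted = '{0}.{1}'.format(package, related_name)
        # find_spec returns None when the submodule does not exist.
        if importlib.util.find_spec(dotted) is None:
            continue
        importlib.import_module(dotted)
        found.append(dotted)
    return found


# asyncio ships a `tasks` submodule, json does not.
print(discover_task_modules(['asyncio', 'json']))
```

By the same logic, an app whose task module has any other file name is skipped unless `related_name` is changed to match.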

  (2) Add the celery configuration to the project's settings.py file

  settings.py

import djcelery
djcelery.setup_loader()

BROKER_URL = 'redis://auth:[email protected]:6379'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'  # periodic tasks are stored in the database
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
# CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False
CELERYD_CONCURRENCY = 10
CELERYD_MAX_TASKS_PER_CHILD = 1  # each worker process is replaced after 1 task, which guards against memory leaks

LOGIN_REDIRECT_URL = '/index/'
UPLOAD_FILE_DIR = os.path.join(BASE_DIR, "people/upload/")
CHANGE_UPLOAD_DIR = os.path.join(BASE_DIR, "change/upload/")
LOG_FILE_DIR = os.path.join(BASE_DIR, "log/")
BACKUP_USER_INFO_DIR = '/home/jiuyang/django/backup/user_info/'

LOGGING = {
    'version': 1,
    'disable_existing_loggers': True,
    'formatters': {
        'standard': {
            'format': '%(levelname)s %(asctime)s %(message)s'
        },
    },
    'filters': {
    },
    'handlers': {
        'mail_admins': {
            'level': 'ERROR',
            'class': 'django.utils.log.AdminEmailHandler',
            'formatter': 'standard',
        },
        'people_handler': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '%s%s' % (LOG_FILE_DIR, 'people.log'),
            'formatter': 'standard',
        },
        'report_handler': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '%s%s' % (LOG_FILE_DIR, 'report.log'),
            'formatter': 'standard',
        },
        'change_handler': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '%s%s' % (LOG_FILE_DIR, 'change.log'),
            'formatter': 'standard',
        },
        'dtmt_handler': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '%s%s' % (LOG_FILE_DIR, 'dtmt.log'),
            'formatter': 'standard',
        },
        'scheduled_tasks_handler': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '%s%s' % (LOG_FILE_DIR, 'scheduled_tasks.log'),
            'formatter': 'standard',
        },
        'business_query_handler': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '%s%s' % (LOG_FILE_DIR, 'business_query.log'),
            'formatter': 'standard',
        },
    },
    'loggers': {
        'django.request': {
            'handlers': ['mail_admins'],
            'level': 'ERROR',
            'propagate': True,
        },
        'people_log': {
            'handlers': ['people_handler'],
            'level': 'INFO',
            'propagate': False
        },
        'report': {
            'handlers': ['report_handler'],
            'level': 'INFO',
            'propagate': False
        },
        'change': {
            'handlers': ['change_handler'],
            'level': 'INFO',
            'propagate': False
        },
        'dtmt': {
            # The original pointed this logger at change_handler, which left
            # dtmt_handler unused and sent dtmt messages to change.log.
            'handlers': ['dtmt_handler'],
            'level': 'INFO',
            'propagate': False
        },
        'scheduled_tasks': {
            'handlers': ['scheduled_tasks_handler'],
            'level': 'INFO',
            'propagate': False
        },
        'business_query': {
            'handlers': ['business_query_handler'],
            'level': 'INFO',
            'propagate': False
        },
    }
}
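A trimmed-down version of the LOGGING dictionary can be exercised outside Django with `logging.config.dictConfig`, which is a quick way to confirm that a named logger really writes to its own file. The sketch below keeps a single handler/logger pair from the settings; the temporary directory is a stand-in for LOG_FILE_DIR, and `disable_existing_loggers` is set to False here only so the snippet does not silence other loggers in the interpreter that runs it.

```python
import logging
import logging.config
import os
import tempfile

log_dir = tempfile.mkdtemp()  # stand-in for LOG_FILE_DIR
log_path = os.path.join(log_dir, 'scheduled_tasks.log')

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {'format': '%(levelname)s %(asctime)s %(message)s'},
    },
    'handlers': {
        'scheduled_tasks_handler': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': log_path,
            'formatter': 'standard',
        },
    },
    'loggers': {
        'scheduled_tasks': {
            'handlers': ['scheduled_tasks_handler'],
            'level': 'INFO',
            'propagate': False,
        },
    },
}

logging.config.dictConfig(LOGGING)
logger = logging.getLogger('scheduled_tasks')
logger.debug('dropped: below the INFO level set on the logger')
logger.info('periodic task saved')

# Make sure everything is on disk before reading the file back.
for handler in logger.handlers:
    handler.flush()

with open(log_path) as fh:
    lines = fh.read().splitlines()
print(lines)
```

The handler accepts DEBUG, but the logger itself is set to INFO, so only the second message reaches the file.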

 

  (3) Write the celery tasks that serve as the concrete task templates


 tasks.py (autodiscover_tasks looks for a module named tasks in each app)

from celery import shared_task


@shared_task(name='run_py')
def run_worke(*args):
    # Template task: receives the arguments stored with the periodic task.
    for line in args:
        print('run', line)
    return 'run python file ok'


@shared_task(name='run_add')
def add():
    # Minimal task used to check that scheduling works end to end.
    x = 1
    y = 3
    print('run_add result:', x + y)
    return x + y
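Because each task is registered under an explicit name, anything that stores that name as a string (a periodic-task row, for example) can later be resolved back to the function, whatever the function itself is called. The toy registry below illustrates only that lookup; `register` and `dispatch` are hypothetical stand-ins, not Celery APIs, and nothing here goes through a broker.

```python
TASKS = {}


def register(name):
    """Decorator that files a function under an explicit task name."""
    def decorator(func):
        TASKS[name] = func
        return func
    return decorator


@register(name='run_py')
def run_worke(*args):
    return ['run {0}'.format(arg) for arg in args]


@register(name='run_add')
def add():
    return 1 + 3


def dispatch(task_name, args=()):
    """Look a task up by its registered name and call it with args."""
    return TASKS[task_name](*args)


print(dispatch('run_add'))
print(dispatch('run_py', ['spider_a', 'spider_b']))
```

The function name `run_worke` never appears in the registry; only the string `run_py` does, which is why a stored schedule refers to tasks by that string.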

  (4) Add periodic tasks from submitted configuration

# Main code

import json

from django.http import HttpResponse
from djcelery.models import CrontabSchedule
from djcelery.schedulers import DatabaseScheduler


def add_periodic_task_spiders(request):
    # Handle the submitted data for a new periodic task.
    response = HttpResponse()
    # Currency is a project-specific request helper; its source is not shown here.
    cur = Currency(request)
    rq_post = getattr(cur, 'rq_post')
    jdata = rq_post('data')
    data = json.loads(jdata)
    task_spiders = data['task_spiders']
    crontab = data['crontab']
    is_enable = data['is_enable']
    is_encrypt = data['is_encrypt']
    # is_sensitive = data['is_sensitive']
    task_name = data['task_name']
    task_template = data['task_template']

    # Hand the data to celery by registering it in the database-backed beat schedule.
    schedule = CrontabSchedule.objects.get(pk=crontab).schedule
    create_or_update_task = DatabaseScheduler.create_or_update_task
    schedule_dict = {
        'schedule': schedule,
        'task': task_template,
        'args': [task_spiders],
        'enabled': is_enable
    }
    create_or_update_task(task_name, **schedule_dict)
    # mail_excel(mail_header, task_name, sql_list, **mailpara)
    response.write(json.dumps({'status': 0, 'msg': ['Operation succeeded']}))
    return response
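The view trusts the posted JSON to contain every key it reads. One way to make that contract explicit is to build the keyword arguments for create_or_update_task in a small pure function that fails early when a field is missing. This is a sketch under assumptions: `build_schedule_entry` is a hypothetical helper, the payload values are invented, and the `schedule` argument is supplied by the caller (in the view it would be the object obtained from CrontabSchedule).

```python
import json

REQUIRED_KEYS = ('task_spiders', 'crontab', 'is_enable',
                 'task_name', 'task_template')


def build_schedule_entry(raw_json, schedule):
    """Return (task_name, entry_dict) built from the posted JSON string."""
    data = json.loads(raw_json)
    missing = [key for key in REQUIRED_KEYS if key not in data]
    if missing:
        raise ValueError('missing keys: {0}'.format(', '.join(missing)))
    entry = {
        'schedule': schedule,
        'task': data['task_template'],   # registered task name, e.g. 'run_py'
        'args': [data['task_spiders']],
        'enabled': data['is_enable'],
    }
    return data['task_name'], entry


# Invented payload; 'every-5-min' is a placeholder for a real schedule object.
payload = json.dumps({
    'task_spiders': 'spider_a',
    'crontab': 1,
    'is_enable': True,
    'is_encrypt': False,
    'task_name': 'nightly spider_a',
    'task_template': 'run_py',
})
name, entry = build_schedule_entry(payload, schedule='every-5-min')
print(name, entry['task'], entry['args'], entry['enabled'])
```

Keeping this step free of Django and djcelery imports means it can be unit-tested without a database or a running broker.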

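3. Distributed celery setup

Once django-celery is running successfully, copy the celery task source code to every machine that should take part in the distributed setup and complete the task configuration there.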

 
