celery

Posted aaronthon

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了celery相关的知识,希望对你有一定的参考价值。

Celery是一个异步任务的调度工具。

详细介绍以后补充,直接上代码和使用方法。如下:

结构目录如下:
celery_homedir——————————————————主目录——————————————————1
————celery_subdir———————————————子目录——————————————————2
————————__init__.py—————————————子目录内初始化文件————————3——空文件
————————celery_subtasks1.py—————异步任务1———————————————4
————————celery_subtasks2.py—————异步任务2
————————celery_subtasks3.py—————异步任务3
......
————————celery_subtasksn.py—————异步任务n
————__init__.py—————————————————celery主目录内初始化文件——5
————celeryconfig.py—————————————celery配置文件——————————6

celery_homedir文件夹,包含整个celery全部配置的文件。

celery_subdir文件夹是二级目录,包含全部异步任务。

celery_subtasks1.py包含单个子任务全部代码。

# coding: utf-8

import sys
from celery_task import app

reload(sys)
sys.setdefaultencording("utf8")


# 需要定时被执行的程序
class Test1(object):
    def GET(self):
        print "这是测试程序1"
        return "ok"

# 加celery装饰器
def celery_run():
    Test1.GET()

celery_subtasks2.py包含单个子任务全部代码。

# coding: utf-8

import sys
from celery_task import app

reload(sys)
sys.setdefaultencording("utf8")


# 需要定时被执行的程序
class Test2(object):
    def GET(self):
        print "这是测试程序2"
        return "ok"

# 加celery装饰器
def celery_run():
    Test2.GET()

celery_homedir内的__init__.py(celery主目录内初始化文件——5),包含celery的主程序文件。

#!/user/bin/python
# coding: utf-8

from __future__ import absolute_import
from celery import Celery


# 创建celery应用对象
app = Celery("celery_demo")

# 导入celery配置信息
app.config_from_object("celery_homedir.celeryconfig")

celeryconfig.py,包含celery的全部配置文件,比如启动事件,周期等信息。

#!/user/bin/python
# coding: utf-8

from __future__ import absolute_import
from celery.schedules import crontab
from common.config.Config import IS_TEST_SYSTEM

if IS_TEST_SYSTEM:  # 测试环境
     BROKER_URL = "redis://10.130.1.1:6379/0" # 存储任务队列
     CELERY_RESULT_BACKEND = "redis://10.130.2.2:6379/0" # 存储结果
else:  # 生产环境
     BROKER_URL = "redis://100.126.1.1:6379/0" # 存储任务队列
     CELERY_RESULT_BACKEND = "redis://10.126.2.2:6379/0" # 存储结果

# 设置时间
CELERY_TIMEZONE = "Asia/ShangHai"
CELERY_TASK_RESULT_EXPIRES = 60*60*24

# task_serializer = "json"
# result_serializer = "json"
# accept_content = ["json"]
# worker_hijack_root_logger = False  # 默认开启日志,可关闭自定义日志,不关闭自定义输出日志为空

CELERY_WORKER_CONCURRENCY = 5
CELERY_WORKER_MAX_TASK_PRE_CHILD = 200

# 导入任务所在文件
CELERY_IMPORTS = [
    "celery_homedir.celery_subdir.celery_subtasks1",   
    "celery_homedir.celery_subdir.celery_subtasks2",
]  # 这里只写两个任务

# 需要执行的任务配置
CELERYBEAT_SCHEDULE = 
    "celery_subtasks1": 
        "task": "celery_homedir.celery_subdir.celery_subtasks1.celery_run",  # 执行的函数
        "schedule": crontab(minute="*/1"),  # 每分钟执行一次
        "args": ()
    ,

    "celery_subtasks2": 
        "task": "celery_homedir.celery_subdir.celery_subtasks2.celery_run",  # 执行的函数
        "schedule": crontab(minute=0, hour="*/1"),  # 每小时执行一次
        "args": ()
    

 windows环境启动celery

比如pycharm
在两个Terminal界面里面分别输入如下指令:
1, celery -A celery_homedir worker --loglevel=info  # 启动程序
2, celery beat -A celery_homedir # 开启定时任务

 

以上是关于celery的主要内容,如果未能解决你的问题,请参考以下文章

celery haystack

celery

celery简介

celery-1

Celery

Celery的使用