Celery+RabbitMQ+Redis

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Celery+RabbitMQ+Redis相关的知识,希望对你有一定的参考价值。

参考技术A 在根目录下添加celery.py文件
celery.py文件

在根目录下的init.py中配置

设置中间人

settings.py配置

启动容器,映射端口

在app的目录下创建task.py
被装饰器装饰(@shared_task)用一个函数将异步任务封装 ,在视图函数调用

在视图函数中调用
调用函数的delay方法

Django的celery配置(包括定时任务队列)

一、安装celery

Django项目不需要安装celery这个包,可以直接使用django-celery这个包,,先来安装它,在终端中输入:

pip install django-celery

二、安装rabbitmq,建立celery队列

我做的项目用的就是rabbitmq,按道理来说,也是可以用redis作为消息队列的,但是rabbitmq更好,此处不做详细解释,有兴趣的同学的可以去研究下。
ubuntu环境下,在终端中输入:

sudo apt-get install rabbitmq-server

三、配置settings.py

首先要在INSTALLED_APPS中添加对djcelery的引用

INSTALLED_APPS = [
    ‘decelery‘,
]

再在settings.py中添加以下代码

import djcelery
djcelery.setup_loader()

BROKER_URL = ‘amqp://guest:[email protected]:5672/‘
CELERY_RESULT_BACKEND = ‘amqp‘
CELERY_IMPORTS = (
    "common.tasks.task1",
    "common.tasks.task2",
)

四、添加celery异步任务

在common/tasks/task1.py中添加以下代码,就定义了celery异步任务。
因为celery任务可能会很多,为了便于管理,我们就在项目下的common/tasks文件夹中创建了许多task.py文件,如task1.py、task2.py等。
在task1.py中添加以下代码:

from celery.task import task

@task
def function1(a, b):
    print a + b

注意,让让这个celery任务能执行,还必须要在settings.py中加一段配置,这个在上面已经添加了,这里再特别提醒下,如下:

# common、tasks是文件夹,task1、task2是tasks文件夹下面的py文件
# CELERY_IMPORTS:是导入目标任务文件
CELERY_IMPORTS = (
    "common.tasks.task1",
    "common.tasks.task2",
)

要调用这个异步任务的话,就用

from common.tasks.task1 import function

function.delay(a, b)

是不是很方便?工具的好处就是把事情简单化,但是太依赖工具,不懂的底层原理的话容易把人搞傻。
比如其实还可以用threading来创建新线程来执行异步任务,此处不再赘述,否则又是一套长篇大论,有兴趣的童鞋可以去学习一下如何使用threading库。

五、启动celery

因为我们这里用的是django-celery,而不是直接使用celery这个库,所以启动celery的命令与celery官网里面介绍的是不一样的,新人很可能因为这个掉进坑里,所以这里我特别提醒一下。
终端命令如下:

# 先启动服务器
python manage.py runserver
# 再启动worker(--concurrency=2是开4个worker进程,不加也可以启动,只不过在生产环境还是应该多开几个进程的)
python manage.py celery worker --concurrency=2 -l info

六、celery定时任务的配置

在项目中有时不仅仅使用以上的异步任务,有时候需要创建很多定时任务,这样celery又可以大显身手了。在settings.py中添加以下配置,就可以添加定时任务

from celery.schedules import crontab

# 下方的common和tasks依然是文件夹
# function2、function3分别是tasks文件夹中的task1.py、task2.py文件的函数的函数名
CELERYBEAT_SCHEDULE = {
    ‘function2‘: {
        ‘task‘: ‘common.tasks.task1‘,
        ‘schedule‘: crontab(minute=‘*/50‘), # 每50分钟执行一次
    },
    ‘function3‘: {
        ‘task‘: ‘common.tasks.task2‘,
        ‘schedule‘: crontab(minute=0, hour=‘8,13‘), # 每天的8点0分和13点0分各执行一次
    },
}

下面是common/tasks/task1.py中的函数

from celery.task import task

@task
def function2():
    print ‘=‘*40
    print ‘This is function2, celery is great!‘
    print ‘=‘*40

下面是common/tasks/task2.py中的函数,跟task1.py中是一样的使用方法。

from celery.task import task

@task
def function3():
    print ‘=‘*40
    print ‘This is function3, celery is great!‘
    print ‘Fuck celery start failure!‘
    print ‘=‘*40

七、启动celery定时任务

得先启动第六节中介绍的命令后,再执行这个命令,定时任务才能执行。因为beat只是分配定时任务给celery的worker,所以只有worker启动后,定时任务才能异步执行。
顾名思义,worker就是干苦力的民工,累活脏或都给它干。beat可以形象的理解为工地做计划的人,到了要干活的时候就分配任务给民工,可能是包工头,也可能是一般的管理工程进度的小弟。反正都是苦命的人,屌丝何必难为屌丝。 (╥╯^╰╥)
终端命令如下:

python manage.py celery beat -l info

八、celery队列的配置

在项目中celery的异步任务很多的时候,这个时候我们就需要将不同的任务分配到不同的队列(queue)中去执行,如果只有一个默认队列的话,所有异步任务都会在这个队列中执行(是需要排队的,先来的先执行),任务很多的时候,就没法同时执行很多任务了,甚至造成任务的拥堵。将不同的任务分配到不同的队列就可以保证同一时刻可以同时运行不同队列中的任务,互补干扰,并且每个队列可以单独开好几个进程。
进程数最好不要超过CPU的核数,因为CPU只有4个核的话,你开5个进程,同一时间还是只能执行4个进程。

我们可以在项目中设置三个队列(default, frontend, backend),队列的名字可以自己任意取。以下是在settings.py中添加队列的配置:

from kombu import Exchange, Queue

# 默认队列是default
CELERY_DEFAULT_QUEUE = ‘default‘
CELERY_DEFAULT_EXCHANGE = ‘default‘
CELERY_DEFAULT_ROUTING_KEY = ‘default‘

# x-priority是任务的优先级
# 优先级就是哪个队列优先执行,比较紧迫的需要马上执行的任务优先级可以设置为最高
CELERY_QUEUES = (
    Queue(‘default‘, Exchange(‘default‘), routing_key=‘default‘, consumer_arguments={‘x-priority‘: 5}),
    Queue(‘frontend‘, Exchange(‘frontend‘), routing_key=‘frontend‘, consumer_arguments={‘x-priority‘: 10}),
    Queue(‘backend‘, Exchange(‘backend‘), routing_key=‘backend‘, consumer_arguments={‘x-priority‘: 8}),
)

# 特别需要注意的是,异步任务的路径必须精确到函数名(比如下方的common、tasks是文件夹,task1、task2是py文件,function1就是task1.py中的定义的异步任务的函数名),不然的话异步任务就没法执行
CELERY_ROUTES = {
    "common.tasks.task1.function1": {‘queue‘: "frontend", ‘routing_key‘: ‘frontend‘},
    "common.tasks.task1.function2": {‘queue‘: "backend", ‘routing_key‘: ‘backend‘},
    "common.tasks.task2.function3": {‘queue‘: "default", ‘routing_key‘: ‘default‘},
}

九、启动celery队列

配置了队列的话,如果执行python manage.py celery worker --concurrency=2 -l info的话就只会创建一个默认的队列,而我们需要创建多个队列,这样我们就不需要运行这个命令了,我们需要在终端中分别运行以下命令:

# -Q 后面加的是配置的队列名,concurrency(进程数)设置为几就由自己定了,只要不超过CPU核数就行了
python manage.py celery worker -l info -Q default --concurrency=1
python manage.py celery worker -l info -Q frontend --concurrency=2
python manage.py celery worker -l info -Q backend --concurrency=4

十、补充

Django下要查看其他celery的命令,包括参数配置、启动多worker进程的方式都可以通过python manage.py celery --help来查看,一下是终端输入命令后出来的提示信息:

Usage: manage.py celery <command> [options] 

Show help screen and exit.

Options:
  -A APP, --app=APP     app instance to use (e.g. module.attr_name)
  -b BROKER, --broker=BROKER
                        url to broker.  default is ‘amqp://[email protected]//‘
  --loader=LOADER       name of custom loader class to use.
  --config=CONFIG       Name of the configuration module
  --workdir=WORKING_DIRECTORY
                        Optional directory to change to after detaching.
  -C, --no-color        
  -q, --quiet           
  --version             show program‘s version number and exit
  -h, --help            show this help message and exit

---- -- - - ---- Commands- -------------- --- ------------

+ Main: 
|    celery worker
|    celery events
|    celery beat
|    celery shell
|    celery multi
|    celery amqp

+ Remote Control: 
|    celery status
 
|    celery inspect --help
|    celery inspect active
|    celery inspect active_queues
|    celery inspect clock
|    celery inspect conf None
|    celery inspect memdump
|    celery inspect memsample
|    celery inspect objgraph None
|    celery inspect ping
|    celery inspect registered
|    celery inspect report
|    celery inspect reserved
|    celery inspect revoked
|    celery inspect scheduled
|    celery inspect stats
 
|    celery control --help
|    celery control add_consumer <queue> [exchange [type [routing_key]]]
|    celery control autoscale [max] [min]
|    celery control cancel_consumer <queue>
|    celery control disable_events
|    celery control enable_events
|    celery control pool_grow [N=1]
|    celery control pool_shrink [N=1]
|    celery control rate_limit <task_name> <rate_limit> (e.g. 5/s | 5/m | 5/h)>
|    celery control time_limit <task_name> <soft_secs> [hard_secs]

+ Utils: 
|    celery purge
|    celery list
|    celery migrate
|    celery call
|    celery result
|    celery report
---- -- - - --------- -- - -------------- --- ------------

Type ‘celery <command> --help‘ for help using a specific command.








以上是关于Celery+RabbitMQ+Redis的主要内容,如果未能解决你的问题,请参考以下文章

Celery + RabbitMQ +“发生套接字错误”

Celery 任务计划(Celery、Django 和 RabbitMQ)

celery+rabbitmq基本使用

Celery:使用 PostgreSQL 而不是 RabbitMQ

Celery 无法连接到 RabbitMQ 服务器

Celery 超时后没有将任务放回 RabbitMQ 队列