django+celery 实现分布式任务

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了django+celery 实现分布式任务相关的知识,希望对你有一定的参考价值。

想用django做一个自动运维平台,利用netsnmp来获取交换机及服务器信息,但是snmpget任务需要在后台实时运行,为了不影响html响应,利用celery来结合django做异步任务队列。

一、环境准备
1.首先安装celery
pip3 install celery
2.安装djcelery
pip3 install django-celery
3.安装一个broker
我们必须拥有一个broker消息队列用于发送和接收消息。Celery官网给出了多个broker的备选方案:RabbitMQ、Redis、Database(不推荐)以及其他的消息中间件。本次我们利用redis

sudo apt-get install redis

redis-server
启动redis服务, 端口假设为6379
redis-cli 查看redis 状态
[email protected]:~/Desktop/webserver/myserver# redis-cli
127.0.0.1:6379>

二、django 配置
配置settings.py
首先,在Django工程的settings.py文件中加入如下配置代码:

celery 配置信息 start

#############################
import djcelery

celery 配置

djcelery.setup_loader()
BROKER_URL = ‘redis://127.0.0.1:6379/1‘
CELERY_TIMEZONE = ‘Asia/Shanghai‘
CELERYBEAT_SCHEDULER = ‘djcelery.schedulers.DatabaseScheduler‘
CELERY_IMPORTS = (‘serverapp.task‘)
#############################

celery 配置信息 end

#############################

当djcelery.setup_loader()运行时,Celery便会去查看INSTALLD_APPS下包含的所有app目录中的tasks.py文件,找到标记为task的方法,将它们注册为celery task

? BROKER_URL:broker是代理人,它负责分发任务给worker去执行。我使用的是Redis作为broker

? 没有设置 CELERY_RESULT_BACKEND,默认没有配置,此时Django会使用默认的数据库(也是你指定的orm数据库)。

CELERY_IMPORTS:是导入目标任务文件

CELERYBEAT_SCHEDULER:使用了django-celery默认的数据库调度模型,任务执行周期都被存在默认指定的orm数据库中.

CELERYBEAT_SCHEDULE:设置定时的时间配置, 可以精确到秒,分钟,小时,天,周等。

3.在主工程的配置文件settings.py 中应用注册表INSTALLED_APPS中加入 djcelery

INSTALLED_APPS = [
‘django.contrib.admin‘,
‘django.contrib.auth‘,
‘django.contrib.contenttypes‘,
‘django.contrib.sessions‘,
‘django.contrib.messages‘,
‘django.contrib.staticfiles‘,
‘djcelery‘, #加入djcelery
]

4.(3)创建应用实例

? 在主工程目录添加celery.py, 添加自动检索django工程tasks任务

? vim artproject/celery.py

#目的是拒绝隐士引入,celery.py和celery冲突。
from future import absolute_import

import os

from celery import Celery, platforms
platforms.C_FORCE_ROOT = True

os.environ.setdefault(‘DJANGO_SETTINGS_MODULE‘, ‘myserver.settings‘)
#Specifying the settings here means the celery command line program will know where your Django project is.
#This statement must always appear before the app instance is created, which is what we do next:
from django.conf import settings

app = Celery(‘serverapp‘)

app.config_from_object(‘django.conf:settings‘)
#This means that you don’t have to use multiple configuration files, and instead configure Celery directly from the Django settings.
#You can pass the object directly here, but using a string is better since then the worker doesn’t have to serialize the object.

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
#With the line above Celery will automatically discover tasks in reusable apps if you define all tasks in a separate tasks.py module.
#The tasks.py should be in dir which is added to INSTALLED_APP in settings.py.
#So you do not have to manually add the individual modules to the CELERY_IMPORT in settings.py.

@app.task(bind=True)
def debug_task(self):
print(‘Request: {0!r}‘.format(self.request)) #dumps its own request information

5.(4) 创建任务 tasks

每个任务本质上就是一个函数,在tasks.py中,写入你想要执行的函数即可。

在应用art中添加我们需要提供的异步服务和定时服务

vim art/tasks.py

#!/usr/bin/env python

encoding: utf-8

from future import absolute_import
import time
from django.core.mail import send_mail
from celery.utils.log import get_task_logger
from artproject.celery import app
?
from art.utils.send_mail import pack_html, send_emailbr/>?
@app.task
def tsend_email():
url = "
http://1000phone.com"
receiver = ‘[email protected]
content = pack_html(receiver, url)

content = ‘this is email content.‘

send_email(receiver, content)
print(‘send email ok!‘)br/>?
?
@app.task
def add(x, y):
return x+y
6.迁移生成celery需要的数据表

python manage.py migrate
此时数据库表结构多出了几个

7.
8.
9.启动服务,测试
我们可以采用 python manage.py help 发现多出了 celery 相关选项。

(1)启动django celery 服务

启动服务:

python manage.py celery worker --loglevel=info

此时异步处理和定时处理服务都已经启动了

(2)web端接口触发异步任务处理

我们在web端加入一个入口,触发异步任务处理add函数

在应用art的urls.py 中加入如下对应关系

from art.views import add_handler
?
?
url(r‘^add‘, add_handler),

art/views.py 中加入处理逻辑

def add_handler(request):
x = request.GET.get(‘x‘, ‘1‘)
y = request.GET.get(‘y‘, ‘1‘)
from .tasks import add
add.delay(int(x), int(y))
res = {‘code‘:200, ‘message‘:‘ok‘, ‘data‘:[{‘x‘:x, ‘y‘:y}]}
return HttpResponse(json.dumps(res))

启动web服务,通过url传入的参数,通过handler的add.delay(x, y)计算并存入mysql

http://127.0.0.1:8000/art/add?x=188&y=22

(4) 测试定时器,发送邮件

在终端输入 python manage.py celerybeat -l info

会自动触发每隔30s执行一次tsend_email定时器函数,发送邮件:

CELERYBEAT_SCHEDULE = { #定时器策略
#定时任务一: 每隔30s运行一次
u‘测试定时器1‘: {
"task": "art.tasks.tsend_email",
#"schedule": crontab(minute=‘*/2‘), # or ‘schedule‘: timedelta(seconds=3),
"schedule":timedelta(seconds=30),
"args": (),
},
}
具体发送邮件服务程序见下面的第4节

4 邮件发送服务
项目中经常会有定时发送邮件的情形,比如发送数据报告,发送异常服务报告等。

可以编辑文件 art/utils/send_mail.py, 内容编辑如下:

#!/usr/bin/env python
#-- coding:utf-8 --
#written by zhouguangyou
#发送邮件(wd_email_check123账号用于内部测试使用,不要用于其他用途)
?
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.header import Header
import time
?
sender = ‘[email protected]
subject = u‘api开放平台邮箱验证‘
smtpserver = ‘smtp.163.com‘
username = ‘wwwwww‘
password = ‘wwwww1234‘
mail_postfix="163.com"
?
def send_email(receiver, content):
try:
me = username+"<"+username+"@"+mail_postfix+">"
msg = MIMEText(content, ‘html‘, ‘utf-8‘)
msg[‘Subject‘] = subject
msg[‘From‘] = sender
msg[‘To‘] = receiver
smtp = smtplib.SMTP()
smtp.connect(smtpserver)
smtp.login(username, password)
smtp.sendmail(sender, receiver, msg.as_string())
smtp.quit()
return True
except Exception as e:
print(‘send_email has error with : ‘ + str(e))
return False
?
?
def pack_html(receiver, url):
html_content = u"<html><div>尊敬的用户<font color=‘#0066FF‘>%s</font> 您好!</div><br>"
"<div>感谢您关注我们的平台 ,我们将为您提供最贴心的服务,祝您购物愉快。</div><br>"
"<div>点击以下链接,即可完成邮箱安全验证:</div><br>"
"<div><a href=‘%s‘>%s</a></div><br>"
"<div>为保障您的帐号安全,请在24小时内点击该链接; </div><br>"
"<div>若您没有申请过验证邮箱 ,请您忽略此邮件,由此给您带来的不便请谅解。</div>"
"</html>" % (receiver, url, url)
html_content = html_content
return html_content
?
?
if name == "main":
url = "http://xxxx.com"
receiver = ‘[email protected]
#content = pack_html(receiver, url)
content = ‘this is email content. at %s.‘%int(time.time())
send_email(receiver, content)

至此,在celery ui界面可以看到两类,定时器处理和异步处理。



以上是关于django+celery 实现分布式任务的主要内容,如果未能解决你的问题,请参考以下文章

Celery 分布式任务队列快速入门

Celery 分布式任务队列快速入门

Celery 分布式任务队列快速入门

Celery 分布式任务队列快速入门

Celery结合Django使用

celery