django入门 celery使用

Posted 重楼python

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了django入门 celery使用相关的知识,希望对你有一定的参考价值。

免费分享,平时搬砖,周末会录制匹配的视频。
配套视频地址:https://www.bilibili.com/video/BV1eQ4y1U7os/
配套源码 https://github.com/xmccxwj/django-celery

1.业务场景

双11在即,既然是光棍节,就应该品点新茶叶。为所有二级、三级会员推送新茶介绍。

首先这种业务不应该影响主逻辑,也就是后台人员编辑好内容,选择好目标会员后点击就返回提交成功,后台处理中,不是在那里等着后台处理完。

这个实现挺简单,访问后台接口,后台启动一个进程(在java可以起线程,异步),查询二级、三级会员,然后把事先编辑好的内容以邮箱或者短信的形式推送即可。

生产者:生成任务的 消费者:处理任务的

问题:

1.业务发展方面,市场调研发现一次性给用户推送茶叶,内容太多,
客户大多不看,效果不理想,于是计划每天发送一批茶叶即可,
让客户每天都有新鲜感.也挺简单,事先编辑好茶叶的发送日期,
一个定时任务+多进程即可完成
2.业务再次发展,喝茶得用一系列工具。。这些都可以作为商品推送,
也就是每天要发送得任务量越来越大,茶叶,蜡烛,皮鞭等等一系列东西。。
工作人员肯定是上班时间一次性录入,不可能等你后台发完一个在录入一个吧?
比如后台一次性接收到要发送的任务是500个,只能一个一个发,
那么剩下的这些任务得找个地方存呀,
也挺简单,一个queue(消息队列)就搞定了,java/python都简单,
实在后面任务更大500到5000再到5w。。那就用第三方能实现消息队列的
就可以了 比如rabbitmq/redis/mongo等
3.吩咐其它进程做了业务,你想知道结果。也很简单。
如果用的rabbitmq那么可以监听一个队列,
消费者那边做完了把结果发到这个队列,生产者监听这个队列即可。
如果是redis,利用发布订阅,或者blpop即可
4.当整个项目这种业务越来越多,当前服务器就那么大,
都给这种业务了,那些项目最基本的业务都没有地方跑了,
这个也简单可以把做这种任务的单独抽取到一个服务器,
不影响原来的基本业务服务器
5.任务太多了,全部打过去celery扛不住压力,就需要rate-limit限流什么的
其它:中断任务,监控任务,超时机制...........

上面随便针对一个项目中的业务,其实类似的需求很多很多,上续的问题都可以采用这样那样的实现方式,虽然不难也挺麻烦,所以就有了celery,它已经把上续功能基本都实现了,比如已经配置好了如何链接redis/rabbitmq,我们只需要告诉它链接那个redis,也就是地址,也已经写好了定时器,我们只需要告诉他定再什么时间就可以了…

2.celery介绍

celery是python开发,类似上面的场景都可以用celery实现

上面包含了celery的完整架构图:
1.async task 一般的异步任务 生产者
2.beat:定时异步任务        生产者
3.消息中间件(rabbitmq/redis等) 存储消息,官方建议用rabbitmq
4.监控flower 一个项目会有很多任务用到celery,就不得不监控,方便我们
排查问题,找到性能瓶颈完成调优等
5.worker:执行任务的,也称为消费者,首先主进程接收到任务,
启动子进程去执行任务(也可以通过配置使用eventlet创建协程或者绿色线程等)
6.backend:执行完毕得到结果,方便发送方/消费方获知结果
7.celery可以跟项目一起部署,也可以单独部署到服务器
pis:在整个任务流程中,会生成一个任务id唯一记录当次任务,这个很好理解,
只有通过唯一标识去分辨那么多任务。

如果没有celery,这些功能都需要程序员手动去实现。

没有采用官方听不懂的语言,相信这样介绍,大家应该能明白。

3.celery安装入门

大家记住上面的介绍,就知道在celery中我们需要做什么了。比如celery需要使用消息队列,并且已经整合好,我们至少得告诉它使用哪个消息队列,地址是多少吧。 比如celery已经完成得异步,不需要我们编写,但是我们得告诉它哪个方法需要异步吧,它已经整合好了结果存储,那么我们得告诉它存储的地址吧。

3.1首先创建项目安装celery

其它简单的可以比如redis安装等参考前面的博客

mkvirtualenv -p python3 shuozhongdian    创建虚拟环境 
workon shuozhongdian   进入虚拟环境
pip install django==2.2.1 -i https://pypi.douban.com/simple  安装django后续要整合
pip install redis==3.2.1 -i  https://pypi.douban.com/simple  使用redis做消息队列
pip3 install celery==4.4.2 -i https://pypi.douban.com/simple 安装celery 
pip install eventlet==0.25.1 -i https://pypi.douban.com/simple  安装eventlet 在windows中高版本celery需要
django-admin startproject shuozhongdian 创建项目

使用pycharm打开项目,选择好虚拟环境,创建task.py /test.py

3.2编写代码

task.py:

from celery import Celery
from celery.task import periodic_task
from celery.schedules import crontab
import time

# 定义一个celery对象
# shuozhongdian_celery 为这个celery改的名字 随便都可以
# broker:表示任务队列的位置
# backend:任务完成后结果存放的位置
# 其它参数 后面详解
app = Celery('shuozhongdian_celery', broker='redis://127.0.0.1:6379/6', backend='redis://127.0.0.1:6379/7')

# 普通任务 由 方法名 send_email.delay()触发
@app.task
def send_email(level, content):
    print('去数据库根据等级查询用户:'+"_".join(level))
    print('发送的内容为:' + content)
    time.sleep(20)
    print('发送邮箱结束')
    return "send_email success"


# 定时任务 每分钟执行一次 后续详解
@periodic_task(run_every=crontab())
def task():
    print('定时任务 task 开始!!!!!')
    time.sleep(10)
    print('定时任务 task 结束!!!!!')
    return 'task success'

test.py: 用来触发普通任务的

import task
import datetime


def send_email_to_level():
    import time
    start = time.time()
    print("1. 接收前台参数,数据处理巴拉巴拉")
    print("2. 调用celery...完成业务")
    # 可以获得返回值 后续会讲解
    result = task.send_email.delay(level=('2', '3'), content="shuozhongdian@gmail.com")
    print("3. 因为celery是异步的这里可以直接返回,提交成功,后台处理发送中,celery睡了20秒 这里查看下结束时间")
    print("耗时:%s 秒 " % (time.time() - start), ',当前时间为: %s' % datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))


# 调用下方法
if __name__ == '__main__':
    send_email_to_level()

3.3启动程序

1.打开项目所在文件的cmd

celery -A task worker -l info -P eventlet -E

task是文件名称  
info 是日志级别
-P eventlet  表示使用什么池实现协程 eventlet,gevent 或者 solo等
其它参数后面谈论

2.普通任务

打开test.py 右键run就可以了

3.定时任务

celery -A task beat --loglevel=info

4.flower-celery监控

pip install flower==0.9.1 -i https://pypi.douban.com/simple
运行命令:
flower -A task --port=5555

5.django整合celery

项目下新建应用celerytest,加入配置新建task.py

5.1.在setting.py中加入配置

# SHUOZHONGDIAN是前缀这里注意大写
SHUOZHONGDIAN_BROKER_URL = 'redis://127.0.0.1:6379/1'  # 任务队列的位置
SHUOZHONGDIAN_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'  # 任务执行结果存放
SHUOZHONGDIAN_TIMEZONG = TIME_ZONE  # 项目中有定时任务 设置时区  这里跟项目一致 引用变量
SHUOZHONGDIAN_CELERY_ACCEPT_CONTENT  = ["json"]  # 配置celery可以接受哪些格式
SHUOZHONGDIAN_TASK_SERIALIZER = "json"  # 任务序列化格式 从上面的数组中选择
SHUOZHONGDIAN_RESULT_SERIALIZER = "json"  # 结果序列化数据格式
# 其它配置
# https://docs.celeryproject.org/en/stable/userguide/configuration.html

5.2在setting.py同级目录新建 celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
from celery.schedules import crontab

# 指定Django默认配置文件模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'shuozhongdian.settings')

app = Celery('abcde')

# 这里指定从django的settings.py里读取celery配置
app.config_from_object('django.conf:settings', namespace='SHUOZHONGDIAN')
# beat:定时任务配置
# 详细可参考参考https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
app.conf.beat_schedule = 
    'schedule_task':   # 随便取名字
        'task': 'celerytest.task.schedule_task',  # 指定需要定时的任务
        'schedule': crontab(minute=0, hour=0),  # 每天0:00 执行
    ,

#  发现任务文件每个app下的task.py
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

5.3编辑setting.py同级目录的__init__.py

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app. 确保celey被加载
from .celery import app as celery_app   # app对应celery.py中的app

__all__ = ('celery_app',)

5.4启动测试

进入shuozhongdian目录,打开cmd执行

 celery -A shuozhongdian worker -l info -P eventlet

也可以在linux安装redis

cd /usr/local

 yum install wget –y

wget http://download.redis.io/releases/redis-6.2.5.tar.gz

tar zxvf redis-6.2.5.tar.gz

cd redis-6.2.5

make

cd src

yum -y install gcc gcc-c++ libstdc++-devel 

make MALLOC=libc

make install PREFIX=/usr/local/redis


外部访问需要修改配置文件
cd ..
yum install vim -y
vim redis.conf
搜索里面的这两个个修改.
protected-mode后的yes改为no
找到bind 127.0.0.1注释掉(或改为0.0.0.0)
开放端口6379(防火墙、阿里云安全策略。。。)
cd src
./redis-server ../redis.conf
ps -ef|grep redis
查看是否开启

5.5添加任务

在celerytest下面的task.py里面编写

from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time

# 可以指定shared_task的name为任务名称 不指定默认采用方法名
# 注意这个名字跟celery.py里面配置的完全一样 所以是定时任务
@shared_task
def schedule_task():
    print('定时任务执行了,因为在celery指定了这个名字的任务加入定时任务')
    


@shared_task(name="send_email")
def send_email():
    print("发送邮箱的普通任务需要手动调用!!!")
    time.sleep(20)
    print('邮箱发送完毕,异步调用不影响主业务')
    return "s"

编写view视图

总的urls下面添加

from django.contrib import admin
from django.urls import path, include, re_path

urlpatterns = [
    path('admin/', admin.site.urls),
    re_path(r'^', include(('celerytest.urls', 'celerytest'), namespace='celerytest'))
]

在celerytest模块下添加urls

from django.urls import path, include, re_path
from .views import celery_test

urlpatterns = [
    re_path(r'^celerytest/', celery_test.as_view()),
]

views添加代码

from django.shortcuts import render
from django.views import View
from .task import send_email
from django.http import JsonResponse


# Create your views here.


class celery_test(View):
    def get(self, request):
        print("手动调用celery异步任务")
        send_email.delay()  # 执行异步任务
        return JsonResponse('code': 200, 'msg': '成功', safe=False)

5.6测试

启动项目

terminal被我们用来启动celery了

可以右键manage.py启动项目–报错


再次启动

访问http://127.0.0.1:8000/celerytest

当然也可以阻塞等待结果

5.7启动定时任务

因为terminal已经被用了,我们打开cmd启动定时任务

为了查看效果,将上面celery.py的配置改为每分钟执行一次

app.conf.beat_schedule = 
    'schedule_task':   # 随便取名字
        'task': 'celerytest.task.schedule_task',  # 指定需要定时的任务
        'schedule': crontab(),  # 删除就可以了 可以点进去这个方法看 都是*
    ,

再次启动celery,启动项目,然后再cmd输入

celery -A shuozhongdian beat -l info

再次启动

6.总结

celery入门就到这里了,项目中再去详细使用。

文档配套视频

以上是关于django入门 celery使用的主要内容,如果未能解决你的问题,请参考以下文章

Celery 分布式任务队列快速入门

Celery 分布式任务队列快速入门

Celery 分布式任务队列快速入门

Celery分布式任务队列快速入门

Celery 分布式任务队列快速入门

django+celery实现异步任务