Python Celery队列
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python Celery队列相关的知识,希望对你有一定的参考价值。
Celery队列简介:
Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery.
使用场景:
1.你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情。
2.你想做一个定时任务,比如每天检测一下你们所有客户的资料,如果发现今天 是客户的生日,就给他发个短信祝福
Celery原理:
Celery 在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis 或者是数据库来存放消息的中间结果
Celery优点:
- 简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的
- 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
- 快速:一个单进程的celery每分钟可处理上百万个任务
- 灵活: 几乎celery的各个组件都可以被扩展及自定制
Celery缺点:
1.目前只能在Linux系统上有较好的支持
Celery工作流程图:
在传统的web应用中,Django的web页面通过url的映射到view,view再执行方法,如果方法需要调用大量的脚本,执行大量的任务,页面就会阻塞,如果在项目中使用Celery队列.首先用户的任务会被celery放到broker中进行中转,然后将任务分为一个个的task来执行,由于celery是异步机制,所以会直接给用户返回task_id,页面拿到task_id就可以执行后续的操作,比如查看任务进度,暂停任务,而无需等待所有任务全部执行完毕,才能看到页面
Celery的安装与使用
1.安装:
1.在linux(ubuntu)系统上首先安装Celery队列
pip3 install Celery
2.在linux安装redis
sudo apt-get install redis-server
3.在linux上安装redis-celery中间件
pip3 install -U "celery[redis]"
4.启动redis
sudo /etc/init.d/redis-server start
2.创建并执行一个简单的task
命名为tasks.py
1 from celery import Celery 2 3 app = Celery(‘tasks‘, 4 broker=‘redis://localhost‘, 5 backend=‘redis://localhost‘) 6 7 @app.task 8 def add(x,y): 9 print("running...",x,y) 10 return x+y
启动监听并开始执行该服务
celery -A tasks worker -l debug
在开启一个终端进行测试任务
进入python环境
1 from tasks import add 2 t = add.delay(3,3) #此时worker会生成一个任务和任务id 3 t.get() #获取任务执行的结果 4 t.get(propagate=False) #如果任务执行中出现异常,在client端不会异常退出 5 t.ready()#查看任务是否执行完毕 6 t.traceback #打印异常详细信息
3.在项目中创建celery
在当前的目录下创建文件夹celery_pro
mkdir celery_pro
在此目录下创建两个文件
目录结构:
1 celery_proj 2 /__init__.py 3 /celery.py 4 /tasks.py
celery.py(定义了celery的一些元信息)
1 rom __future__ import absolute_import, unicode_literals 2 from celery import Celery 3 4 app = Celery(‘proj‘, 5 broker=‘redis://localhost‘, #消息中间接收 6 backend=‘redis://localhost‘, #消息结果存放 7 include=[‘proj.tasks‘]) #执行任务的文件 8 9 # Optional configuration, see the application user guide. 10 app.conf.update( 11 result_expires=3600, 12 ) 13 14 if __name__ == ‘__main__‘: 15 app.start()
tasks.py (定义任务执行的具体逻辑和调用的具体方法)
1 from __future__ import absolute_import, unicode_literals 2 from .celery import app 3 4 5 @app.task 6 def add(x, y): 7 return x + y 8 9 10 @app.task 11 def mul(x, y): 12 return x * y 13 14 15 @app.task 16 def xsum(numbers): 17 return sum(numbers)
启动worker
celery -A celery_pro worker -l debug
再另一个窗口打开python命令模式进行测试
1 from celery_pro import tasks 2 3 t = tasks.add.delay(3,4) 4 t.get()
Celery的分布式:多启动worker就可以自动实现负载均衡,无需手动管理
Celery永驻后台(开启&重启&关闭)
1 celery multi start w1 -A celery_pro -l info #开启后台celery任务 2 celery multi restart w1 -A proj -l info #重启该服务 3 celery multi stop w1 -A proj -l info #关闭该服务
Celery定时任务
在celery_pro文件夹下创建periodic_tasks.py
目录结构:
1 celery_proj 2 /__init__.py 3 /celery.py 4 /tasks.py 5 /periodic_tasks.py
文件内容如下:
1 from __future__ import absolute_import, unicode_literals 2 from .celery import app 3 from celery.schedules import crontab 4 5 6 @app.on_after_configure.connect 7 def setup_periodic_tasks(sender, **kwargs): 8 # Calls test(‘hello‘) every 10 seconds. 9 sender.add_periodic_task(10.0, test.s(‘hello‘), name=‘add every 10‘) 10 11 # Calls test(‘world‘) every 30 seconds 12 sender.add_periodic_task(30.0, test.s(‘world‘), expires=10) 13 14 # Executes every Monday morning at 7:30 a.m. 15 sender.add_periodic_task( 16 crontab(hour=21, minute=42, day_of_week=5), 17 test.s(‘Happy Mondays!‘), 18 ) 19 20 @app.task 21 def test(arg): 22 print(arg)
修改celery.py,加入periodic_task.py
1 from __future__ import absolute_import, unicode_literals 2 from celery import Celery 3 4 app = Celery(‘proj‘, 5 broker=‘redis://localhost‘, 6 backend=‘redis://localhost‘, 7 include=[‘celery_pro.tasks‘,‘celery_pro.periodic_tasks‘]) 8 9 # Optional configuration, see the application user guide. 10 app.conf.update( 11 result_expires=3600, 12 ) 13 14 if __name__ == ‘__main__‘: 15 app.start() 16 ~ 17 ~
在服务端启动 celery -A celery_pro worker -l debug
在客户端启动 celery -A celery_pro.periodic_tasks beat -l debug
在服务端如果看到打印的hell ,world说明定时任务配置成功
上面是通过调用函数添加定时任务,也可以像写配置文件 一样的形式添加, 下面是每30s执行的任务
在celery.py中添加
1 app.conf.beat_schedule = { 2 ‘add-every-30-seconds‘: { 3 ‘task‘: ‘cerely_pro.tasks.add‘, #执行的具体方法 4 ‘schedule‘: 5.5, #每秒钟执行 5 ‘args‘: (16, 16) #执行的具体动作的参数 6 }, 7 } 8 app.conf.timezone = ‘UTC‘
更多定制
上面的定时任务比较简单,但如果你想要每周一三五的早上8点给你发邮件怎么办呢?用crontab功能,跟linux自带的crontab功能是一样的,可以个性化定制任务执行时间
1 rom celery.schedules import crontab 2 3 app.conf.beat_schedule = { 4 #在每周一早上7:30执行 5 ‘add-every-monday-morning‘: { 6 ‘task‘: ‘celery_pro.tasks.add‘, 7 ‘schedule‘: crontab(hour=7, minute=30, day_of_week=1), 8 ‘args‘: (16, 16), 9 },
还有更多定时配置方式如下:
Example | Meaning |
crontab() |
Execute every minute. |
crontab(minute=0, hour=0) |
Execute daily at midnight. |
crontab(minute=0, hour=‘*/3‘) |
Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm. |
|
Same as previous. |
crontab(minute=‘*/15‘) |
Execute every 15 minutes. |
crontab(day_of_week=‘sunday‘) |
Execute every minute (!) at Sundays. |
|
Same as previous. |
|
Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays. |
crontab(minute=0,hour=‘*/2,*/3‘) |
Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm |
crontab(minute=0, hour=‘*/5‘) |
Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of “15”, which is divisible by 5). |
crontab(minute=0, hour=‘*/3,8-17‘) |
Execute every hour divisible by 3, and every hour during office hours (8am-5pm). |
crontab(0, 0,day_of_month=‘2‘) |
Execute on the second day of every month. |
|
Execute on every even numbered day. |
|
Execute on the first and third weeks of the month. |
|
Execute on the eleventh of May every year. |
|
Execute on the first month of every quarter. |
Celery+Django实现异步任务分发
1.在setting.py的文件同一级别创建celery.py
1 from __future__ import absolute_import, unicode_literals 2 import os 3 from celery import Celery 4 5 # 设置Django的环境变量 6 os.environ.setdefault(‘DJANGO_SETTINGS_MODULE‘, ‘PerfectCRM.settings‘) 7 8 #设置app的默认处理方式,如果不设置默认是rabbitMQ 9 app = Celery(‘proj‘, 10 broker=‘redis://localhost‘, 11 backend=‘redis://localhost‘ 12 ) 13 14 #配置前缀 15 app.config_from_object(‘django.conf:settings‘, namespace=‘CELERY‘) 16 17 #自动扫描app下的tasks文件 18 app.autodiscover_tasks() 19 20 21 @app.task(bind=True) 22 def debug_task(self): 23 print(‘Request: {0!r}‘.format(self.request)) 24
2.修改当前目录下的__init__文件
1 from __future__ import absolute_import, unicode_literals 2 3 #启动时检测celery文件 4 from .celery import app as celery_app 5 6 __all__ = [‘celery_app‘]
3.在app下新增tasks文件,写要执行的任务
1 from __future__ import absolute_import, unicode_literals 2 from celery import shared_task 3 4 5 @shared_task 6 def add(x, y): 7 return x + y 8 9 10 @shared_task 11 def mul(x, y): 12 return x * y 13 14 15 @shared_task 16 def xsum(numbers): 17 return sum(numbers) 18
在另一个app下新增tasks文件
1 from __future__ import absolute_import, unicode_literals 2 from celery import shared_task 3 import time,random 4 5 @shared_task 6 def randnum(start, end): 7 time.sleep(3) 8 return random.ranint(start,end)
在app下的urls.py文件中增加映射
1 url(r‘celery_call‘, views.celery_call), 2 url(r‘celery_result‘, views.celery_result),
在views下增加处理逻辑
1 from crm import tasks 2 from celery.result import AsyncResult 3 import random 4 #计算结果 5 def celery_call(request): 6 randnum =random.randint(0,1000) 7 t = tasks.add.delay(randnum,6) 8 print(‘randum‘,randnum) 9 return HttpResponse(t.id) 10 11 #获取结果 12 def celery_result(request): 13 task_id = request.GET.get(‘id‘) 14 res = AsyncResult(id=task_id) 15 if res.ready(): 16 return HttpResponse(res.get()) 17 else: 18 return HttpResponse(res.ready())
测试
首先启动Django,从web端输入url调用celery_call方法
例:http://192.168.17.133:9000/crm/celery_call,此方法会返回一个task_id(41177118-3647-4830-b8c8-7be76d9819d7)
带着这个task_id 访问http://192.168.17.133:9000/crm/celery_result?id=41177118-3647-4830-b8c8-7be76d9819d7如果可以看到结果说明配置成功
Dnango+Celery实现定时任务
1.安装Django,Celery中间件
pip3 install django-celery-beat
2.在Django的settings文件中,新增app,名称如下
INSTALLED_APPS = (
.....,
‘django_celery_beat‘, #新增的app
)
3.输入命令
python manage.py migrate #创建与Django有关定时计划任务的新表
4.通过celery beat开启定时任务
celery -A PrefectCRM beat -l info -S django
5.启动Django服务,进入admin配置页面
python3 manager.py runserver 0.0.0.0:9000
并设置settings.py中的
ALLOW_HOSTS=[‘*‘]
6.可以在原有业务表的基础之上看到新的三张表
最后配置计划任务表,在此表中将定时任务和执行的频率相关联
后记:经测试,每添加或修改一个任务,celery beat都需要重启一次,要不然新的配置不会被celery beat进程读到
‘django_celery_beat
以上是关于Python Celery队列的主要内容,如果未能解决你的问题,请参考以下文章