浅谈 Celery 分布式队列

Posted jaho

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了浅谈 Celery 分布式队列相关的知识,希望对你有一定的参考价值。

 

Q1: Django开发Web项目时遇到一个问题,如何解决大量用户在同一时间注册,短信发送延迟的问题?

  A1:   ① 封装一个发送短信的函数

      ② 创建进程、线程、协程调用发送短信的函数

 

Q2: 创建的进程、线程、协程和Django网站服务器在同一个电脑上,并且调用顺序也是不确定的 所以A1 OUT

A2: Celery(异步任务队列):

① celery中的任务发出者,中间人和任务执行着可以在不同的电脑上

② celery 中的任务会进行排序,先添加的任务先被worker执行

 

1. Celery的介绍

  Celery是Python开发的分布式任务调度模块,通过它我们可以轻松地实现任务的异步处理,Celery主要有以下几个优点:    

    1. 它可以让任务的执行同主程序完全脱离,甚至不在同一台主机内。

    2. 它通过队列来调度任务,不用担心并发量高时系统负载过大。

    3. 它可以用来处理复杂系统性能问题,却又相当灵活易用。

  还是举用户注册的例子,比如同一时间有100个用户要注册,此时网络很差,请求到达短信系统的时间将会很长,如果短信系统迟迟无法回应,会导致后续的代码无法执行,造成用户长时间地等待,影响用户的体验;使用了Celery异步消息队列,只要发布者将发送短信的任务送至中间件,后续无需做任何事情,worker会监听任务队列并执行。

  在我的理解中 Celery主要有三大模块组成:①  任务发出者:发出执行的任务函数    任务执行者(worker):即执行任务的程序,可以有多个并发。它实时监控消息队列,获取队列中调度的任务,并执行它   ③ 中间人(broker):即任务调度队列,它是一个生产者消费者模式的任务队列,即任务发出者发出任务到任务队列中,任务执行者从任务队列中取出任务并执行,执行调度可以是顺序调度也可以是计划时间调度;Celery推荐的中间人(中间件)有RabbitMQ和Redis,本文使用的是Redis。

技术分享图片

2. Celery的使用

2.1 安装Celery

  pip install celery 

2.2 目录结构:

  技术分享图片

2.3 主要步骤:

1. 创建Celery对象并进行配置

 # -- main.py
1
from celery import Celery 2 3 # 创建celery应用 4 app = Celery(celery_name) 5 6 # 从conf.py中导入celery 配置 7 app.config_from_object(celery_tasks.conf) 8 9 # 自动注册celery任务 10 app.autodiscover_tasks([celery_tasks.sms])

 

2. 定义任务函数(文件名必须为 tasks.py)

# --tasks.py   
1
from celery_tasks.main import app 2 3 @app.task(name=test_task) 4 def task(x, y): # 参数可不传 5 print(%s,%s % (x, y))

 

3. 启动worker(celery  -A Celery对象包路径 worker -l info)

>>> :celery -A celery_tasks.main worker -l info #  (l:事件级别 info:打印信息)

OUT :

-------------- [email protected] v4.2.0 (windowlicker)
---- **** -----
--- * *** * -- Linux-4.4.0-31-generic-x86_64-with-Ubuntu-16.04-xenial 2018-07-04 19:09:40
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: celery_name:0x7f785faee518 # celery_name  应用名
- ** ---------- .> transport: redis://127.0.0.1:6379/14 # 中间人地址
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 1 (prefork) # 启动了几个worker 默认cpu几核启动几个
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery


[tasks]
. test_task # 任务名


[2018-07-04 19:09:40,913: INFO/MainProcess] Connected to redis://127.0.0.1:6379/14
[2018-07-04 19:09:40,930: INFO/MainProcess] mingle: searching for neighbors
[2018-07-04 19:09:41,960: INFO/MainProcess] mingle: all alone
[2018-07-04 19:09:42,016: INFO/MainProcess] [email protected] ready.

 

4. 发送任务

>>> : task.delay(‘你好‘,‘Jaho‘)

OUT :

[2018-07-04 19:15:14,413: INFO/MainProcess] Received task: test_task[81c86c98-1550-4d60-ab19-2e148d975b3d]
[2018-07-04 19:15:14,414: WARNING/ForkPoolWorker-1] 你好,Jaho
[2018-07-04 19:15:14,415: INFO/ForkPoolWorker-1] Task test_task[81c86c98-1550-4d60-ab19-2e148d975b3d] succeeded in 0.00043077800000901334s: None

 

3. 最后 Celery还有强大的定时任务功能还有使用RabbitMQ充当中间件的情况 后续将会继续补充

 

    

    

  



























以上是关于浅谈 Celery 分布式队列的主要内容,如果未能解决你的问题,请参考以下文章

[源码分析] 分布式任务队列 Celery 之 发送Task & AMQP

Celery 分布式任务队列快速入门

Celery 分布式任务队列快速入门

Celery 分布式任务队列快速入门

Celery分布式队列学习

分布式任务队列Celery入门与进阶