关于Celery的应用

Posted jimmyhe

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了关于Celery的应用相关的知识,希望对你有一定的参考价值。

  一、首先:以下代码都是在LINUX上执行的,因为新版本celery已经不支持windows系统,运行会报错。

  Celery是处理大量消息的一个分布式系统,那他是如何运行的呢?

 

  

  技术图片

 

可以看到,Celery 主要包含以下几个模块:

  

  • 任务模块 Task包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列
  • 消息中间件 BrokerBroker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。
  • 任务执行单元 WorkerWorker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它
  • 任务结果存储 BackendBackend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, Redis 和 MongoDB 等。

   celery还可以异步执行任务,定时任务,周期任务。

  下面创造一个执行单元:

  

task.py

from
celery import Celery import time my_task = Celery(task,broker=redis://127.0.0.1:6379, backend=redis://127.0.0.1:6379) # broker和backend都设置为redis # 任务执行单元 @my_task.task def func1(x,y): time.sleep(10) return x + y

  再建立一个派送任务的py

from task import func1

res = func1.delay(2,4)  # 用delay去向执行单元派送任务并传参

print(res.id)

  先运行worker,让他处于ready状态,如果有数据就可以直接处理数据:

celery worker -A task -l INFO

# -A 指定worker所在的Celery app的文件名,就是task

# -l   是日志打印的级别

# -c 10 可以同时开启10个worker处理 

  结果如下,说明worker已经待命,可以处理数据了。

[tasks]
  . task.func1

[2019-05-20 16:37:28,752: INFO/MainProcess] Connected to redis://127.0.0.1:6379//
[2019-05-20 16:37:28,768: INFO/MainProcess] mingle: searching for neighbors
[2019-05-20 16:37:29,797: INFO/MainProcess] mingle: all alone
[2019-05-20 16:37:29,841: INFO/MainProcess] [email protected] ready.

  同时我们运行我们的发送命令的py

python3 res.py

  运行结果如下:

[2019-05-20 16:39:07,699: INFO/MainProcess] Connected to redis://127.0.0.1:6379//
[2019-05-20 16:39:07,712: INFO/MainProcess] mingle: searching for neighbors
[2019-05-20 16:39:08,741: INFO/MainProcess] mingle: all alone
[2019-05-20 16:39:08,771: INFO/MainProcess] [email protected]machine ready.
[2019-05-20 16:40:07,600: INFO/MainProcess] Received task: task.func1[fb4ad8bc-488a-4aba-ba37-1b554b08346a]  
[2019-05-20 16:40:17,634: INFO/ForkPoolWorker-2] Task task.func1[fb4ad8bc-488a-4aba-ba37-1b554b08346a] succeeded in 10.023761188000208s: 6

同时给我们res.py打印的res是一个uuid:

e14727fc-15b3-47eb-8df5-4930d7cc337c

我们调用res.get()可以取到这个uuid实际的运行结果。

以上是最简单的一个celery应用,让大家明白celery运行的一个流程。

 

  PS: 我们除了调用delay外,还可以调用apply_async事实上,delay方法封装了 apply_async,如下:

def delay(self, *partial_args, **partial_kwargs):
    """Shortcut to :meth:`apply_async` using star arguments."""
    return self.apply_async(partial_args, partial_kwargs)

也就是说,delay 是使用 apply_async 的快捷方式。apply_async 支持更多的参数,它的一般形式如下:

countdown:指定多少秒后执行任务

task1.apply_async(args=(2, 3), countdown=5)    # 5 秒后执行任务

eta (estimated time of arrival):指定任务被调度的具体时间,参数类型是 datetime

from datetime import datetime, timedelta
# 当前 UTC 时间再加 10 秒后执行任务
task1.multiply.apply_async(args=[3, 7], eta=datetime.utcnow() + timedelta(seconds=10))

xpires:任务过期时间,参数类型可以是 int,也可以是 datetime

task1.multiply.apply_async(args=[3, 7], expires=10)    # 10 秒后过期

 

 

二、在实际项目中的应用

技术图片

 

 celery.py,PS:这个名字是为了不用文件的方式启动,直接启动Celery_task目录名就可以,celery会自动去检索目录下所有的task,通过过include中的内容逐一去找

1 from celery import Celery
2 
3 celery_task = Celery("task",
4                      broker="redis://127.0.0.1:6379",
5                      backend="redis://127.0.0.1:6379",
6                      include=["Celery_task.task_one","Celery_task.task_two"])
7 # include 这个参数适用于寻找目录中所有的task

 task_one.py

1 from .celery import celery_task
2 import time
3 
4 @celery_task.task
5 def one(x,y):
6     time.sleep(5)
7     return f"task_one {x+y}"

  task_two.py

1 from .celery import celery_task
2 import time
3 
4 @celery_task.task
5 def two(x,y):
6     time.sleep(5)
7     return f"task_two {x+y}"

  my_celery:

1 from Celery_task.task_one import one
2 from Celery_task.task_two import two
3 
4 one.delay(10,10)
5 two.delay(20,20)

  运行:

celery worker -A Celery_task -l INFO

  celery也可以通过此方式帮助我们达到一个异步处理数据的一个过程。不用阻塞等待结果在执行下一步。

 

  三、定时任务(关键字:delay -- > apply_async)

  我们还使用Celery_task这个示例来修改一下,my_celery中进行一下小修改

 1 from Celery_task.task_one import one
 2 from Celery_task.task_two import two
 3 
 4 # one.delay(10,10)
 5 # two.delay(20,20)
 6 
 7 # 定时任务我们不在使用delay这个方法了,delay是立即交给task 去执行
 8 # 现在我们使用apply_async定时执行
 9 
10 #首先我们要先给task一个执行任务的时间
11 import datetime,time
12 # 获取当前时间 此时间为东八区时间
13 ctime = time.time()
14 # 将当前的东八区时间改为 UTC时间 注意这里一定是UTC时间,没有其他说法
15 utc_time = datetime.datetime.utcfromtimestamp(ctime)
16 # 为当前时间增加 10 秒
17 add_time = datetime.timedelta(seconds=10)
18 action_time = utc_time + add_time
19 
20 # action_time 就是当前时间未来10秒之后的时间
21 #现在我们使用apply_async定时执行
22 res = one.apply_async(args=(10,10),eta=action_time)
23 print(res.id)
24 #这样原本延迟5秒执行的One函数现在就要在10秒钟以后执行了

 

  四、周期任务

 1 from celery import Celery
 2 from celery.schedules import crontab
 3 
 4 celery_task = Celery("task",
 5                      broker="redis://127.0.0.1:6379",
 6                      backend="redis://127.0.0.1:6379",
 7                      include=["Celery_task.task_one","Celery_task.task_two"])
 8 
 9 #我要要对beat任务生产做一个配置,这个配置的意思就是每10秒执行一次Celery_task.task_one任务参数是(10,10)
10 celery_task.conf.beat_schedule={
11     "each10s_task":{
12         "task":"Celery_task.task_one.one",
13         "schedule":10, # 每10秒钟执行一次
14         "args":(10,10)
15     },
16     "each1m_task": {
17         "task": "Celery_task.task_one.one",
18         "schedule": crontab(minute=1), # 每一分钟执行一次
19         "args": (10, 10)
20     },
21     "each24hours_task": {
22         "task": "Celery_task.task_one.one",
23         "schedule": crontab(hour=24), # 每24小时执行一次
24         "args": (10, 10)
25     }
26 
27 }
28 
29 #以上配置完成之后,还有一点非常重要
30 # 不能直接创建Worker了,因为我们要执行周期任务,所以首先要先有一个任务的生产方
31 # celery beat -A Celery_task
32 # celery worker -A Celery_task -l INFO -P eventlet

 

 

  

 

以上是关于关于Celery的应用的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Django 应用程序中使用 celery 执行任务?

数字海洋应用平台上的 Django Celery 与 Redis 问题

分布式任务队列:Celery

关于使用带有远程存储的 sorl-thumbnails 的 celery 的指针?

如何在片段中使用 GetJsonFromUrlTask​​.java

关于代码片段的时间复杂度