Celery Task Scheduling

Posted by 夏末蝉未鸣01

Theory

Celery is a distributed task scheduling tool written in Python. It is simple, flexible, and reliable, and focuses on real-time task processing.

Celery's architecture consists of three parts: the message broker, the task execution units (workers), and the result backend.

  • Message broker: Celery places tasks into a message broker, typically Redis or RabbitMQ;
  • Worker: the task execution unit provided by Celery, which fetches tasks from the broker in real time and executes them;
  • Result backend: typically Redis, AMQP, etc., used to store the results produced after tasks finish.

Hands-on

Install the modules

pip install celery  # install celery
pip install eventlet  # install eventlet (used as the worker pool on Windows)
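
As a quick sanity check (assuming the celery executable is now on your PATH), you can print the installed version:

celery --version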

Write the code

task.py

import time
from celery import Celery


app = Celery(
    'task',
    broker='redis://:xxxxx@127.0.0.1:6379/1',  # message broker; xxxxx is the Redis password
    backend='redis://:xxxxx@127.0.0.1:6379/1',  # result backend
    include=['task']  # modules that contain the tasks
)

@app.task
def test(x, y):  # a Celery task
    time.sleep(2)
    return x + y
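
For reference, @app.task also accepts options; binding a task, for instance, lets it retry itself on failure. A minimal sketch, assuming the app from task.py above (flaky_test and its retry policy are illustrative, not part of the original file):

from task import app  # reuse the Celery app defined in task.py

@app.task(bind=True, max_retries=3)
def flaky_test(self, x, y):
    try:
        return x + y
    except Exception as exc:
        # wait 5 seconds before the next attempt, up to max_retries tries
        raise self.retry(exc=exc, countdown=5)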

Start the worker

celery -A task worker -P eventlet -l info
C:\Users\Administrator\Desktop\task>celery -A task worker -P eventlet -l info

 -------------- celery@172_17_0_14 v4.4.4 (cliffs)
--- ***** -----
-- ******* ---- Windows-2012ServerR2-6.3.9600-SP0 2023-02-13 12:44:50
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         task:0xd2ad91fd0
- ** ---------- .> transport:   redis://:**@127.0.0.1:6379/1
- ** ---------- .> results:     redis://:**@127.0.0.1:6379/1
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . task.test

[2023-02-13 12:44:51,036: INFO/MainProcess] Connected to redis://:**@127.0.0.1:6379/1
[2023-02-13 12:44:51,048: INFO/MainProcess] mingle: searching for neighbors
[2023-02-13 12:44:51,260: INFO/SpawnPoolWorker-1] child process 632 calling self.run()
[2023-02-13 12:44:51,327: INFO/SpawnPoolWorker-2] child process 68 calling self.run()
[2023-02-13 12:44:52,111: INFO/MainProcess] mingle: all alone
[2023-02-13 12:44:52,120: INFO/MainProcess] celery@172_17_0_14 ready.
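
The worker's pool size can be tuned with -c/--concurrency; with the eventlet pool this is the number of green threads rather than processes. For example (100 is an arbitrary value):

celery -A task worker -P eventlet -c 100 -l info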

Send a task to the broker

from task import test


result = test.delay(3, 2)
print(result.id)

'''
Output:
b523e30a-4f1b-4195-938f-ee85be33550f
'''

Meanwhile, the worker's log shows the task being received and executed:

[2023-02-13 12:54:53,761: INFO/MainProcess] pidbox: Connected to redis://:**@127.0.0.1:6379/1.
[2023-02-13 12:55:24,213: INFO/MainProcess] Received task: task.test[b523e30a-4f1b-4195-938f-ee85be33550f]
[2023-02-13 12:55:24,230: INFO/MainProcess] Task task.test[b523e30a-4f1b-4195-938f-ee85be33550f] succeeded in 0.015000000013969839s: 5
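
delay() is a shortcut for apply_async(), which additionally accepts scheduling options such as countdown. A minimal sketch reusing the task above (the 10-second delay is only an illustration):

from task import test

# schedule test(3, 2) to run roughly 10 seconds from now
result = test.apply_async(args=(3, 2), countdown=10)
print(result.id)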

Fetch the task result from the backend

from task import app
from celery.result import AsyncResult

result_project = AsyncResult(id='b523e30a-4f1b-4195-938f-ee85be33550f', app=app)

print(result_project.get())

'''
Output:
5
'''
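
get() blocks until the result is available. If you only want to poll the task's state without blocking, AsyncResult also exposes a few non-blocking checks; a small sketch using the same task id:

from task import app
from celery.result import AsyncResult

result_project = AsyncResult(id='b523e30a-4f1b-4195-938f-ee85be33550f', app=app)

print(result_project.ready())       # True once the task has finished
print(result_project.status)        # e.g. 'PENDING', 'STARTED', 'SUCCESS'
print(result_project.successful())  # True if the task finished without raising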

The above is just my personal understanding; for more detailed usage, please refer to the Celery Chinese documentation.
