Celery任务调度
Posted 夏末蝉未鸣01
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Celery任务调度相关的知识,希望对你有一定的参考价值。
理论
Celery是基于Python开发的分布式任务调度工具,简单、灵活可靠,专注于实时任务处理。
Celery的架构由三部分组成:消息中间件、任务执行单元和任务结果存储。
- 消息中间件(broker):Celery将任务放在消息中间件里,一般为Redis、RabbitMQ等;
- 任务执行单元(worker):Celery提供的执行任务的单元,实时从消息中间件里获取任务并执行;
- 任务结果存储(backend):一般为Redis、AMQP等,用于存储任务执行后得到的结果。
实战
安装模块
pip install celery # 安装celery
pip install eventlet # 安装eventlet
编写代码
task.py
import time
from celery import Celery
app = Celery(
'task',
broker='redis://:xxxxx@127.0.0.1:6379/1', # 指定中间件,xxxxx为redis密码
backend='redis://:xxxxx@127.0.0.1:6379/1', # 指定任务结果存储
include=['task'] # 含有任务的文件
)
@app.task
def test(x, y): # celery任务
time.sleep(2)
return x + y
启动worker
celery -A task worker -P eventlet -l info
C:\\Users\\Administrator\\Desktop\\task>celery -A task worker -P eventlet -l info
-------------- celery@172_17_0_14 v4.4.4 (cliffs)
--- ***** -----
-- ******* ---- Windows-2012ServerR2-6.3.9600-SP0 2023-02-13 12:44:50
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: task:0xd2ad91fd0
- ** ---------- .> transport: redis://:**@127.0.0.1:6379/1
- ** ---------- .> results: redis://:**@127.0.0.1:6379/1
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. task.test
[2023-02-13 12:44:51,036: INFO/MainProcess] Connected to redis://:**@127.0.0.1:6379/1
[2023-02-13 12:44:51,048: INFO/MainProcess] mingle: searching for neighbors
[2023-02-13 12:44:51,260: INFO/SpawnPoolWorker-1] child process 632 calling self.run()
[2023-02-13 12:44:51,327: INFO/SpawnPoolWorker-2] child process 68 calling self.run()
[2023-02-13 12:44:52,111: INFO/MainProcess] mingle: all alone
[2023-02-13 12:44:52,120: INFO/MainProcess] celery@172_17_0_14 ready.
将任务放入broker
from task import test
result = test.delay(3, 2)
print(result.id)
‘’‘
运行结果
b523e30a-4f1b-4195-938f-ee85be33550f
’‘’
[2023-02-13 12:54:53,761: INFO/MainProcess] pidbox: Connected to redis://:**@127.0.0.1:6379/1.
[2023-02-13 12:55:24,213: INFO/MainProcess] Received task: task.test[b523e30a-4f1b-4195-938f-ee85be33550f]
[2023-02-13 12:55:24,230: INFO/MainProcess] Task task.test[b523e30a-4f1b-4195-938f-ee85be33550f] succeeded in 0.015000000013969839s: 5
从backend获取任务结果
from task import app
from celery.result import AsyncResult
result_project = AsyncResult(id='b523e30a-4f1b-4195-938f-ee85be33550f', app=app)
print(result_project.get())
‘’‘
运行结果
5
’‘’
以上均为个人见解,更详细的使用方法请参考:celery中文文档
以上是关于Celery任务调度的主要内容,如果未能解决你的问题,请参考以下文章