Python - Celery

Posted 坨之歌

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python - Celery相关的知识,希望对你有一定的参考价值。

Celery - 概念

简单的灵活可靠的处理大量消息的分布式系统

专注于实时处理的异步任务队列, 同时也支持任务调度

结构图

使用场景

异步任务  将耗时的操作任务提交给 Celery 去异步执行 - 比如发送短信 / 邮件, 消息推送, 音视频处理等

定时任务  类似于 crontab, 比如每日的数据统计

消息中间件

可选  [ RabbitMQ / Redis ] 

Redis 的安装 跳转这里 

Celery - 使用

安装

pip install celery[redis]

基本使用 - 异步任务

基础版本

先创建一个简单的耗时阻塞的任务, 很显然会在中间卡顿 4s 等待

# -*- coding: utf-8 -*-
import time


def add(x, y):
    print "in tasks ---"
    time.sleep(4)
    return x + y


if __name__ == \'__main__\':
    print \'start ---\'
    ret = add(2, 8)
    print "end ---"
    print ret
start ---
in tasks ---
end ---
10

加入 celery 的优化版本

 将原始的 的 add 方法重新优化一下变成 Celery 的版本

tasks.py

# -*- coding: utf-8 -*-
import time
from celery import Celery

broker = \'redis://localhost:6379/1\'
backend = \'redis://localhost:6379/2\'

app = Celery(\'my_task\', broker=broker, backend=backend)


@app.task
def add(x, y):
    print "in tasks ---"
    time.sleep(4)
    return x + y

 app.py

# -*- coding: utf-8 -*-

from tasks import add

if __name__ == \'__main__\':
    print \'start ---\'
    ret = add.delay(2, 8)
    print "end ---"
    print ret

 再运行的时候便没有卡顿, 而是以一个任务标识的形式进行返回

start ---
end ---
a3e85efa-859f-45de-af27-e582ff54ddef

此时任务将被丢入 redis 中等待调度, 需要打开 celery 的 worker 进行处理

Celery - worker 

在次目录下进行

celery worker  -A tasks -l INFO

-l 表示日志打印等级

-A 指定执行的函数文件

此时若因版本过高会触发系列报错, 详情 跳转这里 

成功后会打印配置信息, 比如此处设置的 redis 以及 被执行的 add 都可以显示出

简单测试 

打开 worker 之后就可以进行执行调用了

当前目录再次打开一个解释器进行简单操作

 在 worker 这边的日志就可以打印出来了

 

 相关的执行结果和执行状态也可以通过一些方法进行获取到

流程梳理

app.py 中进行 add 的函数调用,调用直接返回任务标识

真正的任务在 worker 中异步执行, 然后再 app.py 中的后续代码将不收到阻塞的影响

通过  .ready .get  方法可以获取执行状态以及相关的结果

Celery 目录结构

较为标准的目录格式是需要代码和配置文件分离更加直观

 

 

 __init__.py 

初始化文件中进行初始化的模型创建以及配置文件的导入

# -*- coding: utf-8 -*-
from celery import Celery

app = Celery(\'demo\')

app.config_from_object(\'celery_app.celeryconfig\')  # 通过 celery 实例加载配置模块

 

celeryconfig.py

配置文件中进行配置文件的相关操作, 这里主要是配置 Redis 的相关配置以及简单的时区更改展示

Redis 依旧是存放任务的, 以及存放结果需要两个配置配置

# -*- coding: utf-8 -*-

BROKER_URL = \'redis://localhost:6379/1\'

CELERY_RESULT_BACKEND = \'redis://localhost:6379/2\'

CELERY_TIMEZONE = \'Asia/Shanghai\'  # 默认 UTC 时间

# 导入指定的任务模块
CELERY_IMPORTS = (
    \'celery_app.task1\',
    \'celery_app.task2\',
)

 

task1.py / task2.py

task 则用于实际逻辑的函数编写即可

# -*- coding: utf-8 -*-
import time
from celery_app import app


@app.task
def multiply(x, y):
    time.sleep(3)
    return x * y

 

# -*- coding: utf-8 -*-
import time
from celery_app import app


@app.task
def add(x, y):
    time.sleep(3)
    return x + y

 

app.py

app 用于外部的调用

# -*- coding: utf-8 -*-
from celery_app import task1, task2

if __name__ == \'__main__\':
    task1.add.apply_async(5, 10)
    task2.multiply.delay(5, 10)
    print \'end  -----\'

 

 基础使用 - 定时任务

以上是关于Python - Celery的主要内容,如果未能解决你的问题,请参考以下文章

Celery+python+redis异步执行定时任务

Python爬虫之使用celery加速爬虫

python 一些代码使用模式作为SQL-Alchemy的声明基础,以及对Celery分支的支持。

在 Python 中设置 celery 任务后端的麻烦

异步 celery 任务完成后自动调用 PHP 代码(celery-php)

Django-Python3-Celery 异步任务/定时任务