Python celery -- 2019-08-08 20:39:56

Posted gqy02

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python celery -- 2019-08-08 20:39:56相关的知识,希望对你有一定的参考价值。

目录

原文: http://106.13.73.98/__/156/

安装:pip install celery

celery 是基于 Python 实现的模块,用于执行异步定时周期任务。

celery 组成结构:

  1. 用户任务 app: 用于生成任务
  2. 管道 broker 与 backend:前者用于存放任务,后者用于存放任务执行结果
  3. 员工 worker:负责执行任务

@(Python celery)


简单示例


员工文件(workers.py):

import time
from celery import Celery


# 创建一个Celery实例,这个就是我们用户的应用app
my_task = Celery(
    'tasks',
    broker='redis://127.0.0.1:6380',  # 指定存放任务的地方,这个指定为redis
    backend='redis://127.0.0.1:6380',  # 指定存放任务执行结果的地方
)

# 为应用创建任务
@my_task.task
def fn1(x, y):
    time.sleep(10)
    return x + y



"""
执行命令:
Linux:celery worker -A workers -l INFO
Windows:celery worker -A workers -l INFO -P eventlet

celery 4.0 已经不再对Windows操作系统提供支持了,需要安装:pip install eventlet
"""


用户app文件(user_app.py):

from workers import fn1


# 提交任务,将任务存到管道,等待员工执行
result = fn1.delay(2, 4)  # (i, i) 是传入的参数

print(result)  # b2df92e9-0eee-4af5-be83-dd8ac044d2a4


# 运行后,将提交任务


用于取结果的文件(get_result.py):

from celery.result import AsyncResult
from workers import my_task


# user_app文件的打印结果
ID = 'b2df92e9-0eee-4af5-be83-dd8ac044d2a4'

# 异步获取任务返回值
async_task = AsyncResult(id=ID, app=my_task)

# 判断异步任务是否执行成功
if async_task.successful():
    # 获取异步任务的返回值
    result = async_task.get()
    print(result)
else:
    print('任务还在执行中')


执行顺序:执行命令 > 提交任务 > 取结果


Celery 项目目录结构


在实际的项目中,我们的 celery 是有规则的:
技术图片
要满足这样的条件才可以,目录 celery_task 和其它文件可以随意命名,但此目录内一定要有一个 celery.py 文件。

celery.py

from celery import Celery

celery_task = Celery(
    'task',
    broker='redis://127.0.0.1:6380',  # 指定存放任务的地方,这个指定为redis
    backend='redis://127.0.0.1:6380',  # 指定存放任务执行结果的地方
    include=['celery_task.task01', 'celery_task.task02']  # 指定任务文件,会自动寻找任务文件中所有的任务
)


# 启动worker时,无需指定文件,直接通过你的celery_task目录就可以了
# 启动命令:celery worker -A celery_task -l INFO -P eventlet
# 这样,celery就可以自动检索当前目录下的所有task了,通过Include参数去逐一寻找

task01.py

import time
from .celery import celery_task

@celery_task.task
def one(x, y):
    time.sleep(5)
    return f'one:x + y'

task02.py

import time
from .celery import celery_task

@celery_task.task
def two(x, y):
    time.sleep(10)
    return f'two: x + y'

user_app.py

from celery_task.task01 import one
from celery_task.task02 import two

one.delay(1, 1)
two.delay(2, 2)

get_result.py

from celery.result import AsyncResult
from celery_task.celery import celery_task


# 终端显示的任务ID
ID = 'b2df92e9-0eee-4af5-be83-dd8ac044d2a4'

# 异步获取任务返回值
async_task = AsyncResult(id=ID, app=celery_task)

# 判断异步任务是否执行成功
if async_task.successful():
    # 获取异步任务的返回值
    result = async_task.get()
    print(result)
else:
    print('任务还在执行中')


定时任务


请结合 Celery 项目目录结构 中的文件来设置定时任务。

user_app.py 文件内容如下:

import time
import datetime
from celery_task.task01 import one
from celery_task.task02 import two


# 获取当前时间,此时间为东八区时间
ctime = time.time()

# 将当前的东八区时间改为 UTC时间,注意这里一定是UTC时间,没有其它说法
utc_time = datetime.datetime.utcfromtimestamp(ctime)  # 时间格式例如:2019-02-20 11:30:02.032230

# 为当前时间增加10秒
add_time = datetime.timedelta(seconds=10)  # 0:00:10
action_time = utc_time + add_time
# 此时的 action_time 就是当前时间的未来10秒


# 现在,我们使用 apply_async 来提交定时任务
result = one.apply_async(args=(1, 1), eta=action_time)
# args=(1, 1):提交的参数
# eta=action_time:指定执行时间


周期任务


请结合 Celery 项目目录结构 中的文件来设置周期任务。

celery.py 文件内容如下:

from celery import Celery
from celery.schedules import crontab

celery_task = Celery(
    'task',
    broker='redis://127.0.0.1:6380',  # 指定存放任务的地方,这个指定为redis
    backend='redis://127.0.0.1:6380',  # 指定存放任务执行结果的地方
    include=['celery_task.task01', 'celery_task.task02']  # 指定任务文件,会自动寻找任务文件中所有的任务
)


# 定制周期任务
celery_task.conf.beat_schedule = 
    # 每10秒执行一次celery_task.task01.one
    'each10s_task': 
        'task': 'celery_task.task01.one',  # 指定任务
        'schedule': 10,  # 每10秒执行一次
        'args': (1, 1)  # 参数
    ,
    # 每分钟执行一次celery_task.task01.one
    'each1m_task': 
        'task': 'celery_task.task01.one',  # 指定任务
        'schedule': crontab(minute=1),  # 每分钟执行一次
        'args': (1, 1)  # 参数
    ,
    # 每隔23小时执行一次celery_task.task02.two
    'each24hour_task': 
        'task': 'celery_task.task02.two',  # 指定任务
        'schedule': crontab(hour=23),  # 每23小时执行一次
        'args': (1, 1)  # 参数
    



"""
启动命令:
先执行:celery beat -A celery_task -l INFO  # 生产者,周期任务需要一个生产者来周期性提交任务
再执行:celery worker -A celery_task -l INFO -P eventlet 
"""

提交周期任务时,需要一个生产者 beat 来提交任务。
因此,启动命令分为两个:

celery beat -A celery_task -l INFO # 生产者,用于提交任务
celery worker -A celery_task -l INFO -P eventlet # 处理任务

原文: http://106.13.73.98/__/156/

以上是关于Python celery -- 2019-08-08 20:39:56的主要内容,如果未能解决你的问题,请参考以下文章

win运行celery的,解决ValueError: not enough values to unpack (expected 3, got 0) 2019-08-26

使用 Django / virtualenv 运行 celery 时找不到“__main__”模块

从 Celery 任务向 Channels 发送消息

如何获取 celery broker 和后端的状态?

python任务调度模块celery

python之celery在flask中使用