Python celery -- 2019-08-08 20:39:56
Posted gqy02
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python celery -- 2019-08-08 20:39:56相关的知识,希望对你有一定的参考价值。
目录
原文: http://106.13.73.98/__/156/
安装:pip install celery
celery 是基于 Python 实现的模块,用于执行异步定时周期任务。
celery 组成结构:
- 用户任务 app: 用于生成任务
- 管道 broker 与 backend:前者用于存放任务,后者用于存放任务执行结果
- 员工 worker:负责执行任务
@(Python celery)
简单示例
员工文件(workers.py):
import time from celery import Celery # 创建一个Celery实例,这个就是我们用户的应用app my_task = Celery( 'tasks', broker='redis://127.0.0.1:6380', # 指定存放任务的地方,这个指定为redis backend='redis://127.0.0.1:6380', # 指定存放任务执行结果的地方 ) # 为应用创建任务 @my_task.task def fn1(x, y): time.sleep(10) return x + y """ 执行命令: Linux:celery worker -A workers -l INFO Windows:celery worker -A workers -l INFO -P eventlet celery 4.0 已经不再对Windows操作系统提供支持了,需要安装:pip install eventlet """
用户app文件(user_app.py):
from workers import fn1 # 提交任务,将任务存到管道,等待员工执行 result = fn1.delay(2, 4) # (i, i) 是传入的参数 print(result) # b2df92e9-0eee-4af5-be83-dd8ac044d2a4 # 运行后,将提交任务
用于取结果的文件(get_result.py):
from celery.result import AsyncResult from workers import my_task # user_app文件的打印结果 ID = 'b2df92e9-0eee-4af5-be83-dd8ac044d2a4' # 异步获取任务返回值 async_task = AsyncResult(id=ID, app=my_task) # 判断异步任务是否执行成功 if async_task.successful(): # 获取异步任务的返回值 result = async_task.get() print(result) else: print('任务还在执行中')
执行顺序:执行命令 > 提交任务 > 取结果
Celery 项目目录结构
在实际的项目中,我们的 celery 是有规则的:
要满足这样的条件才可以,目录 celery_task 和其它文件可以随意命名,但此目录内一定要有一个 celery.py 文件。
celery.py
from celery import Celery celery_task = Celery( 'task', broker='redis://127.0.0.1:6380', # 指定存放任务的地方,这个指定为redis backend='redis://127.0.0.1:6380', # 指定存放任务执行结果的地方 include=['celery_task.task01', 'celery_task.task02'] # 指定任务文件,会自动寻找任务文件中所有的任务 ) # 启动worker时,无需指定文件,直接通过你的celery_task目录就可以了 # 启动命令:celery worker -A celery_task -l INFO -P eventlet # 这样,celery就可以自动检索当前目录下的所有task了,通过Include参数去逐一寻找
task01.py
import time from .celery import celery_task @celery_task.task def one(x, y): time.sleep(5) return f'one:x + y'
task02.py
import time from .celery import celery_task @celery_task.task def two(x, y): time.sleep(10) return f'two: x + y'
user_app.py
from celery_task.task01 import one from celery_task.task02 import two one.delay(1, 1) two.delay(2, 2)
get_result.py
from celery.result import AsyncResult from celery_task.celery import celery_task # 终端显示的任务ID ID = 'b2df92e9-0eee-4af5-be83-dd8ac044d2a4' # 异步获取任务返回值 async_task = AsyncResult(id=ID, app=celery_task) # 判断异步任务是否执行成功 if async_task.successful(): # 获取异步任务的返回值 result = async_task.get() print(result) else: print('任务还在执行中')
定时任务
请结合 Celery 项目目录结构 中的文件来设置定时任务。
user_app.py 文件内容如下:import time import datetime from celery_task.task01 import one from celery_task.task02 import two # 获取当前时间,此时间为东八区时间 ctime = time.time() # 将当前的东八区时间改为 UTC时间,注意这里一定是UTC时间,没有其它说法 utc_time = datetime.datetime.utcfromtimestamp(ctime) # 时间格式例如:2019-02-20 11:30:02.032230 # 为当前时间增加10秒 add_time = datetime.timedelta(seconds=10) # 0:00:10 action_time = utc_time + add_time # 此时的 action_time 就是当前时间的未来10秒 # 现在,我们使用 apply_async 来提交定时任务 result = one.apply_async(args=(1, 1), eta=action_time) # args=(1, 1):提交的参数 # eta=action_time:指定执行时间
周期任务
请结合 Celery 项目目录结构 中的文件来设置周期任务。
celery.py 文件内容如下:from celery import Celery from celery.schedules import crontab celery_task = Celery( 'task', broker='redis://127.0.0.1:6380', # 指定存放任务的地方,这个指定为redis backend='redis://127.0.0.1:6380', # 指定存放任务执行结果的地方 include=['celery_task.task01', 'celery_task.task02'] # 指定任务文件,会自动寻找任务文件中所有的任务 ) # 定制周期任务 celery_task.conf.beat_schedule = # 每10秒执行一次celery_task.task01.one 'each10s_task': 'task': 'celery_task.task01.one', # 指定任务 'schedule': 10, # 每10秒执行一次 'args': (1, 1) # 参数 , # 每分钟执行一次celery_task.task01.one 'each1m_task': 'task': 'celery_task.task01.one', # 指定任务 'schedule': crontab(minute=1), # 每分钟执行一次 'args': (1, 1) # 参数 , # 每隔23小时执行一次celery_task.task02.two 'each24hour_task': 'task': 'celery_task.task02.two', # 指定任务 'schedule': crontab(hour=23), # 每23小时执行一次 'args': (1, 1) # 参数 """ 启动命令: 先执行:celery beat -A celery_task -l INFO # 生产者,周期任务需要一个生产者来周期性提交任务 再执行:celery worker -A celery_task -l INFO -P eventlet """
提交周期任务时,需要一个生产者 beat 来提交任务。
因此,启动命令分为两个:
celery beat -A celery_task -l INFO # 生产者,用于提交任务
celery worker -A celery_task -l INFO -P eventlet # 处理任务
原文: http://106.13.73.98/__/156/
以上是关于Python celery -- 2019-08-08 20:39:56的主要内容,如果未能解决你的问题,请参考以下文章
win运行celery的,解决ValueError: not enough values to unpack (expected 3, got 0) 2019-08-26