celery task - 2

Posted wodeboke-y

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了celery task - 2相关的知识,希望对你有一定的参考价值。

# celery task

前言

讨论一个定时任务,一般而言,需要的功能如下:

  1. 封装成对象,独立执行;
  2. 对象有一些接口,便于了解它的状态;
  3. 定时调用;
  4. 行为控制,包括重试,成功/失败回调等;

下面分别介绍celery的这些功能实现。

1.task basic

celery的task基础类是tasks.Task()

1.1 bound tasks

绑定代表第一个参数默认是self

logger = get_task_logger(__name__)

@task(bind=True)
def add(self, x, y):
    logger.info(self.request.id)

1.2 Task类继承

需要注意的是声明的位置,是在把方法修饰成Task类时声明。

@app.task(base=MyTask)
def add(x, y):
    #raise KeyError
    return x + y

1.3 names

每个task实例都有一个非重复的名字,譬如下例:


@app.task(name=‘tasks.mul‘)
def mul(x, y):

一般不必要使用这一功能,特别是在task方法放在单独的module中时,默认name就是module name+方法名(celery_tasks.mul)。
尽量不要把任务模块命名为tasks.py,命名为celery_1.py更好一些。

1.4 其它属性

Task.max_retries 最大重试次数
Task.default_retry_delay 默认重试等待时间。
Task.ignore_result 抛弃结果,意味着不能通过 AsyncResult查看结果。

2.task 自定义任务行为

文档:http://docs.celeryproject.org/en/latest/userguide/tasks.html#custom-task-classes

主要有四种,包括失败/成功/重试/完成
on_failure on_success on_retry after_return

# celery_tasks.py
class MyTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print ‘task done: {0}‘.format(retval)
        return super(MyTask, self).on_success(retval, task_id, args, kwargs)
    
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print ‘task fail, reason: {0}‘.format(exc)
        return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)

@app.task(base=MyTask)
def add(x, y):
    return x + y

2.1 retry

app.Task.retry() 是实现重试的方法。

# an example of using retry:
@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
    try:
        twitter = Twitter(oauth)
        twitter.update_status(tweet)
    except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
        raise self.retry(exc=exc)

可以指定重试间隔时间,默认为180秒,下面案例指定为1800秒。


@app.task(bind=True, default_retry_delay=30 * 60) # retry in 30 minutes.
def add(self, x, y):

关于最大重试次数等参数是在task实例中指定。

3. 定时执行 periodic task

文档:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html

celery beat is a scheduler。

默认的参数源自beat_schedule。

3.1 时区

调度器默认使用UTC时区,当然需要修改:
timezone = ‘Europe/London‘
或 app.conf.timezone = ‘Europe/London‘

3.2 entries

有两种添加定时任务的方式,装饰器和配置文件。
常用配置文件方式。

3.2.1 从配置文件中读取定时任务

celery_config.py # 配置文件

!/usr/bin/env python

coding:utf-8

"""
celery configure

"""

author = ‘sss‘

from future import absolute_import
from celery.schedules import crontab
from datetime import timedelta

使用redis存储任务队列及结果

broker_url = ‘redis://:123@192.168.199.113:6379/0‘
result_backend = ‘redis://:123@192.168.199.113:6379/1‘

task_serializer = ‘json‘
result_serializer = ‘json‘
accept_content = [‘json‘]

时区

timezone = ‘Asia/Shanghai‘

celery默认开启自己的日志

False表示不关闭

worker_hijack_root_logger = False

存储结果过期时间,过期后自动删除

单位为秒

result_expires = 60 * 60 * 24

导入任务所在文件

imports = [
‘celery_tasks‘,]

定时任务配置

beat_schedule = {
‘test1‘: {
# 具体需要执行的函数
# 该函数必须要使用@app.task装饰
‘task‘: ‘celery_tasks.test1_run‘,
# 定时时间
# 每分钟执行一次,不能为小数
‘schedule‘: crontab(minute=‘/1‘),
# 或者这么写,每小时执行一次
# "schedule": crontab(minute=0, hour="
/1")
# 执行的函数需要的参数
‘args‘: ()
},
‘test2‘: {
‘task‘: ‘celery_tasks.test2_run‘,
# 设置定时的时间,10秒一次
‘schedule‘: timedelta(seconds=10),
‘args‘: ()
}}

celery_workder.py # 主文件


from future import absolute_import

拒绝隐式引入,如果celery.py和celery模块名字一样,避免冲突,需要加上这条语句

该代码中,名字是不一样的,最好也要不一样

celery test -- worker

from celery import Celery

def create_worker():
# app = Celery(‘tasks‘, broker=di)
‘‘‘app = Celery(‘tasks‘,
#backend=di_backend,
broker=di_broker,
include=[‘celery_tasks‘])
‘‘‘
app = Celery()
#app.conf.update(result_expires=3600,)
app.config_from_object(‘celery_config‘)
return app

app = create_worker()

celery_tasks.py # task方法文件

from celery_worker import app
from celery.task import Task
import time

tasks.py

class MyTask(Task):
def on_success(self, retval, task_id, args, kwargs):
print(‘task done: {0}‘.format(retval))
return super(MyTask, self).on_success(retval, task_id, args, kwargs)

def on_failure(self, exc, task_id, args, kwargs, einfo):
    print('task fail, reason: {0}'.format(exc))
    return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)

@app.task(base=MyTask)
def add(x, y):
#raise KeyError
print(‘wwww‘)
return x + y

@app.task
def mul(x, y):
return x * y

@app.task
def xsum(numbers):
return sum(numbers)

@app.task
def test(arg):
print(arg)

def test11():
time.sleep(1)
print(‘test11‘)

....略

运行-发布任务
celery -A celery_worker beat

运行-执行任务
celery -A celery_worker -l info -P eventlet

3.2.2 装饰器添加任务-动态添加

在上一章节中给出的案例是在配置文件中写入参数beat_schedule;
有时这样不太方便,需要更灵活的添加定时任务;

def setup_periodic_tasks(sender, **kwargs):
    # 每5s调用 test(‘hello‘)
    sender.add_periodic_task(5.0, test.s(‘hello‘), name=‘add every 5‘)

    # 每20s调用 test(‘world‘)
    sender.add_periodic_task(10.0, test.s(‘world‘), expires=7)

    # 每周一早上7:30 执行 test(‘Happy Mondays!‘)
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1), # 可灵活修改
        test.s(‘Happy Mondays!‘),
    )


setup_periodic_tasks(app)
print(‘定时任务列表‘, app.conf.beat_schedule)

执行命令celery -A celery_test beat -l debug

可以比较一下定时任务列表的输出。
没有添加任务:


(vir_venv) E:pythoncode_2>celery -A celery_test beat -l debug
定时任务列表 {}
celery beat v4.3.0 (rhubarb) is starting.

添加任务之后:


(vir_venv) E:pythoncode_2>celery -A celery_test beat -l debug
定时任务列表 # 已格式化
{
"add every 5": {
"schedule": 5.0,
"task": "celery_tasks.test",
"args": [
"hello"
],
"kwargs": {},
"options": {}
},
"celery_tasks.test(‘world‘)": {
"schedule": 10.0,
"task": "celery_tasks.test",
"args": [
"world"
],
"kwargs": {},
"options": {
"expires": 7
}
},
"celery_tasks.test(‘Happy Mondays!‘)": {
"schedule": "<crontab: 30 7 1 * * (m/h/d/dM/MY)>",
"task": "celery_tasks.test",
"args": [
"HappyMondays!"
],
"kwargs": {},
"options": {}
}
}

相关启动命令:
celery -A celery_worker worker -l info -P eventlet --logfile=c.log
celery -A celery_test beat -l debug

4. celery相关命令

发布任务
celery -A celery_task beat

执行任务
celery -A celery_task worker -l info -P eventlet

后台启动celery worker进程
celery multi start work_1 -A appcelery

停止worker进程,如果无法停止,加上-A
celery multi stop WORKNAME

重启worker进程
celery multi restart WORKNAME

查看进程数
celery status -A celery_task

5. 指定时间格式

复杂的定时功能可以使用crontab功能,它跟linux自带的crontab所支持的格式是一样的,非常方便定制任务执行时间。

from celery.schedules import crontab
 
app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    ‘add-every-monday-morning‘: {
        ‘task‘: ‘tasks.add‘,
        ‘schedule‘: crontab(hour=7, minute=30, day_of_week=1),
        ‘args‘: (16, 16),
    },
}

上面的案例是在每个周一的7:30执行tasks.add任务。

以上是关于celery task - 2的主要内容,如果未能解决你的问题,请参考以下文章

python小随笔celery周期任务(简单原理)

celery application

Celery period_task 并行运行多次

celery task调用

Celery实现定时任务crontab

获取RecursionError / KeyError尝试在Celery beat中添加Periodic Task