celery的使用

Posted 小小菜_v

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了celery的使用相关的知识,希望对你有一定的参考价值。

celery+mysql+flask的定时任务使用流程

1. 安装Celery

pip install celery
pip install celery-sqlalchemy-scheduler

2. 安装redis

3. FLASK的config.py添加配置信息

# celery的redis配置
broker_url = 'redis://127.0.0.1:6379/0'
result_backend = 'redis://127.0.0.1:6379/1'

COLLECT_DB_URL = os.environ.get("COLLECT_DB_URL", "mysql+mysqlconnector://user:password@**.*.*.*:31698/mysql")
    COLLECT_DATABASE_URI = COLLECT_DB_URL.format(user=MYSQL_USER, password=MYSQL_PASSWORD)

task.py中写定时任务和celery初始化

sys.path.append("..")
import platform
from flask import Flask, current_app
from celery import Celery
from celery.schedules import crontab
from titsm.config import Config
from book.app.ext import db, swagger, cors, cache
from book.app.models import CronRecord
from celery import schedules
from celery_sqlalchemy_scheduler.schedulers import DatabaseScheduler


app = Flask('book', template_folder=os.path.join(Config.PROJECT_BASE_PATH, 'app/templates'))
app.config.from_object(Config)

celery_app = Celery(app.name, broker=Config.broker_url, backend=Config.result_backend)
# celery_app.conf.beat_schedule = Config.beat_schedule
# celery_app.conf.update(app.config)
# 定采任务配置
beat_dburi = Config.COLLECT_DATABASE_URI

if platform.system() == 'Windows':
    # Celery在Windows环境下运行需要设置这个变量,否则调用任务会报错
    os.environ['FORKED_BY_MULTIPROCESSING'] = '1'

beat_scheduler = 'celery_sqlalchemy_scheduler.schedulers:DatabaseScheduler'
beat_sync_every = 0
beat_max_loop_interval = 10
timezone = 'Asia/Shanghai'
# 默认每个worker跑完10个任务后,自我销毁程序重建来释放内存
# 防止内存泄漏
worker_max_tasks_per_child = 10



'''
crontab()  每分钟

crontab(minute=0, hour=0)  每天的0时0分

crontab(minute=0, hour='*/3')  每三小时

crontab(day_of_week='sunday')  周日的每一小时

crontab(minute='*',hour='*', day_of_week='sun') 与上面相同

crontab(minute=0, hour='*/3,8-17') 每三个小时  8时到17时的每小时
'''

@celery_app.task
def add(x, y):
    # some long running task here
    print("success")
    print("nice"+str(x+y))
    return x+y

# 通过beat_scheduler统一管理,
# 如果数据库修改了下面的schedule,beat重启后数据库会被下面的配置覆盖
beat_schedule = 
        "task_add": 
            "task": "tasks.add",
            "schedule": crontab(minute=os.environ.get("MINUTE_SEND_MAIL_FOR_UNSUBMITTED_EXCEPTION") or "1",
                                hour=os.environ.get("HOUR_SEND_MAIL_FOR_UNSUBMITTED_EXCEPTION") or "9"),
            "args": ()
        


config = dict(
    beat_schedule=beat_schedule,
    beat_scheduler=beat_scheduler,
    beat_max_loop_interval=beat_max_loop_interval,
    beat_dburi=beat_dburi,
    timezone=timezone,
    worker_max_tasks_per_child=worker_max_tasks_per_child
)

celery_app.conf.update(config)

4. 若将任务存到DB(mysql),则代码中可以将任务添加到PeriodicTask表中并在CrontabSchedule表中设置时间

# 将任务存到DB参考: https://blog.csdn.net/yannanxiu/article/details/85543684
from celery_sqlalchemy_scheduler import PeriodicTask, CrontabSchedule, PeriodicTaskChanged

def add_auto_book(cls, book_id,old_book):
    """
    根据定采任务的内容配置celery定时任务
    :param book_id: 定采对象
    :return:
    """ 
    book_info = BookInfo.query.filter_by(
        book_id=book_id).first()
    job_name = f'BookJob_book_info .book_id'
    tasks = db.session.query(PeriodicTask).filter_by(name=job_name).first()
    if tasks:
        current_app.logger.info("task:0 is already setup, return.".format(job_name))
        return tasks.id
    batch = json.loads(book_info .batch)
    expired_date = book_info .end_time
    start_time = book_info .start_time
    hour = book_info .hour if book_info .hour else '0'
    minute = book_info .min if book_info .min else '00'
    if not batch:
        abort(400, "缺少采集参数")
    day_of_month = ','.join(batch.get('day'))
    day_of_week = ','.join(batch.get('week'))
    month_of_year = ','.join(batch.get('month'))
    current_app.logger.info(f'auto_book params:\\nminute=minute,hour=hour,day_of_week=day_of_week,'
                            f'day_of_month=day_of_month,month_of_year=month_of_year')
    info = Book.query.filter_by(book_id=book_info .book_id).first()
    collect_kwargs = 'book_id': book_info .book_id,
                      'type': info.book_type,
                      'job_name': job_name,
                      'reporter_um': info.reporter_um,
                      'subject': info.subject
                      

    try:
        if old_book:
            task_info = db.session.query(PeriodicTask).filter_by(name=f'CollectJob_old_book').first()
            task_info.kwargs = json.dumps(collect_kwargs)
            task_info.name = job_name
            task_info.date_changed = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            db.session.add(task_info)
            db.session.commit()
        else:
            new_cs = CrontabSchedule(minute=minute,
                                     hour=hour,
                                     day_of_week=day_of_week,
                                     day_of_month=day_of_month,
                                     month_of_year=month_of_year,
                                     timezone="Asia/Shanghai")
            db.session.add(new_cs)
            db.session.flush()
            crontab_id = new_cs.id
            db.session.commit()
            pt = PeriodicTask(name=job_name,
                              task='tasks.regular_collect',
                              enabled=1,
                              crontab_id=crontab_id,
                              kwargs=json.dumps(collect_kwargs),
                              expires=expired_date,
                              start_time=start_time)
            db.session.add(pt)
            db.session.commit()
        db.session.query(PeriodicTaskChanged).filter(PeriodicTaskChanged.id == 1). \\
            update(PeriodicTaskChanged.last_update: datetime.now(), synchronize_session=False)
    except ValueError as e:
        current_app.logger.error(f'配置定采任务策略失败:error_info=str(e)')
        abort(400, f'配置定采任务策略失败:error_info=str(e)')
        current_app.logger.info("task:0 setup finish.".format(job_name))

5.启动 worker和beat , beat 用于定时任务

python manage.py celery worker --loglevel=info –beat

6.监控worker运行情况

python manage.py celerycam --frequency=10.0

7.部署

安装

pip install supervisor
  1. 生成配置
echo_supervisord_conf >/supervisor_flask.conf
  1. 添加进程和进程组配置supervisor_flask.conf
; Sample supervisor config file.
;
; For more information on the config file, please see:
; http://supervisord.org/configuration.html
;
; Notes:
;  - Shell expansion ("~" or "$HOME") is not supported.  Environment
;    variables can be expanded using this syntax: "%(ENV_HOME)s".
;  - Quotes around values are not supported, except in the case of
;    the environment= options as shown below.
;  - Comments must have a leading space: "a=b ;comment" not "a=b;comment".
;  - Command will be truncated if it looks like a config file comment, e.g.
;    "command=bash -c 'foo ; bar'" will truncate to "command=bash -c 'foo ".

[unix_http_server]
file=/var/run/supervisor.sock   ; the path to the socket file
;chmod=0700                 ; socket file mode (default 0700)
;chown=nobody:nogroup       ; socket file uid:gid owner
;username=user              ; default is no username (open server)
;password=123               ; default is no password (open server)

;[inet_http_server]         ; inet (TCP) server disabled by default
;port=127.0.0.1:9001        ; ip_address:port specifier, *:port for all iface
;username=user              ; default is no username (open server)
;password=123               ; default is no password (open server)

[supervisord]
logfile=/var/log/supervisord.log ; main log file; default $CWD/supervisord.log
logfile_maxbytes=50MB        ; max main logfile bytes b4 rotation; default 50MB
logfile_backups=10           ; # of main logfile backups; 0 means none, default 10
loglevel=info                ; log level; default info; others: debug,warn,trace
pidfile=/var/run/supervisord.pid ; supervisord pidfile; default supervisord.pid
nodaemon=true               ; start in foreground if true; default false
minfds=1024                  ; min. avail startup file descriptors; default 1024
minprocs=200                 ; min. avail process descriptors;default 200
;umask=022                   ; process file creation umask; default 022
;user=chrism                 ; default is current user, required if root
;identifier=supervisor       ; supervisord identifier, default is 'supervisor'
;directory=/tmp              ; default is not to cd during start
;nocleanup=true              ; don't clean up tempfiles at start; default false
;childlogdir=/tmp            ; 'AUTO' child log dir, default $TEMP
;environment=KEY="value"     ; key value pairs to add to environment
;strip_ansi=false            ; strip ansi escape codes in logs; def. false

; The rpcinterface:supervisor section must remain in the config file for
; RPC (supervisorctl/web interface) to work.  Additional interfaces may be
; added by defining them in separate [rpcinterface:x] sections.

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

; The supervisorctl section configures how supervisorctl will connect to
; supervisord.  configure it match the settings in either the unix_http_server
; or inet_http_server section.

[supervisorctl]
serverurl=unix:///var/run/supervisor.sock ; use a unix:// URL  for a unix socket
;serverurl=http://127.0.0.1:9001 ; use an http:// url to specify an inet socket
;username=chris              ; should be same as in [*_http_server] if set
;password=123                ; should be same as in [*_http_server] if set
;prompt=mysupervisor         ; cmd line prompt (default "supervisor")
;history_file=~/.sc_history  ; use readline history if available

; The sample program section below shows all possible program subsection values.
; Create one or more 'real' program: sections to be able to control them under
; supervisor.

;[program:theprogramname]
;command=/bin/cat              ; the program (relative uses PATH, can take args)
;process_name=%(program_name)s ; process_name expr (default %(program_name)s)
;numprocs=1                    ; number of processes copies to start (def 1)
;directory=/tmp                ; directory to cwd to before exec (def no cwd)
;umask=022                     ; umask for process (default None)
;priority=999                  ; the relative start priority (default 999)
;autostart=true                ; start at supervisord start (default: true)
;startsecs=1                   ; # of secs prog must stay up to be running (def. 1)
;startretries=3                ; max # of serial start failures when starting (default 3)
;autorestart=unexpected        ; when to restart if exited after running (def: unexpected)
;exitcodes=0,2                 ; 'expected' exit codes used with autorestart (default 0,2)
;stopsignal=QUIT               ; signal used to kill process (default TERM)
;stopwaitsecs=10               ; max num secs to wait b4 SIGKILL (default 10)
;stopasgroup=false             ; send stop signal to the UNIX process group (default false)
;killasgroup=false             ; SIGKILL the UNIX process group (def false)
;user=chrism                   ; setuid to this UNIX account to run the program
;redirect_stderr=true          ; redirect proc stderr to stdout (default false)
;stdout_logfile=/a/path        ; stdout log path, NONE for none; default AUTO
;stdout_logfile_maxbytes=1MB   ; max # logfile bytes b4 rotation (default 50MB)
;stdout_logfile_backups=10     ; # of stdout logfile backups (0 means none, default 10)
;stdout_capture_maxbytes=1MB   ; number of bytes in 'capturemode' (default 0)
;stdout_events_enabled=false   ; emit events on stdout writes (default false)
;stderr_logfile=/a/path        ; stderr log path, NONE for none; default AUTO
;stderr_logfile_maxbytes=1MB   ; max # logfile bytes b4 rotation (default 50MB)
;stderr_logfile_backups=10     ; # of stderr logfile backups (0 means none, default 10)
;stderr_capture_maxbytes=1MB   ; number of bytes in 'capturemode' (default 0)
;stderr_events_enabled=false   ; emit events on stderr writes (default false)
;environment=A="1",B="2"       ; process environment additions (def no adds)
;serverurl=AUTO                ; override serverurl computation (childutils)

; The sample eventlistener section below shows all possible eventlistener
; subsection values.  Create one or more 'real' eventlistener: sections to be
; able to handle event notifications sent by supervisord.

;[eventlistener:theeventlistenername]
;command=/bin/eventlistener    ; the program (relative uses PATH, can take args)
;process_name=%(program_name)s ; process_name expr (default %(program_name)s)
;numprocs=1                    ; number of processes copies to start (def 1)
;events=EVENT                  ; event notif. types to subscribe to (req'd)
;buffer_size=10                ; event buffer queue size (default 10)
;directory=/tmp                ; directory to cwd to before exec (def no cwd)
;umask=022                     ; umask for process (default None)
;priority=-1                   ; the relative start priority (default -1)
;autostart=true                ; start at supervisord start (default: true)
;startsecs=1                   ; # of secs prog must stay up to be running (def. 1)
;startretries=3                ; max # of serial start failures when starting (default 3)
;autorestart=unexpected        ; autorestart if exited after running (def: unexpected)
;exitcodes=0,2                 ; 'expected' exit codes used with autorestart (default 0,2)
;stopsignal=QUIT               ; signal used to kill process (default TERM)
;stopwaitsecs=10               ; max num secs to wait b4 SIGKILL (default 10)
;stopasgroup=false             ; send stop signal to the UNIX process group (default false)
;killasgroup=false             ; SIGKILL the UNIX process group (def false)
;user=chrism                   ; setuid to this UNIX account to run the program
;redirect_stderr=false         ; redirect_stderr=true is not allowed for eventlisteners
;stdout_logfile=/a/path        ; stdout log path, NONE for none; default AUTO
;stdout_logfile_maxbytes=1MB   ; max # logfile bytes b4 rotation (default 50MB)
;stdout_logfile_backups=10     ; # of stdout logfile backups (0 means none, default 10)
;stdout_events_enabled=false   ; emit events on stdout writes (default false)
;stderr_logfile=/a/path        ; stderr log path, NONE for none; default AUTO
;stderr_logfile_maxbytes=1MB   ; max # logfile bytes b4 rotation (default 50MB)
;s

以上是关于celery的使用的主要内容,如果未能解决你的问题,请参考以下文章

django-celery-beat 垃圾邮件到期任务

Celery框架

通知 celery 任务在工作人员关闭期间停止

celery 运行时 Nginx 无响应

使用 Mandrill send_at 或 Celery countdown/eta 延迟发送电子邮件

Django celery 定期任务间隔更改未在数据库中更新