celery的使用
Posted 小小菜_v
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了celery的使用相关的知识,希望对你有一定的参考价值。
celery+mysql+flask的定时任务使用流程
1. 安装Celery
pip install celery
pip install celery-sqlalchemy-scheduler
2. 安装redis
3. FLASK的config.py添加配置信息
# Redis settings for Celery: database 0 is the broker, database 1 stores results.
broker_url = 'redis://127.0.0.1:6379/0'
result_backend = 'redis://127.0.0.1:6379/1'
# SQLAlchemy URL template for the scheduler database. The {user}/{password}
# fields are filled in by the .format() call below; without them the call
# substitutes nothing and the literal words "user:password" are used as the
# credentials. (The host part appears masked in this listing.)
COLLECT_DB_URL = os.environ.get("COLLECT_DB_URL", "mysql+mysqlconnector://{user}:{password}@**.*.*.*:31698/mysql")
COLLECT_DATABASE_URI = COLLECT_DB_URL.format(user=MYSQL_USER, password=MYSQL_PASSWORD)
task.py中写定时任务和celery初始化
import os
import platform
import sys

# Must run before the project imports below so the parent directory is importable.
sys.path.append("..")

from flask import Flask, current_app
from celery import Celery
from celery import schedules
from celery.schedules import crontab
from celery_sqlalchemy_scheduler.schedulers import DatabaseScheduler
from titsm.config import Config
from book.app.ext import db, swagger, cors, cache
from book.app.models import CronRecord
# Flask application; the template folder is resolved against Config.PROJECT_BASE_PATH.
app = Flask('book', template_folder=os.path.join(Config.PROJECT_BASE_PATH, 'app/templates'))
app.config.from_object(Config)

# Celery application wired to the broker / result backend declared on Config.
celery_app = Celery(app.name, broker=Config.broker_url, backend=Config.result_backend)
# celery_app.conf.beat_schedule = Config.beat_schedule
# celery_app.conf.update(app.config)

# Scheduled-collection task configuration
beat_dburi = Config.COLLECT_DATABASE_URI
if platform.system() == 'Windows':
    # Celery on Windows needs this variable to be set, otherwise invoking a task raises an error
    os.environ['FORKED_BY_MULTIPROCESSING'] = '1'
beat_scheduler = 'celery_sqlalchemy_scheduler.schedulers:DatabaseScheduler'
# NOTE(review): beat_sync_every is defined here but is not passed into the
# config dict that is applied to celery_app.conf further down - confirm intent.
beat_sync_every = 0
beat_max_loop_interval = 10
timezone = 'Asia/Shanghai'
# Each worker child process is recycled after 10 tasks so its memory is released;
# this guards against memory leaks.
worker_max_tasks_per_child = 10
'''
crontab() 每分钟
crontab(minute=0, hour=0) 每天的0时0分
crontab(minute=0, hour='*/3') 每三小时
crontab(day_of_week='sunday') 周日的每一小时
crontab(minute='*',hour='*', day_of_week='sun') 与上面相同
crontab(minute=0, hour='*/3,8-17') 每三个小时 8时到17时的每小时
'''
@celery_app.task
def add(x, y):
    """Demo Celery task: print two progress markers and return ``x + y``.

    :param x: first operand
    :param y: second operand
    :return: ``x + y``
    """
    # some long running task here
    total = x + y
    print("success")
    print("nice" + str(total))
    return total
# Schedules are managed centrally through beat_scheduler.
# If the schedule below has been modified in the database, the database copy
# is overwritten by this configuration when beat restarts.
beat_schedule = {
    "task_add": {
        "task": "tasks.add",
        # Minute / hour come from the environment, defaulting to 09:01.
        "schedule": crontab(
            minute=os.environ.get("MINUTE_SEND_MAIL_FOR_UNSUBMITTED_EXCEPTION") or "1",
            hour=os.environ.get("HOUR_SEND_MAIL_FOR_UNSUBMITTED_EXCEPTION") or "9",
        ),
        # NOTE(review): if "tasks.add" resolves to add(x, y), an empty args
        # tuple makes every scheduled run fail with a TypeError - supply the
        # two positional arguments the task expects.
        "args": (),
    },
}

# Collect the beat / worker settings and apply them to the Celery app in one call.
config = dict(
    beat_schedule=beat_schedule,
    beat_scheduler=beat_scheduler,
    beat_max_loop_interval=beat_max_loop_interval,
    beat_dburi=beat_dburi,
    timezone=timezone,
    worker_max_tasks_per_child=worker_max_tasks_per_child,
)
celery_app.conf.update(config)
4. 若将任务存到DB(mysql),则代码中可以将任务添加到PeriodicTask表中并在CrontabSchedule表中设置时间
# 将任务存到DB参考: https://blog.csdn.net/yannanxiu/article/details/85543684
from celery_sqlalchemy_scheduler import PeriodicTask, CrontabSchedule, PeriodicTaskChanged
def add_auto_book(cls, book_id,old_book):
    """
    Configure a Celery periodic task from a scheduled-collection record.

    Loads the ``BookInfo`` row for ``book_id``. If a ``PeriodicTask`` named
    after that record already exists, its id is returned and nothing is
    changed. Otherwise the task for ``old_book`` is renamed and given new
    kwargs (when ``old_book`` is truthy), or a new ``CrontabSchedule`` plus
    ``PeriodicTask`` pair is inserted.

    :param book_id: id of the scheduled-collection record to configure
    :param old_book: identifier of a previously configured job to update in
        place; falsy to create a new job
    :return: id of the already-existing task, otherwise ``None``
    :raises: aborts the request with HTTP 400 when the record has no batch
        parameters or when a ``ValueError`` occurs while storing the task
    """
    book_info = BookInfo.query.filter_by(
        book_id=book_id).first()
    job_name = f'BookJob_{book_info.book_id}'
    tasks = db.session.query(PeriodicTask).filter_by(name=job_name).first()
    if tasks:
        current_app.logger.info("task:{0} is already setup, return.".format(job_name))
        return tasks.id

    batch = json.loads(book_info.batch)
    expired_date = book_info.end_time
    start_time = book_info.start_time
    hour = book_info.hour if book_info.hour else '0'
    minute = book_info.min if book_info.min else '00'
    if not batch:
        abort(400, "缺少采集参数")
    day_of_month = ','.join(batch.get('day'))
    day_of_week = ','.join(batch.get('week'))
    month_of_year = ','.join(batch.get('month'))
    current_app.logger.info(f'auto_book params:\nminute={minute},hour={hour},day_of_week={day_of_week},'
                            f'day_of_month={day_of_month},month_of_year={month_of_year}')

    info = Book.query.filter_by(book_id=book_info.book_id).first()
    collect_kwargs = {
        'book_id': book_info.book_id,
        'type': info.book_type,
        'job_name': job_name,
        'reporter_um': info.reporter_um,
        'subject': info.subject,
    }
    try:
        if old_book:
            # NOTE(review): jobs created by this function are named
            # 'BookJob_<id>', yet the previous job is looked up under
            # 'CollectJob_<old_book>' - confirm which prefix is stored.
            old_name = f'CollectJob_{old_book}'
            task_info = db.session.query(PeriodicTask).filter_by(name=old_name).first()
            if task_info is None:
                # Route a missing row through the ValueError handler below
                # instead of failing with AttributeError on None.
                raise ValueError(f'periodic task {old_name} not found')
            task_info.kwargs = json.dumps(collect_kwargs)
            task_info.name = job_name
            task_info.date_changed = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            db.session.add(task_info)
            db.session.commit()
        else:
            new_cs = CrontabSchedule(minute=minute,
                                     hour=hour,
                                     day_of_week=day_of_week,
                                     day_of_month=day_of_month,
                                     month_of_year=month_of_year,
                                     timezone="Asia/Shanghai")
            db.session.add(new_cs)
            # Flush so the new schedule row receives its primary key.
            db.session.flush()
            crontab_id = new_cs.id
            db.session.commit()
            pt = PeriodicTask(name=job_name,
                              task='tasks.regular_collect',
                              enabled=1,
                              crontab_id=crontab_id,
                              kwargs=json.dumps(collect_kwargs),
                              expires=expired_date,
                              start_time=start_time)
            db.session.add(pt)
            db.session.commit()
        # Stamp PeriodicTaskChanged row 1 with the current time and commit it;
        # the original issued this UPDATE without a commit after it.
        db.session.query(PeriodicTaskChanged).filter(PeriodicTaskChanged.id == 1). \
            update({PeriodicTaskChanged.last_update: datetime.now()}, synchronize_session=False)
        db.session.commit()
    except ValueError as e:
        # Discard any half-applied changes before reporting the failure.
        db.session.rollback()
        current_app.logger.error(f'配置定采任务策略失败:error_info={str(e)}')
        abort(400, f'配置定采任务策略失败:error_info={str(e)}')
    current_app.logger.info("task:{0} setup finish.".format(job_name))
5.启动 worker和beat , beat 用于定时任务
python manage.py celery worker --loglevel=info --beat
6.监控worker运行情况
python manage.py celerycam --frequency=10.0
7.部署
安装
pip install supervisor
- 生成配置
echo_supervisord_conf >/supervisor_flask.conf
- 添加进程和进程组配置supervisor_flask.conf
; Sample supervisor config file.
;
; For more information on the config file, please see:
; http://supervisord.org/configuration.html
;
; Notes:
; - Shell expansion ("~" or "$HOME") is not supported. Environment
; variables can be expanded using this syntax: "%(ENV_HOME)s".
; - Quotes around values are not supported, except in the case of
; the environment= options as shown below.
; - Comments must have a leading space: "a=b ;comment" not "a=b;comment".
; - Command will be truncated if it looks like a config file comment, e.g.
; "command=bash -c 'foo ; bar'" will truncate to "command=bash -c 'foo ".
[unix_http_server]
file=/var/run/supervisor.sock ; the path to the socket file
;chmod=0700 ; socket file mode (default 0700)
;chown=nobody:nogroup ; socket file uid:gid owner
;username=user ; default is no username (open server)
;password=123 ; default is no password (open server)
;[inet_http_server] ; inet (TCP) server disabled by default
;port=127.0.0.1:9001 ; ip_address:port specifier, *:port for all iface
;username=user ; default is no username (open server)
;password=123 ; default is no password (open server)
[supervisord]
logfile=/var/log/supervisord.log ; main log file; default $CWD/supervisord.log
logfile_maxbytes=50MB ; max main logfile bytes b4 rotation; default 50MB
logfile_backups=10 ; # of main logfile backups; 0 means none, default 10
loglevel=info ; log level; default info; others: debug,warn,trace
pidfile=/var/run/supervisord.pid ; supervisord pidfile; default supervisord.pid
nodaemon=true ; start in foreground if true; default false
minfds=1024 ; min. avail startup file descriptors; default 1024
minprocs=200 ; min. avail process descriptors;default 200
;umask=022 ; process file creation umask; default 022
;user=chrism ; default is current user, required if root
;identifier=supervisor ; supervisord identifier, default is 'supervisor'
;directory=/tmp ; default is not to cd during start
;nocleanup=true ; don't clean up tempfiles at start; default false
;childlogdir=/tmp ; 'AUTO' child log dir, default $TEMP
;environment=KEY="value" ; key value pairs to add to environment
;strip_ansi=false ; strip ansi escape codes in logs; def. false
; The rpcinterface:supervisor section must remain in the config file for
; RPC (supervisorctl/web interface) to work. Additional interfaces may be
; added by defining them in separate [rpcinterface:x] sections.
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
; The supervisorctl section configures how supervisorctl will connect to
; supervisord. configure it match the settings in either the unix_http_server
; or inet_http_server section.
[supervisorctl]
serverurl=unix:///var/run/supervisor.sock ; use a unix:// URL for a unix socket
;serverurl=http://127.0.0.1:9001 ; use an http:// url to specify an inet socket
;username=chris ; should be same as in [*_http_server] if set
;password=123 ; should be same as in [*_http_server] if set
;prompt=mysupervisor ; cmd line prompt (default "supervisor")
;history_file=~/.sc_history ; use readline history if available
; The sample program section below shows all possible program subsection values.
; Create one or more 'real' program: sections to be able to control them under
; supervisor.
;[program:theprogramname]
;command=/bin/cat ; the program (relative uses PATH, can take args)
;process_name=%(program_name)s ; process_name expr (default %(program_name)s)
;numprocs=1 ; number of processes copies to start (def 1)
;directory=/tmp ; directory to cwd to before exec (def no cwd)
;umask=022 ; umask for process (default None)
;priority=999 ; the relative start priority (default 999)
;autostart=true ; start at supervisord start (default: true)
;startsecs=1 ; # of secs prog must stay up to be running (def. 1)
;startretries=3 ; max # of serial start failures when starting (default 3)
;autorestart=unexpected ; when to restart if exited after running (def: unexpected)
;exitcodes=0,2 ; 'expected' exit codes used with autorestart (default 0,2)
;stopsignal=QUIT ; signal used to kill process (default TERM)
;stopwaitsecs=10 ; max num secs to wait b4 SIGKILL (default 10)
;stopasgroup=false ; send stop signal to the UNIX process group (default false)
;killasgroup=false ; SIGKILL the UNIX process group (def false)
;user=chrism ; setuid to this UNIX account to run the program
;redirect_stderr=true ; redirect proc stderr to stdout (default false)
;stdout_logfile=/a/path ; stdout log path, NONE for none; default AUTO
;stdout_logfile_maxbytes=1MB ; max # logfile bytes b4 rotation (default 50MB)
;stdout_logfile_backups=10 ; # of stdout logfile backups (0 means none, default 10)
;stdout_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0)
;stdout_events_enabled=false ; emit events on stdout writes (default false)
;stderr_logfile=/a/path ; stderr log path, NONE for none; default AUTO
;stderr_logfile_maxbytes=1MB ; max # logfile bytes b4 rotation (default 50MB)
;stderr_logfile_backups=10 ; # of stderr logfile backups (0 means none, default 10)
;stderr_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0)
;stderr_events_enabled=false ; emit events on stderr writes (default false)
;environment=A="1",B="2" ; process environment additions (def no adds)
;serverurl=AUTO ; override serverurl computation (childutils)
; The sample eventlistener section below shows all possible eventlistener
; subsection values. Create one or more 'real' eventlistener: sections to be
; able to handle event notifications sent by supervisord.
;[eventlistener:theeventlistenername]
;command=/bin/eventlistener ; the program (relative uses PATH, can take args)
;process_name=%(program_name)s ; process_name expr (default %(program_name)s)
;numprocs=1 ; number of processes copies to start (def 1)
;events=EVENT ; event notif. types to subscribe to (req'd)
;buffer_size=10 ; event buffer queue size (default 10)
;directory=/tmp ; directory to cwd to before exec (def no cwd)
;umask=022 ; umask for process (default None)
;priority=-1 ; the relative start priority (default -1)
;autostart=true ; start at supervisord start (default: true)
;startsecs=1 ; # of secs prog must stay up to be running (def. 1)
;startretries=3 ; max # of serial start failures when starting (default 3)
;autorestart=unexpected ; autorestart if exited after running (def: unexpected)
;exitcodes=0,2 ; 'expected' exit codes used with autorestart (default 0,2)
;stopsignal=QUIT ; signal used to kill process (default TERM)
;stopwaitsecs=10 ; max num secs to wait b4 SIGKILL (default 10)
;stopasgroup=false ; send stop signal to the UNIX process group (default false)
;killasgroup=false ; SIGKILL the UNIX process group (def false)
;user=chrism ; setuid to this UNIX account to run the program
;redirect_stderr=false ; redirect_stderr=true is not allowed for eventlisteners
;stdout_logfile=/a/path ; stdout log path, NONE for none; default AUTO
;stdout_logfile_maxbytes=1MB ; max # logfile bytes b4 rotation (default 50MB)
;stdout_logfile_backups=10 ; # of stdout logfile backups (0 means none, default 10)
;stdout_events_enabled=false ; emit events on stdout writes (default false)
;stderr_logfile=/a/path ; stderr log path, NONE for none; default AUTO
;stderr_logfile_maxbytes=1MB ; max # logfile bytes b4 rotation (default 50MB)
;stderr_logfile_backups=10 ; # of stderr logfile backups (0 means none, default 10)
以上是关于celery的使用的主要内容,如果未能解决你的问题,请参考以下文章