Flask-Apscheduler
Posted Flask学习笔记
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flask-Apscheduler相关的知识,希望对你有一定的参考价值。
Flask-Apscheduler
安装
$ pip install flask-apscheduler
# ~/anaconda3/envs/learnpy/bin/pip install flask_apscheduler
# flask-apscheduler-1.11.0项目:https://github.com/viniciuschiele/flask-apscheduler
根据官方的声明,它支持:
- 根据 Flask 配置加载调度器配置
- 根据 Flask 配置加载任务调度器
- 允许指定服务器运行任务
- 提供 RESTful API 管理任务,也就是远程管理任务
- 为 RESTful API 提供认证
示例
官方提供了一些使用的示例,我们从中可以看到的它的一些用法.
from flask import Flask
from flask_apscheduler import APScheduler
class Config(object):
    """Flask configuration object that declares one scheduled job."""

    # Job definitions loaded by the scheduler from the Flask config.
    JOBS = [
        {
            'id': 'job1',
            'func': 'jobs:job1',  # textual reference in 'module:function' form
            'args': (1, 2),       # positional arguments passed to the job
            'trigger': 'interval',
            'seconds': 10,
        }
    ]

    # Enable the scheduler's RESTful API.
    SCHEDULER_API_ENABLED = True
def job1(a, b):
    """Print both arguments separated by a single space."""
    print(str(a) + ' ' + str(b))
if __name__ == '__main__':
    # Build the application and load the job definitions from Config.
    app = Flask(__name__)
    app.config.from_object(Config())

    scheduler = APScheduler()
    # it is also possible to enable the API directly
    # scheduler.api_enabled = True
    scheduler.init_app(app)
    scheduler.start()

    app.run()

基于此,我们可以来化用一下
❯ tree -L 2
.
├── app.py
├── jobs
│   ├── __init__.py
│   └── jobs.py
├── static
└── templates

# jobs.py
from datetime import datetime
def my_job(name):
    """Print the current local time followed by ``name``.

    Args:
        name: Value printed after the timestamp.
    """
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    # Pass ``name`` straight to print(): '%s' % name breaks on tuple arguments.
    print(timestamp, name)

# app.py
from flask import Flask
from flask_apscheduler import APScheduler
from jobs.jobs import my_job
# Create the application and register the scheduler settings on its config.
app = Flask(__name__)
app.config.update(
    SCHEDULER_API_ENABLED=True,  # expose the scheduler's RESTful API
    JOBS=[
        dict(
            id='job1',                # job id
            func='jobs.jobs:my_job',  # job location, as 'module:function'
            trigger='interval',       # trigger type
            seconds=5,                # interval between runs
            args=['ning'],            # positional arguments passed to func
        ),
    ],
)

# Bind the scheduler to the application; it is started later by the view.
apscheduler = APScheduler()
apscheduler.init_app(app)
@app.route('/')
def hello_world():
    """Start the scheduler on the first visit and return a greeting."""
    # start() raises SchedulerAlreadyRunningError when the scheduler is already
    # running, so a second request to this route must not call it again.
    if not apscheduler.running:
        apscheduler.start()
    return 'Hello World!'
if __name__ == '__main__':
    # Run the development server when the file is executed directly.
    app.run()

当访问
http://127.0.0.1:5000 时,会自动激发任务 apscheduler。此任务就是模块 jobs.jobs 的 my_job 方法。此外,还设置了 SCHEDULER_API_ENABLED 为 True,那么默认可以访问 API。如果想要友好显示,可以安装 chrome 插件 json-handle 来显示。通过查看源码,可以看到默认的 API:
def _load_api(self):
    """
    Add the routes for the scheduler API.

    Each call registers an endpoint name, a URL rule, the handler from
    ``api`` and the HTTP method that serves it.
    """
    # Scheduler information.
    self._add_url_route('get_scheduler_info', '', api.get_scheduler_info, 'GET')
    # Job collection: create a job and list all jobs.
    self._add_url_route('add_job', '/jobs', api.add_job, 'POST')
    self._add_url_route('get_job', '/jobs/<job_id>', api.get_job, 'GET')
    self._add_url_route('get_jobs', '/jobs', api.get_jobs, 'GET')
    # Single job: delete, update, pause, resume and run.
    self._add_url_route('delete_job', '/jobs/<job_id>', api.delete_job, 'DELETE')
    self._add_url_route('update_job', '/jobs/<job_id>', api.update_job, 'PATCH')
    self._add_url_route('pause_job', '/jobs/<job_id>/pause', api.pause_job, 'POST')
    self._add_url_route('resume_job', '/jobs/<job_id>/resume', api.resume_job, 'POST')
    self._add_url_route('run_job', '/jobs/<job_id>/run', api.run_job, 'POST')
除去在配置文件中配置任务,还可以使用装饰器来添加任务.
from flask import Flask
from flask_apscheduler import APScheduler
class Config(object):
    """Flask configuration object that only enables the scheduler API."""

    # Enable the scheduler's RESTful API.
    SCHEDULER_API_ENABLED = True
# Module-level scheduler so that jobs can be registered with decorators.
scheduler = APScheduler()


# interval examples
@scheduler.task('interval', id='do_job_1', seconds=30, misfire_grace_time=900)
def job1():
    """Interval job registered with a 30-second period."""
    print('Job 1 executed')
# cron examples
@scheduler.task('cron', id='do_job_2', minute='*')
def job2():
    """Cron job registered with ``minute='*'``."""
    print('Job 2 executed')
@scheduler.task('cron', id='do_job_3', week='*', day_of_week='sun')
def job3():
    """Cron job registered for Sundays (``day_of_week='sun'``) of every week."""
    print('Job 3 executed')
if __name__ == '__main__':
    # Build the application and load the settings from Config.
    app = Flask(__name__)
    app.config.from_object(Config())

    # it is also possible to enable the API directly
    # scheduler.api_enabled = True
    scheduler.init_app(app)
    scheduler.start()

    app.run()
config 配置
- SCHEDULER_AUTH: 配置验证信息
- SCHEDULER_API_ENABLED=True: 是否开启 Restful API
- SCHEDULER_API_PREFIX: Blueprint 前缀,默认是 /scheduler
- SCHEDULER_ENDPOINT_PREFIX: endpoint 路由前缀
- SCHEDULER_ALLOWED_HOSTS: 允许访问的 domain_name
- SCHEDULER_JOBSTORES: 配置存储任务管理器
- SCHEDULER_EXECUTORS: 配置执行器
- SCHEDULER_JOB_DEFAULTS: 配置调度默认属性
- SCHEDULER_TIMEZONE: 配置时区
1.配置验证
from flask import Flask
from flask_apscheduler import APScheduler
from flask_apscheduler.auth import HTTPBasicAuth
class Config(object):
    """Flask configuration object: one interval job plus API authentication."""

    # Job definitions loaded by the scheduler from the Flask config.
    JOBS = [
        {
            'id': 'job1',
            'func': '__main__:job1',  # job1 lives in the script being run
            'args': (1, 2),
            'trigger': 'interval',
            'seconds': 10,
        }
    ]

    # Enable the scheduler's RESTful API and protect it with HTTP basic auth.
    SCHEDULER_API_ENABLED = True
    SCHEDULER_AUTH = HTTPBasicAuth()
def job1(a, b):
    """Print both arguments separated by a single space."""
    print(str(a) + ' ' + str(b))
if __name__ == '__main__':
    # Build the application and load the settings from Config.
    app = Flask(__name__)
    app.config.from_object(Config())

    scheduler = APScheduler()
    # it is also possible to set the authentication directly
    # scheduler.auth = HTTPBasicAuth()
    scheduler.init_app(app)
    scheduler.start()

    @scheduler.authenticate
    def authenticate(auth):
        """Accept only the demo credentials guest/guest."""
        # NOTE: hard-coded credentials are for demonstration only.
        return auth['username'] == 'guest' and auth['password'] == 'guest'

    app.run()
2.白名单
from flask import Flask
from flask_apscheduler import APScheduler
class Config(object):
    """Flask configuration object: one interval job plus a host allow-list."""

    # Job definitions loaded by the scheduler from the Flask config.
    JOBS = [
        {
            'id': 'job1',
            'func': 'allowed_host:job1',  # textual reference, 'module:function'
            'args': (1, 2),
            'trigger': 'interval',
            'seconds': 10,
        }
    ]

    # Server names that are allowed to run the jobs.
    SCHEDULER_ALLOWED_HOSTS = ['my_servers_name']

    # Enable the scheduler's RESTful API.
    SCHEDULER_API_ENABLED = True
def job1(a, b):
    """Print both arguments separated by a single space."""
    print(str(a) + ' ' + str(b))
if __name__ == '__main__':
    # Build the application and load the settings from Config.
    app = Flask(__name__)
    app.config.from_object(Config())

    scheduler = APScheduler()
    # it is also possible to set the list of servers directly
    # scheduler.allowed_hosts = ['my_servers_name']
    scheduler.init_app(app)
    scheduler.start()

    app.run()
3.高级使用
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from flask import Flask
from flask_apscheduler import APScheduler
class Config(object):
    """Flask configuration object with job store, executor and job defaults."""

    # Job definitions loaded by the scheduler from the Flask config.
    JOBS = [
        {
            'id': 'job1',
            'func': 'advanced:job1',  # textual reference, 'module:function'
            'args': (1, 2),
            'trigger': 'interval',
            'seconds': 10,
        }
    ]

    # Keep jobs in a SQLAlchemy job store backed by SQLite.
    SCHEDULER_JOBSTORES = {
        'default': SQLAlchemyJobStore(url='sqlite://')
    }

    # Run jobs on a thread pool with up to 20 workers.
    SCHEDULER_EXECUTORS = {
        'default': {'type': 'threadpool', 'max_workers': 20}
    }

    # Defaults applied to every job.
    SCHEDULER_JOB_DEFAULTS = {
        'coalesce': False,
        'max_instances': 3
    }

    # Enable the scheduler's RESTful API.
    SCHEDULER_API_ENABLED = True
def job1(a, b):
    """Print both arguments separated by a single space."""
    print(str(a) + ' ' + str(b))
if __name__ == '__main__':
    # Build the application and load the settings from Config.
    app = Flask(__name__)
    app.config.from_object(Config())

    scheduler = APScheduler()
    scheduler.init_app(app)
    scheduler.start()

    app.run()

如果要在任务中查询数据库,则需注册到
app 中:
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from flask import Flask
from flask_apscheduler import APScheduler
from flask_sqlalchemy import SQLAlchemy
# Database handle; it is bound to the Flask application in the __main__ block.
db = SQLAlchemy()


class User(db.Model):
    """User model with a unique username and a unique email address."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)
    email = db.Column(db.String(120), unique=True)
def show_users():
    """Print every ``User`` row, querying inside an application context."""
    # The job runs outside a request, so push an application context first.
    # db.app is assigned in the __main__ block before the scheduler starts.
    with db.app.app_context():
        print(User.query.all())
class Config(object):
    """Flask configuration object: one job that queries the database."""

    # Job definitions loaded by the scheduler from the Flask config.
    JOBS = [
        {
            'id': 'job1',
            'func': show_users,  # the callable itself, not a textual reference
            'trigger': 'interval',
            'seconds': 2,
            # The job store below is a file on disk, so 'job1' is still there
            # on the next start; replace it instead of failing on the id clash.
            'replace_existing': True,
        }
    ]

    # Persist jobs in a SQLite file through the SQLAlchemy job store.
    SCHEDULER_JOBSTORES = {
        'default': SQLAlchemyJobStore(url='sqlite:///flask_context.db')
    }

    # Enable the scheduler's RESTful API.
    SCHEDULER_API_ENABLED = True
if __name__ == '__main__':
    # Build the application and load the settings from Config.
    app = Flask(__name__)
    app.config.from_object(Config())

    # show_users() reaches the application through db.app, so expose it there.
    db.app = app
    db.init_app(app)

    scheduler = APScheduler()
    scheduler.init_app(app)
    scheduler.start()

    app.run()
设置时区
SCHEDULER_TIMEZONE = 'Asia/Shanghai'
以上是关于Flask-Apscheduler的主要内容,如果未能解决你的问题,请参考以下文章