Flask 学习-86.Flask-APScheduler 创建定时任务

Posted 上海-悠悠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flask 学习-86.Flask-APScheduler 创建定时任务相关的知识,希望对你有一定的参考价值。

前言

Flask-APScheduler是根据APScheduler编写的一个flask模块,它提供了API管理任务。
Advanced Python Scheduler(APScheduler)是一个Python库,可让Python代码稍后执行,一次或定期执行。

环境准备

pip安装

pip install Flask-APScheduler

官网地址https://viniciuschiele.github.io/flask-apscheduler/index.html
文档参考https://leezhonglin.github.io/2019/05/09/Flask-APScheduler

Flask-APScheduler是基于APScheduler库开发的Flask拓展库。APScheduler的全称是Advanced Python Scheduler。
允许您将Python代码安排为稍后执行,可以只执行一次,也可以定期执行。您可以随时添加新作业或删除旧作业。
如果您将作业存储在数据库中,那么调度程序重启后它们也将存活下来并保持其状态。
当调度器重新启动时,它将运行它在离线时应该运行的所有作业,APScheduler文档https://link.zhihu.com/?target=https%3A//apscheduler.readthedocs.io/en/latest/index.html

基本概念

apscheduler 四个组件:

  • triggers: 任务触发器组件,提供任务触发方式
  • job stores: 任务商店组件,提供任务保存方式
  • executors: 任务调度组件,提供任务调度方式
  • schedulers: 任务调度组件,提供任务工作方式

triggers 3种触发方式

  • date:固定日期触发器,任务只运行一次
  • interval 时间间隔触发器
  • cron 定时任务触发

job stores 支持四种任务存储方式

  • memory:默认配置任务存在内存中
  • mongdb:支持文档数据库存储
  • sqlalchemy:支持关系数据库存储
  • redis:支持键值对数据库存储

schedulers
调度器主要分三种,一种独立运行的,一种是后台运行的,最后一种是配合其它程序使用

  • BlockingScheduler: 当这个调度器是你应用中 唯一要运行 的东西时使用
  • BackgroundScheduler: 当 不运行其它框架 的时候使用,并使你的任务在 后台运行
  • Asyncioscheduler: 当你的程序是 异步IO模型 的时候使用
  • GeventScheduler: 和 gevent 框架配套使用
  • TornadoScheduler: 和 tornado 框架配套使用
  • TwistedScheduler: 和 Twisted 框架配套使用
  • QtScheduler: 开发 qt 应用的时候使用

Flask-APScheduler 中默认使用的就是 BackgroundScheduler

triggers 触发器

triggers支持三种任务触发方式
date:固定日期触发器,任务只运行一次,运行完毕自动清除;若错过指定运行时间,任务不会被创建

使用示例

scheduler.add_job(start_system, 'date', run_date='2019-4-24 00:00:01', args=['text'])

interval 时间间隔触发器,每个一定时间间隔执行一次。

参数说明
weeks (int)间隔几周
days (int)间隔几天
hours (int)间隔几小时
minutes (int)间隔几分钟
seconds (int)间隔多少秒
start_date (datetime 或 str)开始日期
end_date (datetime 或 str)结束日期

使用示例

scheduler .add_job(alarm_job, 'interval', hours=2, start_date='2019-4-24 00:00:00' , end_date='2019-4-24 08:00:00')

cron 定时任务触发

参数说明
year (int 或 str)表示四位数的年份 (2019)
month(int\\ str)月 (范围1-12) 可以是int类型,也可以是str类型
day(int\\ str)日 (范围1-31)
week(int\\ str)周 (范围1-53)
day_of_week (int\\ str)表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示
hour (int\\ str)表示取值范围为0-23时
minute (int\\ str)表示取值范围为0-59分
second (int\\ str)表示取值范围为0-59秒
start_date (datetime\\ str)表示开始时间 可以是datetime类型,也可以是str类型
end_date (datetime\\ str)表示结束时间
timezone (datetime.tzinfo\\ str)表示时区取值

表示每 10 秒执行该程序一次,相当于interval 间隔调度中seconds = 10

sched.add_job(my_job, 'cron', second = '*/10')

快速开始

一个简单的示例

from flask import Flask
from flask_apscheduler import APScheduler
import time


class Config(object):
    SCHEDULER_TIMEZONE = 'Asia/Shanghai'  # 配置时区
    SCHEDULER_API_ENABLED = True  # 添加API

scheduler = APScheduler()


# interval example, 间隔执行, 每10秒执行一次
@scheduler.task('interval', id='task_1', seconds=10, misfire_grace_time=900)
def task1():
    print('task 1 executed --------', time.time())


# cron examples, 每5秒执行一次 相当于interval 间隔调度中seconds = 5
@scheduler.task('cron', id='task_2', second='*/20')
def task2():
    print('task 2 executed --------', time.time())


if __name__ == '__main__':
    app = Flask(__name__)
    app.config.from_object(Config())
    scheduler.init_app(app)
    scheduler.start()
    app.run()

执行结果

task 1 executed -------- 1665392432.7244706
task 1 executed -------- 1665392433.6400995
task 2 executed -------- 1665392440.0005376
task 2 executed -------- 1665392440.0157993

执行结果会发现有bug,任务会执行2次,

可以在app.run()加上参数use_reloader=False

    app.run(use_reloader=False)

如果上面方法不能解决,在网上看到可以用单例模式解决https://blog.csdn.net/liyacai_20120512/article/details/121190800

使用add_job() 添加任务

除了上面的装饰器方法,还可以用add_job() 添加任务

from flask import Flask
from flask_apscheduler import APScheduler
import time


class Config(object):
    SCHEDULER_TIMEZONE = 'Asia/Shanghai'  # 配置时区
    SCHEDULER_API_ENABLED = True  # 添加API

scheduler = APScheduler()


def task1(x):
    print(f'task 1 executed --------: x', time.time())


def task2(x):
    print(f'task 2 executed --------: x', time.time())


if __name__ == '__main__':
    app = Flask(__name__)
    app.config.from_object(Config())
    scheduler.init_app(app)
    # add_job() 添加任务
    scheduler.add_job(func=task1, args=('循环',), trigger='interval', seconds=5, id='interval_task')
    scheduler.add_job(func=task2, args=('定时任务',), trigger='cron', second='*/10', id='cron_task')
    scheduler.start()
    app.run(use_reloader=False)

运行结果

task 1 executed --------: 循环 1665393635.2983236
task 2 executed --------: 定时任务 1665393640.0021677
task 1 executed --------: 循环 1665393640.2968209
task 1 executed --------: 循环 1665393645.2960336

使用上下文操作数据库

如果正在使用 Flask-SQLAlchemy 并在定时任务中执行数据库操作,需要提供 Flask 应用程序上下文:

from flask_apscheduler import APScheduler
scheduler = APScheduler()
 
@scheduler.task(
     "interval",
      id="update_news_graph_job",
     minutes = 16
 )
def update_news_graph():
 
        # 提供flask上下文对象
        with scheduler.app.app_context():
            result = db.session.execute(query_sql)

前提是 scheduler 需要在flask中注册:

    scheduler.init_app(app)
    scheduler.start()

日志设置

如果定时任务执行间隔几秒钟, 调度程序的日志会很多,可以设置调度程序日志级别或完全禁用:

#设置调度程序的日志级别, 原本级别为info
 
scheduler.start()
scheduler.add_job(every_minute, trigger='cron', second=0, id='every_minute')
logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)
 
 
#或者禁用调度程序日志
logging.getLogger('apscheduler.executors.default').propagate = False

以上是关于Flask 学习-86.Flask-APScheduler 创建定时任务的主要内容,如果未能解决你的问题,请参考以下文章

Flask学习总结

python--flask学习1

Flask学习-Flask app启动过程

flask框架的学习

Flask 学习-42.Flask-RESTX 快速入门

Python Flask教程学习03