Flask-APScheduler使用教程

Posted 一点链科技

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flask-APScheduler使用教程相关的知识,希望对你有一定的参考价值。

项目中需要用到定时器和循环执行。去网上搜了一下,比较常见的有一下集中。运用Python线程执行轮询操作,也有运用Linux系统的Cron,Celery的文章最多,但是太麻烦。看看就知道,Celery 需要一个发送和接受消息的传输者。RabbitMQ 和 Redis 中间人的消息传输支持所有特性,但也提供大量其他实验性方案的支持,包括用 SQLite 进行本地开发。需要用到队列,对于这点需求简直就是大材小用。最后找到了比较合适的Flask-APScheduler。

介绍

看看 github的flask-apscheduler介绍。

  • Loads scheduler configuration from Flask configuration.(支持从Flask中加载调度)

  • Loads job definitions from Flask configuration.(支持从Flask中加载任务配置)

  • Allows to specify the hostname which the scheduler will run on.(允许指定服务器运行任务)

  • Provides a REST API to manage the scheduled jobs.(提供Rest接口管理任务)

  • Provides authentication for the REST API.(提供Rest接口认证)

安装及配置

 
   
   
 
  1. pip install Flask-APScheduler

在Flask配置文件中添加

 
   
   
 
  1. SCHEDULER_API_ENABLED = True

  2. JOBS = [

  3.        {

  4.            'id': 'job_1h_data',

  5.            'func': job_1h_data,

  6.            'args': '',

  7.            'trigger': {

  8.                'type': 'cron',

  9.                'day_of_week': "0-6",

  10.                'hour': '*',

  11.                'minute': '1',

  12.                'second': '0'

  13.            }

  14.        },

  15.        {

  16.            'id': 'job_announce',

  17.            'func': exchange_an,

  18.            'args': '',

  19.            'trigger': 'interval',

  20.            'seconds': 300

  21.        }

  22. ]

上面指定了每一小时获取所有货币24h最高位以及交易所公告。

获取公告

 
   
   
 
  1. def exchange_an():

  2.    """

  3.    :param start_date: 开始时间 YYYY-MM-DD HH:MM:SS

  4.    :param end_date: 结束时间 YYYY-MM-DD HH:MM:SS

  5.    :return: 推送消息,保持数据库

  6.    """

  7.    current_local = time.time()

  8.    start_date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(current_local - 300))

  9.    end_date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(current_local))

  10.    announce = pro.query('exchange_ann', start_date=start_date, end_date=end_date)

  11.    print('请求交易所公告...')

  12.    for x in announce.values:

  13.        s = {

  14.            'title': x[0],

  15.            'content': x[1],

  16.            'type': x[2],

  17.            'url': x[3],

  18.            'datetime': x[4]

  19.        }

  20.        value = json.dumps(s)

  21.        print(value)

  22.        mqttClient.publish('system/ex_announce', value)

动态添加任务

 
   
   
 
  1. # coding:utf-8

  2. from apscheduler.schedulers.blocking import BlockingScheduler

  3. import datetime

  4. def aps_test(x):

  5.    print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x

  6. scheduler = BlockingScheduler()

  7. scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5')

  8. scheduler.add_job(func=aps_test, args=('一次性任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12))

  9. scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task')

  10. scheduler.start()

  11. """

  12. 暂停任务

  13. """

  14. scheduler.pause_job('interval_task')

  15. """

  16. 恢复任务

  17. """

  18. scheduler.resume_job('interval_task')

  19. """

  20. 删除任务

  21. """

  22. scheduler.remove_job('interval_task')

apscheduler支持添加三种方式的任务,分别是定时任务,一次性任务及循环任务。同时也包含了对任务的控制。

总结

因为是单机版本,所以指定服务器运行任务,Rest接口管理任务,Rest接口认证就不写了。后续有需求在继续。


以上是关于Flask-APScheduler使用教程的主要内容,如果未能解决你的问题,请参考以下文章

Flask 学习-86.Flask-APScheduler 创建定时任务

Flask-Apscheduler

Flask-APScheduler

VIM 代码片段插件 ultisnips 使用教程

Flask 学习-87.Flask-APScheduler 持久化定时任务保存到mysql数据库

markdown 打字稿...编码说明,提示,作弊,指南,代码片段和教程文章