APScheduler定时任务

Posted aganrun

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了APScheduler定时任务相关的知识,希望对你有一定的参考价值。

参考

https://www.cnblogs.com/huchong/p/9088611.html
https://cloud.tencent.com/developer/article/1172218
https://www.cnblogs.com/huchong/p/9088611.html


目录

1. 安装
2. 简单使用
3. 日志
4. 删除任务
5. 停止恢复任务
6. 意外


1. 安装

pip install apscheduler

2. 简单使用

  1. 5s执行一次aps_test方法
# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime


def aps_test():
    print datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘), ‘你好‘


scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, trigger=‘cron‘, second=‘*/5‘)
scheduler.start()

输出为

2020-05-08 11:02:50 你好
2020-05-08 11:02:55 你好
2020-05-08 11:03:00 你好

apscheduler分为4个模块,
分别是 Triggers,Job stores,Executors,Schedulers.
从上面的例子我们就可以看出来了,triggers就是触发器

  1. 几种定时
  • date表示具体的一次性任务
  • interval表示循环任务
  • cron表示定时任务
# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime


def aps_test(x):
    print datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘), x

scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=(‘定时任务‘,), trigger=‘cron‘, second=‘*/5‘)
scheduler.add_job(func=aps_test, args=(‘一次性任务‘,), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12))
scheduler.add_job(func=aps_test, args=(‘循环任务‘,), trigger=‘interval‘, seconds=3)

scheduler.start()

输出如下

2020-05-08 11:01:24 循环任务
2020-05-08 11:01:25 定时任务
2020-05-08 11:01:27 循环任务
2020-05-08 11:01:30 定时任务
2020-05-08 11:01:30 循环任务
2020-05-08 11:01:33 一次性任务
2020-05-08 11:01:33 循环任务
2020-05-08 11:01:35 定时任务
2020-05-08 11:01:36 循环任务
2020-05-08 11:01:39 循环任务
2020-05-08 11:01:40 定时任务
...

3. 日志

增加日志配置

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import logging

logging.basicConfig(level=logging.INFO,
                    format=‘%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s‘,
                    datefmt=‘%Y-%m-%d %H:%M:%S‘,
                    filename=‘log1.txt‘,
                    filemode=‘a‘)


def aps_test(x):
    print 1/0
    print datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘), x

scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=(‘定时任务‘,), trigger=‘cron‘, second=‘*/5‘)
scheduler._logger = logging
scheduler.start()

打印了定时任务遇到的错误

2020-05-08 11:14:52 base.py[line:440] INFO Adding job tentatively -- it will be properly scheduled when the scheduler starts
2020-05-08 11:14:52 base.py[line:881] INFO Added job "aps_test" to job store "default"
2020-05-08 11:14:52 base.py[line:166] INFO Scheduler started
2020-05-08 11:14:55 base.py[line:123] INFO Running job "aps_test (trigger: cron[second=‘*/5‘], next run at: 2020-05-08 11:14:55 CST)" (scheduled at 2020-05-08 11:14:55+08:00)
2020-05-08 11:14:55 base.py[line:131] ERROR Job "aps_test (trigger: cron[second=‘*/5‘], next run at: 2020-05-08 11:15:00 CST)" raised an exception
Traceback (most recent call last):
  File "D:PythonLanguagelibsite-packagesapschedulerexecutorsase.py", line 125, in run_job
    retval = job.func(*job.args, **job.kwargs)
  File "D:/DEV/workspace/V4082/workorder-data/scheduler.py", line 14, in aps_test
    print(1/0)
ZeroDivisionError: division by zero
2020-05-08 11:15:00 base.py[line:123] INFO Running job "aps_test (trigger: cron[second=‘*/5‘], next run at: 2020-05-08 11:15:00 CST)" (scheduled at 2020-05-08 11:15:00+08:00)
2020-05-08 11:15:00 base.py[line:131] ERROR Job "aps_test (trigger: cron[second=‘*/5‘], next run at: 2020-05-08 11:15:05 CST)" raised an exception
Traceback (most recent call last):
  File "D:PythonLanguagelibsite-packagesapschedulerexecutorsase.py", line 125, in run_job
    retval = job.func(*job.args, **job.kwargs)
  File "D:/DEV/workspace/V4082/workorder-data/scheduler.py", line 14, in aps_test
    print(1/0)
ZeroDivisionError: division by zero

4. 删除任务

可以根据id删除任务

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import logging

logging.basicConfig(level=logging.INFO,
                    format=‘%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s‘,
                    datefmt=‘%Y-%m-%d %H:%M:%S‘,
                    filename=‘log1.txt‘,
                    filemode=‘a‘)


def aps_test(x):
    print datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘), x


def aps_date(x):
    scheduler.remove_job(‘interval_task‘)
    print datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘), x
    

scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=(‘定时任务‘,), trigger=‘cron‘, second=‘*/5‘, id=‘cron_task‘)
scheduler.add_job(func=aps_date, args=(‘一次性任务,删除循环任务‘,), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12), id=‘date_task‘)
scheduler.add_job(func=aps_test, args=(‘循环任务‘,), trigger=‘interval‘, seconds=3, id=‘interval_task‘)
scheduler._logger = logging

scheduler.start()
2020-05-08 11:25:19 循环任务
2020-05-08 11:25:20 定时任务
2020-05-08 11:25:22 循环任务
2020-05-08 11:25:25 定时任务
2020-05-08 11:25:25 循环任务
2020-05-08 11:25:28 循环任务
2020-05-08 11:25:28 一次性任务,删除循环任务
2020-05-08 11:25:30 定时任务
2020-05-08 11:25:35 定时任务
2020-05-08 11:25:40 定时任务
2020-05-08 11:25:45 定时任务
2020-05-08 11:25:50 定时任务
2020-05-08 11:25:55 定时任务

5. 停止恢复任务

看看官方文档,还有pause_job,resume_job,用法跟remove_job一样。

6. 意外

任何代码都可能发生意外,关键是,发生意外了,如何第一时间知道。

可以添加一个监听器

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import datetime
import logging

logging.basicConfig(level=logging.INFO,
                    format=‘%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s‘,
                    datefmt=‘%Y-%m-%d %H:%M:%S‘,
                    filename=‘log1.txt‘,
                    filemode=‘a‘)


def aps_test(x):
    print datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘), x


def date_test(x):
    print datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘), x
    print 1/0


def my_listener(event):
    if event.exception:
        print ‘任务出错了!!!!!!‘
        # 发送邮件通知
    else:
        print ‘任务照常运行...‘

scheduler = BlockingScheduler()
scheduler.add_job(func=date_test, args=(‘一定性任务,会出错‘,), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id=‘date_task‘)
scheduler.add_job(func=aps_test, args=(‘循环任务‘,), trigger=‘interval‘, seconds=3, id=‘interval_task‘)
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler._logger = logging

scheduler.start()
2020-05-08 11:43:27 循环任务
任务照常运行...
2020-05-08 11:43:30 循环任务
任务照常运行...
2020-05-08 11:43:33 循环任务
任务照常运行...
2020-05-08 11:43:36 循环任务
任务照常运行...
2020-05-08 11:43:39 一定性任务,会出错
任务出错了!!!!!!
2020-05-08 11:43:39 循环任务
任务照常运行...
2020-05-08 11:43:42 循环任务
任务照常运行...









以上是关于APScheduler定时任务的主要内容,如果未能解决你的问题,请参考以下文章

Flask 学习-86.Flask-APScheduler 创建定时任务

Django框架中使用定时任务APScheduler

Python下定时任务框架APScheduler的使用

Python定时任务神器 - APScheduler

APScheduler轻量级定时任务框架

apscheduler 定时任务框架