APS定时任务框架
Posted csp813
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了APS定时任务框架相关的知识,希望对你有一定的参考价值。
一.安装与简介
1.安装
pip install apscheduler
官方文档:https://apscheduler.readthedocs.io/en/latest/#
2.简介
APScheduler基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,
并且可以持久化任务。基于这些功能,我们可以很方便的实现一个python定时任务系统。 触发器(trigger)包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了他们自己初始配置意外,触发器完全是无状态的。 作业存储(job store)存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。一个作业的数据讲在保存在持久化作业存储时被序列化,
并在加载时被反序列化。调度器不能分享同一个作业存储。 执行器(executor)处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。 调度器(scheduler)是其他的组成部分。你通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。
配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。
调度器工作流程:
二.案例
1.hello world
from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime sched = BlockingScheduler() #构造定时器对象 def my_job(): print(f‘{datetime.now():%H:%M:%S} Hello World ‘) sched.add_job(my_job, ‘interval‘, seconds=5) #给定时器添加任务,触发条件,以及间隔时间 sched.start() #开启定时器
解释:
导入调度器模块 BlockingScheduler,这是最简单的调度器,调用 start 方阻塞当前进程,如果你的程序只用于调度,
除了调度进程外没有其他后台进程,那么请用 BlockingScheduler 非常有用,此时调度进程相当于守护进程。 定义一个函数my_job代表我们要调度的作业程序。
实例化一个 BlockingScheduler 类,不带参数表明使用默认的作业存储器-内存,默认的执行器是线程池执行器,最大并发线程数默认为 10 个(另一个是进程池执行器)。
结果:
除了上述添加作业的方法,还可以使用装饰器
@sched.scheduled_job(‘interval‘, seconds=5) def my_job(): print(f‘{datetime.now():%H:%M:%S} Hello World ‘)
如果同一个方法被添加到多个任务重,则需要指定任务 id
@sched.scheduled_job(‘interval‘, id=‘my_job‘, seconds=5) @sched.scheduled_job(‘interval‘, id=‘my_job1‘, seconds=3) def my_job(): print(f‘{datetime.now():%H:%M:%S} Hello World ‘)
2. 带参数的
# coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime def aps_test(x): print (datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘), x) scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=(‘你好‘,), trigger=‘cron‘, second=‘*/5‘) scheduler.start()
结果:
三.解析
BlockingScheduler
调度器中的一种,该种表示在进程中只运行调度程序时使用。sched.add_job()
添加作业,并指定调度方式为interval
,时间间隔为 5 秒sched.start()
开始任务
apscheduler分为4个模块,分别是Triggers,Job stores,Executors,Schedulers.从上面的例子我们就可以看出来了,triggers就是触发器,上面的代码中,用了cron,其实还有其他触发器,看看它的源码解释。
The ``trigger`` argument can either be: #. the alias name of the trigger (e.g. ``date``, ``interval`` or ``cron``), in which case any extra keyword arguments to this method are passed on to the trigger‘s constructor #. an instance of a trigger class
源码中解释说,有date, interval, cron可供选择,date表示具体的一次性任务,interval表示循环任务,cron表示定时任务。
# coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime def aps_test(x): print (datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘), x) scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=(‘定时任务‘,), trigger=‘cron‘, second=‘*/5‘) scheduler.add_job(func=aps_test, args=(‘一次性任务‘,), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12)) scheduler.add_job(func=aps_test, args=(‘循环任务‘,), trigger=‘interval‘, seconds=3) scheduler.start()
结果:
结果非常清晰。除了一次性任务,trigger是不要写的,直接定义next_run_time就可以了,关于date这部分,官网没有解释,但是去看看源码吧,看这行代码
def _create_trigger(self, trigger, trigger_args): if isinstance(trigger, BaseTrigger): return trigger elif trigger is None: trigger = ‘date‘ elif not isinstance(trigger, six.string_types): raise TypeError(‘Expected a trigger instance or string, got %s instead‘ % trigger.__class__.__name__) # Use the scheduler‘s time zone if nothing else is specified trigger_args.setdefault(‘timezone‘, self.timezone) # Instantiate the trigger class return self._create_plugin_instance(‘trigger‘, trigger, trigger_args)
第4行,如果trigger为None,直接定义trigger为‘date‘类型。其实弄到这里,大家应该自己拓展一下,如果实现web的异步任务。假设接到一个移动端任务,任务完成后,发送一个推送到移动端,用date类型的trigger完成可以做的很好。
四.日志
如果代码有意外咋办?会阻断整个任务吗?如果我要计算密集型的任务咋办?下面有个代码,我们看看会发生什么情况。
那就这样一直循环报错????
加上日志我们在看看:
# coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime import logging logging.basicConfig(level=logging.INFO, format=‘%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s‘, datefmt=‘%Y-%m-%d %H:%M:%S‘, filename=‘log1.txt‘, filemode=‘a‘) def aps_test(x): print 1/0 print datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘), x scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=(‘定时任务‘,), trigger=‘cron‘, second=‘*/5‘) scheduler._logger = logging scheduler.start()
一样的结果,只不过记录到日志了:
四.删除任务
假设我们有个奇葩任务,要求执行一定阶段任务以后,删除某一个循环任务,其他任务照常进行。有如下代码:
#!/usr/bin/env python # -*- coding: utf-8 -*- #author tom # coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime import logging logging.basicConfig(level=logging.INFO, format=‘%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s‘, datefmt=‘%Y-%m-%d %H:%M:%S‘, filename=‘log1.txt‘, filemode=‘a‘) def aps_test(x): print (datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘), x) def aps_date(x): scheduler.remove_job(‘interval_task‘) print (datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘), x) scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=(‘定时任务‘,), trigger=‘cron‘, second=‘*/5‘, id=‘cron_task‘) scheduler.add_job(func=aps_date, args=(‘一次性任务,删除循环任务‘,), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12), id=‘date_task‘) scheduler.add_job(func=aps_test, args=(‘循环任务‘,), trigger=‘interval‘, seconds=3, id=‘interval_task‘) scheduler._logger = logging scheduler.start()
在运行过程中,成功删除某一个任务,其实就是为每个任务定义一个id,然后remove_job这个id,是不是超级简单,直观?那还有什么呢?
五.停止任务,恢复任务
看看官方文档,还有pause_job, resume_job,用法跟remove_job一样,这边就不详细介绍了,就写个代码。
#!/usr/bin/env python # -*- coding: utf-8 -*- #author tom # coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime import logging logging.basicConfig(level=logging.INFO, format=‘%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s‘, datefmt=‘%Y-%m-%d %H:%M:%S‘, filename=‘log1.txt‘, filemode=‘a‘) def aps_test(x): print (datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘), x) def aps_pause(x): scheduler.pause_job(‘interval_task‘) print (datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘), x) def aps_resume(x): scheduler.resume_job(‘interval_task‘) print (datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘), x) scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=(‘定时任务‘,), trigger=‘cron‘, second=‘*/5‘, id=‘cron_task‘) scheduler.add_job(func=aps_pause, args=(‘一次性任务,停止循环任务‘,), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12), id=‘pause_task‘) scheduler.add_job(func=aps_resume, args=(‘一次性任务,恢复循环任务‘,), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=24), id=‘resume_task‘) scheduler.add_job(func=aps_test, args=(‘循环任务‘,), trigger=‘interval‘, seconds=3, id=‘interval_task‘) scheduler._logger = logging scheduler.start()
六.意外
任何代码都可能发生意外,关键是,发生意外了,如何第一时间知道,这才是公司最关心的,apscheduler已经为我们想到了这些。
#!/usr/bin/env python # -*- coding: utf-8 -*- #author tom # coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR import datetime import logging logging.basicConfig(level=logging.INFO, format=‘%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s‘, datefmt=‘%Y-%m-%d %H:%M:%S‘, filename=‘log1.txt‘, filemode=‘a‘) def aps_test(x): print (datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘), x) def date_test(x): print (datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘), x) print (1/0) def my_listener(event): if event.exception: print (‘任务出错了!!!!!!‘) else: print (‘任务照常运行...‘) scheduler = BlockingScheduler() scheduler.add_job(func=date_test, args=(‘一定性任务,会出错‘,), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id=‘date_task‘) scheduler.add_job(func=aps_test, args=(‘循环任务‘,), trigger=‘interval‘, seconds=3, id=‘interval_task‘) scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) scheduler._logger = logging scheduler.start()
是不是很直观,在生产环境中,你可以把出错信息换成发送一封邮件或者发送一个短信,这样定时任务出错就可以立马就知道了。
七.关于调度器的配置(进阶)
关于调度器的知识:
调度器的主循环其实就是反复检查是不是有到时需要执行的任务,分以下几步进行: 询问自己的每一个作业存储器,有没有到期需要执行的任务,如果有,计算这些作业中每个作业需要运行的时间点,如果时间点有多个,做 coalesce 检查。 提交给执行器按时间点运行。 在配置调度器前,我们首先要选取适合我们应用环境场景的调度器,存储器和执行器。下面是各调度器的适用场景: BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用start函数会阻塞当前线程,不能立即返回。
BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用start后主线程不会阻塞。
Asyncioscheduler:适用于使用了asyncio模块的应用程序。
GeventScheduler:适用于使用gevent模块的应用程序。
TwistedScheduler:适用于构建Twisted的应用程序。
QtScheduler:适用于构建Qt的应用程序。 上述调度器可以满足我们绝大多数的应用环境,本文以两种调度器为例说明如何进行调度器配置。
关于作业存储器的知识:
作业存储器的选择有两种:
一是内存,也是默认的配置;
二是数据库。具体选哪一种看我们的应用程序在崩溃时是否重启整个应用程序,如果重启整个应用程序,那么作业会被重新添加到调度器中,此时简单的选取内存作为作业存储器即简单又高效。
但是,当调度器重启或应用程序崩溃时您需要您的作业从中断时恢复正常运行,那么通常我们选择将作业存储在数据库中,使用哪种数据库通常取决于为在您的编程环境中使用了什么数据库。
我们可以自由选择,PostgreSQL 是推荐的选择,因为它具有强大的数据完整性保护。
关于执行器的知识:
同样的,执行器的选择也取决于应用场景。通常默认的 ThreadPoolExecutor 已经足够好。如果作业负载涉及CPU 密集型操作,
那么应该考虑使用 ProcessPoolExecutor,甚至可以同时使用这两种执行器,将ProcessPoolExecutor 行器添加为二级执行器。
apscheduler 提供了许多不同的方法来配置调度器。可以使用字典,也可以使用关键字参数传递。首先实例化调度程序,添加作业,然后配置调度器,获得最大的灵活性。
如果调度程序在应用程序的后台运行,选择 BackgroundScheduler,并使用默认的 jobstore 和默认的executor,则以下配置即可:
from apscheduler.schedulers.background import BackgroundScheduler scheduler = BackgroundScheduler()
假如我们想配置更多信息:设置两个执行器、两个作业存储器、调整新作业的默认值,并设置不同的时区。下述三个方法是完全等同的。
配置需求:
配置名为“mongo”的MongoDBJobStore作业存储器 配置名为“default”的SQLAlchemyJobStore(使用SQLite) 配置名为“default”的ThreadPoolExecutor,最大线程数为20 配置名为“processpool”的ProcessPoolExecutor,最大进程数为5 UTC作为调度器的时区 coalesce默认情况下关闭 作业的默认最大运行实例限制为3 misfire_grace_time 配置超时,如果设置为 10 秒,则表示误差在 10 秒内仍允许运行,否则会报 job was missed by ..。
调度器的配置
方法一:
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExec utor jobstores = { ‘mongo‘: MongoDBJobStore(), ‘default‘: SQLAlchemyJobStore(url=‘sqlite:///jobs.sqlite‘) } executors = { ‘default‘: ThreadPoolExecutor(20), ‘processpool‘: ProcessPoolExecutor(5) } job_defaults = { ‘coalesce‘: False, ‘max_instances‘: 3, ‘misfire_grace_time‘:10 #10秒的任务超时容错 } scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
方法二:
from apscheduler.schedulers.background import BackgroundScheduler scheduler = BackgroundScheduler({ ‘apscheduler.jobstores.mongo‘: { ‘type‘: ‘mongodb‘ }, ‘apscheduler.jobstores.default‘: { ‘type‘: ‘sqlalchemy‘, ‘url‘: ‘sqlite:///jobs.sqlite‘ }, ‘apscheduler.executors.default‘: { ‘class‘: ‘apscheduler.executors.pool:ThreadPoolExecutor‘, ‘max_workers‘: ‘20‘ }, ‘apscheduler.executors.processpool‘: { ‘type‘: ‘processpool‘, ‘max_workers‘: ‘5‘ }, ‘apscheduler.job_defaults.coalesce‘: ‘false‘, ‘apscheduler.job_defaults.max_instances‘: ‘3‘, ‘apscheduler.timezone‘: ‘UTC‘, })
方法三:
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ProcessPoolExecutor jobstores = { ‘mongo‘: { ‘type‘: ‘mongodb‘ }, ‘default‘: SQLAlchemyJobStore(url=‘sqlite:///jobs.sqlite‘) } executors = { ‘default‘: { ‘type‘: ‘threadpool‘, ‘max_workers‘: 20 }, ‘processpool‘: ProcessPoolExecutor(max_workers=5) } job_defaults = { ‘coalesce‘: False, ‘max_instances‘: 3, ‘misfire_grace_time‘: 10 # 10秒的任务超时容错 } scheduler = BackgroundScheduler() # .. do something else here, maybe add jobs etc.
以上涵盖了大多数情况的调度器配置,在实际运行时可以试试不同的配置会有怎样不同的效果。
启动调度器
启动调度器前需要先添加作业,有两种方法向调度器添加作业:
一是通过接口add_job(),
二是通过使用函数装饰器,其中 add_job() 返回一个apscheduler.job.Job类的实例,
用于后续修改或删除作业。
我们可以随时在调度器上调度作业。如果在添加作业时,调度器还没有启动,那么任务将不会运行,并且第一次运行时间在调度器启动时计算。 注意:如果使用的是序列化作业的执行器或作业存储器,那么要求被调用的作业(函数)必须是全局可访问的,被调用的作业的参数是可序列化的,
作业存储器中,只有 MemoryJobStore 不会序列化作业。执行器中,只有ProcessPoolExecutor 将序列化作业。
启用调度器只需要调用调度器的 start() 方法,下面分别使用不同的作业存储器来举例说明:
方法一:使用默认的作业存储器:
#!/usr/bin/env python # -*- coding: utf-8 -*- #author tom from apscheduler.schedulers.blocking import BlockingScheduler import datetime from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor def my_job(id=‘my_job‘): print(id, ‘-->‘, datetime.datetime.now()) jobstores = { ‘default‘: MemoryJobStore() } executors = { ‘default‘: ThreadPoolExecutor(20), ‘processpool‘: ProcessPoolExecutor(10) } job_defaults = { ‘coalesce‘: False, ‘max_instances‘: 3 } scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults) scheduler.add_job(my_job, args=[‘job_interval‘, ], id=‘job_interval‘, trigger=‘interval‘, seconds=5, replace_existing=True) scheduler.add_job(my_job, args=[‘job_cron‘, ], id=‘job_cron‘, trigger=‘cron‘, month=‘4-8,11-12‘, hour=‘7-23‘, second=‘*/10‘, end_date = ‘2019-11-30‘) scheduler.add_job(my_job, args=[‘job_once_now‘, ], id=‘job_once_now‘) scheduler.add_job(my_job, args=[‘job_date_once‘, ], id=‘job_date_once‘, trigger=‘date‘, run_date=‘2019-08-13 23:28:05‘) try: scheduler.start() except SystemExit: print(‘exit‘) exit()
运行结果:
job_once_now --> 2019-08-13 23:26:24.935198 job_interval --> 2019-08-13 23:26:29.928484 job_cron --> 2019-08-13 23:26:30.000488 job_interval --> 2019-08-13 23:26:34.928770 job_interval --> 2019-08-13 23:26:39.935056 job_cron --> 2019-08-13 23:26:40.012061 job_interval --> 2019-08-13 23:26:44.941342 job_interval --> 2019-08-13 23:26:49.936628 job_cron --> 2019-08-13 23:26:50.009632 job_interval --> 2019-08-13 23:26:54.940914
上述代码使用内存作为作业存储器,操作比较简单,重启程序相当于第一次运行。
方法二:使用数据库作为存储器:
from apscheduler.schedulers.blocking import BlockingScheduler import datetime from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore def my_job(id=‘my_job‘): print(id, ‘-->‘, datetime.datetime.now()) jobstores = { ‘default‘: SQLAlchemyJobStore(url=‘sqlite:///jobs.sqlite‘) } executors = { ‘default‘: ThreadPoolExecutor(20), ‘processpool‘: ProcessPoolExecutor(10) } job_defaults = { ‘coalesce‘: False, ‘max_instances‘: 3 } scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults) scheduler.add_job(my_job, args=[‘job_interval‘, ], id=‘job_interval‘, trigger=‘interval‘, seconds=5, replace_existing=True) scheduler.add_job(my_job, args=[‘job_cron‘, ], id=‘job_cron‘, trigger=‘cron‘, month=‘4-8,11-12‘, hour=‘7-23‘, second=‘*/10‘, end_date = ‘2019-11-30‘) scheduler.add_job(my_job, args=[‘job_once_now‘, ], id=‘job_once_now‘) scheduler.add_job(my_job, args=[‘job_date_once‘, ], id=‘job_date_once‘, trigger=‘date‘, run_date=‘2019-08-13 23:30:05‘) try: scheduler.start() except SystemExit: print(‘exit‘) exit()
说明,红色代码修改为数据库作为作业存储器
运行结果如下:
Run time of job "my_job (trigger: date[2019-08-13 23:35:05 CST], next run at: 2019-08-13 23:35:05 CST)" was missed by 0:06:38.154717 job_once_now --> 2019-08-13 23:41:43.172718 job_interval --> 2019-08-13 23:41:48.067998 job_cron --> 2019-08-13 23:41:50.028110 job_interval --> 2019-08-13 23:41:53.105287 job_interval --> 2019-08-13 23:41:58.046569 job_cron --> 2019-08-13 23:42:00.031683 job_interval --> 2019-08-13 23:42:03.102858 job_interval --> 2019-08-13 23:42:08.055142 job_cron --> 2019-08-13 23:42:10.046255 job_interval --> 2019-08-13 23:42:13.102430 job_interval --> 2019-08-13 23:42:18.071715 job_cron --> 2019-08-13 23:42:20.084830 job_interval --> 2019-08-13 23:42:23.108003
提示:我们有作业本应在2019-08-13 23:30:05运行的作业没有运行,因为现在的时间为2019-08-13 23:41,错过了 23:30:05 的时间。
如果将上术代码第 21-25 行注释掉,重新运行本程序,则四种类型的作业仍会运行,结果如下:
运行结果:
Run time of job "my_job (trigger: cron[month=‘4-8,11-12‘, hour=‘7-23‘, second=‘*/10‘], next run at: 2019-08-13 23:45:50 CST)" was missed by 0:00:18.783910
Run time of job "my_job (trigger: cron[month=‘4-8,11-12‘, hour=‘7-23‘, second=‘*/10‘], next run at: 2019-08-13 23:46:10 CST)" was missed by 0:00:08.784911
Run time of job "my_job (trigger: interval[0:00:05], next run at: 2019-08-13 23:45:53 CST)" was missed by 0:00:15.774201
Run time of job "my_job (trigger: interval[0:00:05], next run at: 2019-08-13 23:45:53 CST)" was missed by 0:00:10.774201
Run time of job "my_job (trigger: interval[0:00:05], next run at: 2019-08-13 23:45:53 CST)" was missed by 0:00:05.774201
job_interval --> 2019-08-13 23:46:08.798911
job_cron --> 2019-08-13 23:46:10.056983
job_interval --> 2019-08-13 23:46:13.187162
job_interval --> 2019-08-13 23:46:18.102443
job_cron --> 2019-08-13 23:46:20.067556
job_interval --> 2019-08-13 23:46:23.097729
job_interval --> 2019-08-13 23:46:28.094015
job_cron --> 2019-08-13 23:46:30.068128
job_interval --> 2019-08-13 23:46:33.101301
作业仍会运行,说明作业被添加到数据库中,程序中断后重新运行时会自动从数据库读取作业信息,而不需要重新再添加到调度器中,如果不注释 红色添加作业的代码,则作业会重新添加到数据库中,这样就有了两个同样的作业,避免出现这种情况可以在 add_job 的参数中增加 replace_existing=True,如
scheduler.add_job(my_job, args=[‘job_interval‘,],id=‘job_interval‘,trigger=‘interval‘,seconds=3,replace_existing=True)
如果我们想运行错过运行的作业,使用 misfire_grace_time,如
scheduler.add_job(my_job,args = [‘job_cron‘,] ,id=‘job_cron‘,trigger=‘cron‘,month=‘4-8,11-12‘,hour=‘7-11‘,second=‘*/15‘,coalesce=True,
misfire_grace_time=30,replace_existing=True,end_date=‘2018-05-30‘)
说明:misfire_grace_time,假如一个作业本来 08:00 有一次执行,但是由于某种原因没有被调度上,现在 08:01 了,这个 08:00 的运行实例被提交时,会检查它预订运行的时间和当下时间的差值(这里是1分钟),大于我们设置的 30 秒限制,那么这个运行实例不会被执行。最常见的情形是 scheduler 被 shutdown 后重启,某个任务会积攒了好几次没执行如 5 次,下次这个作业被提交给执行器时,执行 5 次。设置 coalesce=True 后,只会执行一次。
其他操作如下:
scheduler.remove_job(job_id,jobstore=None)#删除作业 scheduler.remove_all_jobs(jobstore=None)#删除所有作业 scheduler.pause_job(job_id,jobstore=None)#暂停作业 scheduler.resume_job(job_id,jobstore=None)#恢复作业 scheduler.modify_job(job_id, jobstore=None, **changes)#修改单个作业属性信息 scheduler.reschedule_job(job_id, jobstore=None, trigger=None,**trigger_args)#修改单个作业的触发器并更新下次运行时间 scheduler.print_jobs(jobstore=None, out=sys.stdout)#输出作业信息
调度器事件监听:
scheduler 的基本应用,在前面已经介绍过了,但仔细思考一下:如果程序有异常抛出会影响整个调度任务吗?请看下面的代码,运行一下看看会发生什么情况:
# coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime def aps_test(x): print (1/0) print (datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘), x) scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=(‘定时任务‘,), trigger=‘cron‘, second=‘*/5‘) scheduler.start()
运行结果如下:
Job "aps_test (trigger: cron[second=‘*/5‘], next run at: 2019-08-13 23:55:45 CST)" raised an exception Traceback (most recent call last): File "C:UsersAdministratorEnvsmmachine_learnlibsite-packagesapschedulerexecutorsase.py", line 125, in run_job retval = job.func(*job.args, **job.kwargs) File "F:/machine learnning/machine_learning/myCeleryProj/apschedule_test.py", line 84, in aps_test print (1/0) ZeroDivisionError: division by zero Job "aps_test (trigger: cron[second=‘*/5‘], next run at: 2019-08-13 23:55:50 CST)" raised an exception Traceback (most recent call last):
可能看出每 5 秒抛出一次报错信息。任何代码都可能抛出异常,关键是,发生导常事件,如何第一时间知道,这才是我们最关心的,apscheduler 已经为我们想到了这些,提供了事件监听来解决这一问题。
将上述代码稍做调整,加入日志记录和事件监听,如下所示。
from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR import datetime import logging logging.basicConfig(level=logging.INFO, format=‘%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s‘, datefmt=‘%Y-%m-%d %H:%M:%S‘, filename=‘log1.txt‘, filemode=‘a‘) def aps_test(x): print (datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘), x) def date_test(x): print (datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘), x) print (1/0) def my_listener(event): if event.exception: print (‘任务出错了!!!!!!‘) else: print (‘任务照常运行...‘) scheduler = BlockingScheduler() scheduler.add_job(func=date_test, args=(‘一定性任务,会出错‘,), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id=‘date_task‘) scheduler.add_job(func=aps_test, args=(‘循环任务‘,), trigger=‘interval‘, seconds=3, id=‘interval_task‘) scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) scheduler._logger = logging scheduler.start()
以上是关于APS定时任务框架的主要内容,如果未能解决你的问题,请参考以下文章