python使用apscheduler实现定时任务

Posted 青灯丶不归客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python使用apscheduler实现定时任务相关的知识,希望对你有一定的参考价值。

安装apscheduler

pip install apscheduler

基本概念

1、触发器triggers。 触发器包含调度逻辑。每个作业都有自己的触发器,用于确定下一个任务何时运行。触发器有三种内建的trigger:

  1. data: 特定的时间触发
  2. interval: 固定的时间间隔触发
  3. cron: 在特定时间周期性地触发

2、 任务存储器 job stores。 用于存放任务,把任务存放在内存(为默认MemoryJobStore)或数据库中

3、执行器 executors。 执行器是将任务提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件

4、调度器scheduler。 把上方三个组件作为参数,通过创建调度器实例来运行

根据开发需求选择对应的组件

BlockingScheduler            阻塞式调度器:适用于只跑调度器的程序。
BackgroundScheduler      后台调度器:适用于非阻塞的情况,调度器会在后台独立运行。
AsyncIOScheduler            AsyncIO调度器,适用于应用使用AsnycIO的情况。
GeventScheduler              Gevent调度器,适用于应用通过Gevent的情况。
TornadoScheduler            Tornado调度器,适用于构建Tornado应用。
TwistedScheduler             Twisted调度器,适用于构建Twisted应用。
QtScheduler                      Qt调度器,适用于构建Qt应用。

使用步骤

1、新建一个调度器schedulers
2、添加调度任务
3、运行调度任务

使用示例

触发器date:特点的时间点触发,只执行一次

from datetime import datetime
from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler

def job(text):    
    print(text)

scheduler = BlockingScheduler()
# 在 2019-8-30 运行一次 job 方法
scheduler.add_job(job, \'date\', run_date=date(2019, 8, 30), args=[\'text1\'])
# 在 2019-8-30 01:00:00 运行一次 job 方法
scheduler.add_job(job, \'date\', run_date=datetime(2019, 8, 30, 1, 0, 0), args=[\'text2\'])
# 在 2019-8-30 01:00:01 运行一次 job 方法
scheduler.add_job(job, \'date\', run_date=\'2019-8-30 01:00:00\', args=[\'text3\'])

scheduler.start()

触发器interval:固定时间间隔触发。

参数 说明
weeks(int) 间隔几周
days(int) 间隔几天
hours(int) 间隔几小时
minutes(int) 间隔几分钟
seconds(int) 间隔多少秒
start_date(datetime或str) 开始日期
end_date(datetime或str) 结束日期
timezone(datetime.tzinfo 或str)
from apscheduler.schedulers.blocking import BlockingScheduler


def run(text):    
    print(text)


if __name__ == \'__main__\':
    scheduler = BlockingScheduler()
    # 每隔 1分钟 运行一次 job 方法
    scheduler.add_job(run, \'interval\', minutes=1, args=[\'job1\'])
    # 在 2019-08-29 22:15:00至2019-08-29 22:17:00期间,每隔1分30秒 运行一次 job 方法
    scheduler.add_job(run, \'interval\', minutes=1, seconds=30, start_date=\'2019-08-29 22:15:00\', end_date=\'2019-08-29 22:17:00\', args=[\'job2\'])
    scheduler.start()

触发器cron: 在特定的时间周期性地触发。

参数 说明
year (int 或 str) 年,4位数字
month (int 或 str) 月 (范围1-12)
day (int 或 str) 日 (范围1-31)
week (int 或 str) 周 (范围1-53)
day_of_week (int 或 str) 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
hour (int 或 str) 时 (范围0-23)
minute (int 或 str) 分 (范围0-59)
second (int 或 str) 秒 (范围0-59)
start_date (datetime 或 str) 最早开始日期(包含)
end_date (datetime 或 str) 最晚结束时间(包含)
timezone (datetime.tzinfo 或str) 指定时区
from apscheduler.schedulers.blocking import BlockingScheduler


def exec_job(text):
    print(text)
    

if __name__ == \'__main__\':
    scheduler = BlockingScheduler()
    # 在每天的22点,每隔一分钟 运行一次exec_job方法
    scheduler.add_job(exec_job, \'cron\', hour=22, minute=\'*/1\', args=[\'job_one\'])
    # 在每天的22点和23点的25分, 运行一次exec_job方法
    scheduler.add_job(exec_job, \'cron\', hour=\'22-23\', minute=\'25\', args=[\'result\'])
    scheduler.start()

添加任务

添加任务的方法有两种
(1)、通过add_job()添加任务
(2)、通过装饰器scheduled_job()

from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler


scheduler = BlockingScheduler()


def tick(message=\'working\'):
    print(\'Tick! The time is: %s\' % datetime.now())
    print("worker status is:", message)


def tick_other(status="relaxing"):
    print(\'Tick! The time is: %s\' % datetime.now())
    print("worker status is:", status)


# 添加任务的方法有两种 1、通过调用add_job(), 2、通过装饰器scheduled_job()
# add_job()方法返回一个apscheduler.job.Job实例,可以使用该实例稍后修改或删除该任务
@scheduler.scheduled_job(\'interval\', seconds=5)
def tick_another(status="restart"):
    print(\'Tick! The time is: %s\' % datetime.now())
    print("worker status is:", status)


if __name__ == \'__main__\':
    # 在指定期间内, 每隔1分10秒运行一次tick方法
    scheduler.add_job(tick, \'interval\', minutes=1, seconds=10,
                      start_date=\'2020-01-06 14:19:00\', end_date=\'2020-01-06 14:24:00\', args=[\'sleep\'])
    # 在每天的11点25分,运行一次tick_other方法
    scheduler.add_job(tick_other, \'cron\', hour=\'11\', minute=\'25\', args=[\'hard working\'])
    scheduler.start()

参考来源

在线文档:在线文档
转载来源:文档

以上是关于python使用apscheduler实现定时任务的主要内容,如果未能解决你的问题,请参考以下文章

python用schedule模块实现定时任务

python实现定时发送系列

Python下APScheduler的简单使用

Python3 - 时间处理与定时任务

Python 定时调度

python 定时任务 from apscheduler.schedulers.blocking import BlockingScheduler