Python定时任务神器 - APScheduler

Posted 上帝De助手

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python定时任务神器 - APScheduler相关的知识,希望对你有一定的参考价值。

文章目录

Python定时任务神器 - APScheduler

定时任务在很多的开发场景中都会使用到,在Python中也提供很多的定时任务库。比如:

  • sched
  • schedule
  • celery

但是这些定时任务库都只是提供了简答的,或者只支持静态的定时任务。而对于需要复杂定时功能,或者动态注册定时任务的场景,则无法满足。

而今天介绍的主角 - APScheduler,则会完美的解决这个问题。

  • 多种的定时任务类型支持
  • 静态、动态定时任务支持

简单说明

不管你使用哪种APScheduler的定时任务,你都需要先了解APScheduler的简单机制。即:job、executors、jobstores、trigger、scheduler等

job

即需要被执行的具体任务,主要对应Python中的函数或方法。在APScheduler中即可提前配置,也可以动态添加job。

executors

即执行job的对象。通常可以是多线程、多进程、协程等对象。

jobstores

即存储job元数据的地方。可以是memory、sqllite、mysql等。

trigger

即决定任务的触发模式。通常有指定时间、指定时间间隔、指定周期策略等。

scheduler

用于调度和管理上述提到的所有对象。

任意一个APScheduler的实例启动的时候都需要配置这些初始参数,如果没有指定则会使用默认的值。

使用方式

首先你得安装apscheduler,方式如下:

pip install apscheduler

静态配置任务

import time
from apscheduler.schedulers.blocking import BlockingScheduler
 
sched = BlockingScheduler()
 
@sched.scheduled_job('interval', seconds=5)     # 以5s为间隔来触发
def my_job():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
 
sched.start()

动态添加任务

import time
from apscheduler.schedulers.blocking import BlockingScheduler
 
def my_job():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
 
sched = BlockingScheduler()
# 在11,12月份的第三个星期五的00:00,01:00,02:00,03:00触发
sched.add_job(my_job, 'cron', month='11-12', day='3rd fri', hour='0-3')

sched.start()

异步任务

上面的任务都是阻塞任务,即任务执行过程中需要持续等待结果;如果你不希望等待结果的话,则可以使用非阻塞的异步任务。

import time
import datetime
from pytz import timezone
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor

def my_job(name):
    print('%s: %s' % (time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), name))
    
jobstores = 
    'default': SQLAlchemyJobStore(url='sqlite:tmp/test.db')

executors = 
    'default': ThreadPoolExecutor(20)

job_defaults = 
    'coalesce': False,
    'max_instances': 3

task_scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults,
                                     timezone=timezone('Asia/Shanghai'))
task_scheduler.start()
# 指定特定的日期触发任务,这类任务只会触发一次
job = task_scheduler.add_job(my_job, 'date', run_date=datetime(2019, 3, 6, 9, 47, 5), args=('date',))

其它使用API

除了注册和添加任务之外,apscheduler还提供了其它比较友好的API接口。比如:

  • 暂停任务
  • 启动任务
  • 删除任务
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.pause()
job.resume()
job.remove()

获取完整代码请查看原文

新书推荐

获取更多关于Python和自动化测试的文章,请扫描如下二维码!

以上是关于Python定时任务神器 - APScheduler的主要内容,如果未能解决你的问题,请参考以下文章

sched 模块中巨好用的轻量级定时任务神器scheduler!

太实用了,Schedule模块, Python 周期任务神器

Linux日志管理神器之Logrotate日志分割

python制作电脑定时关机办公神器,另含其它两种方式,无需编程!

Python爬虫 | 微博热搜定时爬取,堪称摸鱼神器

Python爬虫 | 微博热搜定时爬取,堪称摸鱼神器