python定时任务介绍

Posted Wallace JW

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python定时任务介绍相关的知识,希望对你有一定的参考价值。

定时任务

本文介绍几种简单的执行定时任务的方法,包含两种可以重复运行定时任务的方法以及两种只能定时运行一次任务的方法

死循环 + time.sleep() (定时多次运行)

第一种办法简单粗暴。那就是在一个死循环中,使用线程睡眠函数 sleep()。

import time

def dosomething(s):
    print(s)

def timedTask():
    while True:
        dosomething('run')
        time.sleep(10)

if __name__ == '__main__':
    timedTask()
    print('main')		# 这行代码永远不会被运行

这种方法能够执行固定间隔时间的任务。但是由于程序是单线程并且是死循环,因此是阻隔线程, timedTask() 一直占有 CPU 资源,导致后续操作无法执行。不建议使用

使用 apscheduler 库添加调度任务(定时多次运行)

此方法需要先安装 APScheduler 库,这里只讨论定时调度任务,关于APScheduler的更多讲解请参考这篇文章

运行调度任务需要以下三个步骤:

  1. 新建一个 schedulers (调度器) 。
  2. 添加一个调度任务(job stores)。
  3. 运行调度任务。
import time
from apscheduler.schedulers.background import BackgroundScheduler

def timedTask(s):
    print(s)

if __name__ == '__main__':
    # 创建后台执行的 schedulers
    scheduler = BackgroundScheduler()
    # 添加调度任务
    # 调度方法为 timedTask,触发器选择 interval(间隔性),间隔时长为 2 秒
    scheduler.add_job(timedTask, 'interval', args=("run",), seconds=2)
    # 启动调度任务
    scheduler.start()
    while True:
        print(time.time())
        time.sleep(5)

这种方法可以多次执行调度任务,而且不会阻隔线程,后续语句可以正常运行,推荐使用

使用 Timer 类启用非阻隔线程(只定时运行一次)

使用 threading 库的 Timer 类可以设定多长时间以后调用函数,不会影响后面语句的运行,比如下面的代码,会先执行后续语句,到设定的时间再执行 dosomething,但是这种方法的 dosomething 只会输出一次,不能多次运行

from threading import Timer
import time

# 定时任务
def dosomething(s):
    print(s)
    
def timedTask():
    # __init__(self, interval, function, args=None, kwargs=None)
    # 注意args参数必须是 tuple 类型,而且要在输入完所有参数后加一个逗号,
    # 否则可能会报参数数量错误 takes XX positional argument but XX were given
    
    #Timer(1, dosomething, (1,)).start()
    Timer(1, dosomething, ('run',)).start()

if __name__ == '__main__':
    timedTask()
    while True:
        print(time.time())
        time.sleep(5)

使用事件调度器模块 sched (只定时运行一次)

sched 是事件调度器,它通过 scheduler 类来调度事件,从而达到定时执行任务的效果,而且sched 还多了一个优先级参数。

这种方式跟上面说的Timer的区别在于,sched 方式中会先执行完调度事件,再执行后续语句。而 Timer 不会等待定时任务,可以直接执行后续的语句。

scheduler 中的每个调度任务只会工作一次,不会无限循环被调用。如果想重复执行同一任务, 需要重复添加调度任务。

import sched
import time

def timedTask():
    # 初始化 sched 模块的 scheduler 类
    scheduler = sched.scheduler(time.time, time.sleep)
    # 增加调度任务, priority 参数为优先级,数值越小优先级越高
    # enter(self, delay, priority, action, argument=(), kwargs=_sentinel)
    scheduler.enter(10, 2, task, ('run2',))
    scheduler.enter(10, 1, task, ('run1',))
    # 运行任务
    scheduler.run()

# 定时任务
def task(s):
    print(s)

if __name__ == '__main__':
    timedTask()
    while True:
        print(time.time())
        time.sleep(5)

以上是关于python定时任务介绍的主要内容,如果未能解决你的问题,请参考以下文章

Java之旅--定时任务(TimerQuartzSpringLinuxCron)

APScheduler定时任务框架

Python—定时任务

apscheduler 定时任务框架

springboot定时任务但时间间隔不同

Python APScheduler定时任务框架