python定时任务模块APScheduler

Posted angelyan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python定时任务模块APScheduler相关的知识,希望对你有一定的参考价值。

一、简单任务

先定义一个函数,再创建一个scheduler(调度器)实例,向其中添加一个job,最后启动调度器就可以了

下面的例子中,每当秒数是5的整数倍时,就执行一次这个函数

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime


def aps_test():
    """Print the current local time (YYYY-MM-DD HH:MM:SS) and a greeting."""
    # The quotes around both string literals were lost in the original text.
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), '你好')


# Run aps_test whenever the wall-clock second is a multiple of 5.
scheduler = BlockingScheduler()
# The trigger alias and the cron expression must be passed as strings.
scheduler.add_job(func=aps_test, trigger='cron', second='*/5')
scheduler.start()

带参数的

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime


def aps_test(x):
    """Print the current local time (YYYY-MM-DD HH:MM:SS) followed by *x*.

    Args:
        x: Value printed after the timestamp; supplied through the job's
            ``args`` tuple.
    """
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)

# Same cron schedule as before, but the job receives a positional argument.
scheduler = BlockingScheduler()
# args must be a tuple of real values; the trailing comma keeps it a 1-tuple.
scheduler.add_job(func=aps_test, args=('你好',), trigger='cron', second='*/5')
scheduler.start()

下面的例子同时添加定时任务、一次性任务和循环任务

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime


def aps_test(x):
    """Print the current local time (YYYY-MM-DD HH:MM:SS) followed by *x*.

    Args:
        x: Label identifying which job fired; supplied through ``args``.
    """
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)

scheduler = BlockingScheduler()
# Cron job: fires whenever the second is a multiple of 5.
scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5')
# One-shot job: no trigger given, first (and only) run 12 seconds from now.
scheduler.add_job(func=aps_test, args=('一次性任务',),
                  next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12))
# Interval job: fires every 3 seconds.
scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3)

scheduler.start()

二、日志

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import logging

# Append INFO-and-above records to log1.txt; each line carries the timestamp,
# source file name, line number, level and message.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='log1.txt',
                    filemode='a')


def aps_test(x):
    """Deliberately fail so the scheduler has an error to log.

    Args:
        x: Label that would be printed after the timestamp; never reached
            because the division above it raises first.

    Raises:
        ZeroDivisionError: Always, from the intentional ``1 / 0``.
    """
    # Intentional error for the logging demo. Written as a function call so it
    # parses on Python 3 (the original used the Python 2 print statement).
    print(1 / 0)
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)

scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5')
# Route the scheduler's own log calls through the logging module configured
# above. NOTE(review): this overwrites a private attribute; APScheduler
# presumably already logs via the standard logging hierarchy, so this line may
# be unnecessary — confirm against the installed version.
scheduler._logger = logging
scheduler.start()

三、删除任务

要求执行一定阶段任务以后,删除某一个循环任务,其他任务照常进行。有如下代码:

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import logging

# Append INFO-and-above records to log1.txt; each line carries the timestamp,
# source file name, line number, level and message.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='log1.txt',
                    filemode='a')


def aps_test(x):
    """Print the current local time (YYYY-MM-DD HH:MM:SS) followed by *x*.

    Args:
        x: Label identifying which job fired; supplied through ``args``.
    """
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)


def aps_date(x):
    """Remove the interval job, then print a timestamped message.

    Args:
        x: Message printed after the timestamp.
    """
    # remove_job takes the job id string given to add_job(id=...); the bare
    # name in the original text would raise NameError.
    scheduler.remove_job('interval_task')
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
    

scheduler = BlockingScheduler()
# Cron job: fires whenever the second is a multiple of 5.
scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5', id='cron_task')
# One-shot job, 12 seconds from now: removes the interval job below.
# args is a 1-tuple holding a single string that itself contains a comma.
scheduler.add_job(func=aps_date, args=('一次性任务,删除循环任务',),
                  next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12),
                  id='date_task')
# Interval job: fires every 3 seconds until aps_date removes it by id.
scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task')
# NOTE(review): overwrites a private attribute to route scheduler logging
# through the logging module configured above — confirm it is still needed.
scheduler._logger = logging

scheduler.start()

四、停止任务,恢复任务

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import logging

# Append INFO-and-above records to log1.txt; each line carries the timestamp,
# source file name, line number, level and message.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='log1.txt',
                    filemode='a')


def aps_test(x):
    """Print the current local time (YYYY-MM-DD HH:MM:SS) followed by *x*.

    Args:
        x: Label identifying which job fired; supplied through ``args``.
    """
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)


def aps_pause(x):
    """Pause the interval job, then print a timestamped message.

    Args:
        x: Message printed after the timestamp.
    """
    # pause_job takes the job id string given to add_job(id=...); the bare
    # name in the original text would raise NameError.
    scheduler.pause_job('interval_task')
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)


def aps_resume(x):
    """Resume the interval job, then print a timestamped message.

    Args:
        x: Message printed after the timestamp.
    """
    # resume_job takes the job id string given to add_job(id=...); the bare
    # name in the original text would raise NameError.
    scheduler.resume_job('interval_task')
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)

scheduler = BlockingScheduler()
# Cron job: fires whenever the second is a multiple of 5.
scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5', id='cron_task')
# One-shot job, 12 seconds from now: pauses the interval job.
scheduler.add_job(func=aps_pause, args=('一次性任务,停止循环任务',),
                  next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12),
                  id='pause_task')
# One-shot job, 24 seconds from now: resumes the interval job.
scheduler.add_job(func=aps_resume, args=('一次性任务,恢复循环任务',),
                  next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=24),
                  id='resume_task')
# Interval job: fires every 3 seconds except while paused.
scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task')
# NOTE(review): overwrites a private attribute to route scheduler logging
# through the logging module configured above — confirm it is still needed.
scheduler._logger = logging

scheduler.start()

五、捕获错误

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import datetime
import logging

# Append INFO-and-above records to log1.txt; each line carries the timestamp,
# source file name, line number, level and message.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='log1.txt',
                    filemode='a')


def aps_test(x):
    """Print the current local time (YYYY-MM-DD HH:MM:SS) followed by *x*.

    Args:
        x: Label identifying which job fired; supplied through ``args``.
    """
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)


def date_test(x):
    """Print a timestamped message, then fail on purpose.

    Args:
        x: Message printed after the timestamp.

    Raises:
        ZeroDivisionError: Always, after the message has been printed.
    """
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
    # Intentional error so the registered listener sees a failed job.
    print(1 / 0)


def my_listener(event):
    """Print whether the job behind *event* failed or ran normally.

    Args:
        event: Scheduler event object; only its ``exception`` attribute is
            read, and a truthy value is treated as a failure.
    """
    if event.exception:
        print('任务出错了!!!!!!')
    else:
        print('任务照常运行...')

scheduler = BlockingScheduler()
# One-shot job, 15 seconds from now: date_test raises on purpose.
# args is a 1-tuple holding a single string that itself contains a comma.
scheduler.add_job(func=date_test, args=('一定性任务,会出错',),
                  next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15),
                  id='date_task')
# Interval job: fires every 3 seconds.
scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task')
# Call my_listener for both successful and failed job executions.
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# NOTE(review): overwrites a private attribute to route scheduler logging
# through the logging module configured above — confirm it is still needed.
scheduler._logger = logging

scheduler.start()

 

以上是关于python定时任务模块APScheduler的主要内容,如果未能解决你的问题,请参考以下文章

定时任务框架APScheduler学习详解

python 定时任务 from apscheduler.schedulers.blocking import BlockingScheduler

Flask 学习-86.Flask-APScheduler 创建定时任务

Python定时任务神器 - APScheduler

Python下定时任务框架APScheduler的使用

apscheduler 定时任务框架