A Detailed Guide to Using APScheduler in Python (Classic Python 3 Programming Examples)

Posted by cui_yonghua


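1. Introduction and a basic example

APScheduler is easy to use and supports three kinds of jobs: date-based, interval-based, and crontab-style. It provides the building blocks for a dedicated scheduler or a scheduling service.

APScheduler is not a daemon or a service, and it ships with no command-line tools of its own. It is mainly meant to run inside an existing application.

Install: pip3 install apscheduler

A simple interval job example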

# -*- coding: utf-8 -*-
import os
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler


def tick():
    print('Tick! The time is: %s' % datetime.now())


if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.add_job(tick, 'interval', seconds=3)
    # Show the right interrupt key for the platform (Ctrl+Break on Windows, Ctrl+C elsewhere)
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass


2. Cron-style scheduled jobs

Documentation: https://apscheduler.readthedocs.io/en/latest/modules/triggers/cron.html

Parameters:
year (int|str) – 4-digit year
month (int|str) – month (1-12)
day (int|str) – day of month (1-31)
week (int|str) – ISO week (1-53)
day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – hour (0-23)
minute (int|str) – minute (0-59)
second (int|str) – second (0-59)
start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
jitter (int|None) – delay the job execution by jitter seconds at most

# -*- coding: utf-8 -*-

from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler


def tick():
    print('Tick! The time is: %s' % datetime.now())


if __name__ == '__main__':
    scheduler = BlockingScheduler()
    # Run the job every day at 18:40
    scheduler.add_job(tick, 'cron', hour=18, minute=40)
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass

Scheduling reference:

minute = '*/3'  # run every 3 minutes
hour = '19-21', minute = '23'  # run once each at 19:23, 20:23, and 21:23
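
To make the field syntax above more concrete, here is a minimal sketch of how a single cron-style field could be expanded into the values it matches. The helper name `expand_cron_field` is hypothetical and is not part of APScheduler. It handles only `*`, `*/n`, `a`, `a-b`, `a-b/n`, and comma-separated lists, which is a subset of what the library accepts.

```python
def expand_cron_field(expr, low, high):
    """Expand one cron-style field into the sorted list of values it matches.

    Hypothetical illustration only; low and high are the field's bounds,
    e.g. 0 and 59 for minutes.
    """
    values = set()
    for part in str(expr).split(','):
        # Split off an optional step, e.g. '*/3' -> base '*', step 3
        base, _, step_text = part.partition('/')
        step = int(step_text) if step_text else 1
        if base == '*':
            start, end = low, high
        elif '-' in base:
            start_text, end_text = base.split('-')
            start, end = int(start_text), int(end_text)
        else:
            # A single value such as '23'
            start = end = int(base)
        values.update(range(start, end + 1, step))
    return sorted(values)


minutes = expand_cron_field('*/3', 0, 59)
hours = expand_cron_field('19-21', 0, 23)
months = expand_cron_field('4-8,11-12', 1, 12)

print(minutes[:5])  # [0, 3, 6, 9, 12]
print(hours)        # [19, 20, 21]
print(months)       # [4, 5, 6, 7, 8, 11, 12]
```

Read this way, `minute='*/3'` matches minutes 0, 3, 6 and so on, and `hour='19-21'` combined with `minute='23'` matches exactly three times per day.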

3. Scheduler event listeners

Listening for job exceptions in APScheduler and recording them in a log file:

# -*- coding: utf-8 -*-
# File Name: listen_event.py

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import datetime
import logging

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='log1.txt',
                    filemode='a')


def aps_test(x):
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)


def date_test(x):
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
    print(1/0)


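def my_listener(event):
    # event.exception is set when the job raised, and is None when it succeeded
    if event.exception:
        print('The job raised an error!')
    else:
        print('The job ran normally...')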


scheduler = BlockingScheduler()
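# One-off job that runs 15 seconds from now and fails with ZeroDivisionError
scheduler.add_job(func=date_test, args=('one-off job, will fail',),
                  next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task')
# Recurring job that runs every 3 seconds
scheduler.add_job(func=aps_test, args=('recurring job',), trigger='interval', seconds=3, id='interval_task')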
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler._logger = logging
scheduler.start() 
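
The second argument to `add_listener` above is a bit mask built with `|`. As a rough illustration of that idea, the following sketch shows how a mask can filter which events reach a callback. Everything in it is an assumption made for illustration: `TinyEventBus` and the three flag values are hypothetical stand-ins, not APScheduler's classes or its real constants (those live in `apscheduler.events`).

```python
# Hypothetical stand-in flags; each event type owns one bit
JOB_EXECUTED = 1 << 0
JOB_ERROR = 1 << 1
JOB_MISSED = 1 << 2


class TinyEventBus:
    """Minimal dispatcher that mimics mask-based listener registration."""

    def __init__(self):
        self._listeners = []  # list of (callback, mask) pairs

    def add_listener(self, callback, mask):
        self._listeners.append((callback, mask))

    def dispatch(self, code, payload):
        # A listener is called only if its mask contains the event's bit
        for callback, mask in self._listeners:
            if code & mask:
                callback(code, payload)


received = []
bus = TinyEventBus()
bus.add_listener(lambda code, payload: received.append(payload),
                 JOB_EXECUTED | JOB_ERROR)

bus.dispatch(JOB_EXECUTED, 'ran fine')
bus.dispatch(JOB_MISSED, 'missed run')  # filtered out: bit not in the mask
bus.dispatch(JOB_ERROR, 'raised ZeroDivisionError')

print(received)  # ['ran fine', 'raised ZeroDivisionError']
```

In the listing above, `EVENT_JOB_EXECUTED | EVENT_JOB_ERROR` plays the same role: the listener is notified of successful and failed runs, and other event types are not delivered to it.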

4. Configuring and using the scheduler

4.1 Using the default job store

# -*- coding: utf-8 -*-

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


def my_job(id='my_job'):
    print(id, '-->', datetime.datetime.now())


# Keep jobs in memory (the default job store); they are lost when the process exits
jobstores = {
    'default': MemoryJobStore()
}

# Thread pool as the default executor, plus a named process pool
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(10)
}

job_defaults = {
    'coalesce': False,
    'max_instances': 3
}

scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
scheduler.add_job(my_job, args=['job_interval'], id='job_interval', trigger='interval', seconds=5, replace_existing=True)
scheduler.add_job(my_job, args=['job_cron'], id='job_cron', trigger='cron', month='4-8,11-12', hour='7-11', second='*/10',
                  end_date='2020-05-30')
scheduler.add_job(my_job, args=['job_once_now'], id='job_once_now')
scheduler.add_job(my_job, args=['job_date_once'], id='job_date_once', trigger='date', run_date='2018-04-05 07:48:05')
try:
    scheduler.start()
except SystemExit:
    print('exit')
    exit()


4.2 Using a database as the job store

# -*- coding: utf-8 -*-

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore  # requires the SQLAlchemy package


def my_job(id='my_job'):
    print(id, '-->', datetime.datetime.now())


# Persist jobs in a local SQLite file so they survive a restart
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}

executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(10)
}

job_defaults = {
    'coalesce': False,
    'max_instances': 3
}

scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
scheduler.add_job(my_job, args=['job_interval'], id='job_interval', trigger='interval', seconds=5, replace_existing=True)
scheduler.add_job(my_job, args=['job_cron'], id='job_cron', trigger='cron', month='4-8,11-12', hour='7-11', second='*/10',
                  end_date='2018-05-30')
scheduler.add_job(my_job, args=['job_once_now'], id='job_once_now')
scheduler.add_job(my_job, args=['job_date_once'], id='job_date_once', trigger='date', run_date='2018-04-05 07:48:05')
try:
    scheduler.start()
except SystemExit:
    print('exit')
    exit()

