Using APScheduler for Scheduled Tasks in the Django Framework
Posted by 以梦为马&不负韶华
Preface
Scheduled tasks in the Django framework are handled with APScheduler through the django-apscheduler package.
This article assumes you are already familiar with the Python scheduling framework APScheduler. If not, you can read my other post, "Python定时任务框架APScheduler快速入门" (an APScheduler quick start), first.
1. Install the Module
pip install django-apscheduler
2. Configuration
2.1 Modify settings.py
Add django_apscheduler to INSTALLED_APPS:
INSTALLED_APPS = [
    ......
    'django_apscheduler',  # scheduled task execution
]
2.2 Run the Migrations
In a terminal, from the directory that contains the Django project's manage.py, run:
python manage.py migrate
Two tables are created automatically: django_apscheduler_djangojob (stores the jobs) and django_apscheduler_djangojobexecution (records how each job execution went).
django_apscheduler_djangojob holds each job's id, next_run_time and serialized job_state (screenshot omitted).
django_apscheduler_djangojobexecution holds one row per execution (screenshot omitted), with fields such as:
status: execution status
duration: how long the execution took
exception: the exception raised during execution, if any
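Once a scheduler (set up in the next section) has been running for a while, these execution records can be inspected from the Django shell; an illustrative query, not part of the original article:

from django_apscheduler.models import DjangoJobExecution

# The ten most recent executions with their status, duration and exception
for e in DjangoJobExecution.objects.order_by("-run_time")[:10]:
    print(e.job_id, e.status, e.duration, e.exception)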
3. Creating Tasks
There are two ways to create tasks: the register_job decorator and the add_job function. Once a task has been added, it shows up in the database tables above.
3.1 Prerequisite
The app's URLconf must be included in the root URLconf; the app's URLconf itself may be empty.
Root URLconf:
from django.conf.urls import url, include
from django.contrib import admin

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'', include('cron.urls')),  # 'cron' is the app used in this example
]
The app's URLconf (may be empty):
urlpatterns = [
]
3.2 Creating Tasks with the Decorator (suited to tasks the developer defines in code)
Put the following code in any views.py:
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job

# Start the scheduled job
try:
    # Instantiate the scheduler
    scheduler = BackgroundScheduler()
    # Use DjangoJobStore as the scheduler's job store
    scheduler.add_jobstore(DjangoJobStore(), "default")

    # Schedule the task with an "interval" trigger, every 10 seconds.
    # To run at a fixed time every day instead, use a "cron" trigger, e.g.:
    # @register_job(scheduler, 'cron', day_of_week='mon-fri', hour='9', minute='30', second='10', id='task_time')
    @register_job(scheduler, "interval", seconds=10)
    def my_job():
        # Put the work you want to run on a schedule here
        pass

    register_events(scheduler)
    scheduler.start()
except Exception as e:
    print(e)
    # Shut down the scheduler if anything goes wrong
    scheduler.shutdown()
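To confirm that the decorated job was persisted, you can query the DjangoJob model from the Django shell; an illustrative check, not part of the original article:

from django_apscheduler.models import DjangoJob

# Every registered job should appear here with its id and next run time
for job in DjangoJob.objects.all():
    print(job.id, job.next_run_time)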
3.3 Creating Tasks with add_job (suited to tasks a user creates by submitting parameters from a page)
If you want users to create scheduled tasks by entering parameters on a page and submitting them, you need the add_job function.
In the small example below, the backend receives JSON data, which triggers insert_cron_job to add the task:
import json

from django.http import JsonResponse
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_events

scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), 'default')


def insert_cron_job(request):  # receives the parameters the user submitted
    try:
        # Pull the relevant fields out of the submitted JSON.
        # The field names here ("job_id", "seconds") are only an example.
        data = json.loads(request.body)
        # add_job: my_job is the callable to schedule; trigger selects the
        # scheduling mode ("interval", "cron" or "date"); args passes arguments
        # to the job; id names the job. Which time arguments you pass depends
        # on the chosen trigger.
        scheduler.add_job(my_job, trigger="interval", seconds=data["seconds"],
                          args=["s"], id=data["job_id"])
        return JsonResponse({"result": "ok"})
    except Exception as e:
        print("Failed to read the submitted data:", e)
        return JsonResponse({"result": "error"})


# The actual work to execute
def my_job(s):
    pass


register_events(scheduler)
scheduler.start()
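As a rough sketch of how this view could be exposed, here is a possible app URLconf (the 'cron' app and the URL path are assumptions for illustration, not part of the original article):

# cron/urls.py (hypothetical path)
from django.conf.urls import url

from . import views

urlpatterns = [
    url(r'^add_job/$', views.insert_cron_job),
]

A client can then POST JSON such as {"job_id": "my_task", "seconds": 10} to /add_job/ to register a new task.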
4. Deleting Tasks
After deleting a task, you can check the database and confirm that the row with that job id is gone.
4.1 remove_job Example
DjangoJobStore().remove_job(job_id)  # pass the id of the job to delete
import json

from django.http import JsonResponse
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_events

scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), 'default')


def insert_cron_job(request):  # receives the parameters the user submitted
    try:
        # Pull the relevant fields out of the submitted JSON (example field names)
        data = json.loads(request.body)
        # add_job: my_job is the callable to schedule; trigger selects the
        # scheduling mode; args passes arguments; id names the job.
        scheduler.add_job(my_job, trigger="interval", seconds=data["seconds"],
                          args=["s"], id=data["job_id"])
        return JsonResponse({"result": "ok"})
    except Exception as e:
        print("Failed to read the submitted data:", e)
        return JsonResponse({"result": "error"})


def delete_cron_job(request):
    try:
        # Get the id of the job that should be deleted
        job_id = json.loads(request.body)["job_id"]
        DjangoJobStore().remove_job(job_id)
        return JsonResponse({"result": "ok"})
    except Exception as e:
        print("Failed to delete the scheduled job:", e)
        return JsonResponse({"result": "error"})


# The actual work to execute
def my_job(s):
    pass


register_events(scheduler)
scheduler.start()
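As a side note, APScheduler's scheduler object also exposes remove_job, which removes a job from the job store it was added to; under the setup above it is an alternative to calling DjangoJobStore directly:

scheduler.remove_job(job_id)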
4.2 DjangoJobStore Source Analysis: remove_job, get_all_jobs, add_job, update_job, etc.
django_apscheduler carries out all of these operations through the DjangoJobStore class.
Open the DjangoJobStore source and locate the delete operation (the screenshots are omitted here).
You can see that it works on the database through the DjangoJob model class. Knowing that, you can dig deeper into the store, or operate on the model class directly to achieve the same result.
The store also provides get_all_jobs, add_job, update_job and so on; use whichever operation you need.
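For reference, in the version of django_apscheduler I looked at, the remove_job implementation is roughly the following; treat it as a sketch and check your installed version, since details may differ:

def remove_job(self, job_id):
    try:
        DjangoJob.objects.get(id=job_id).delete()
    except DjangoJob.DoesNotExist:
        raise JobLookupError(job_id)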
The models module (DjangoJob, DjangoJobExecution, etc.) looks like this:
from datetime import timedelta, datetime

from django.db import models, transaction
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _

import logging

from django_apscheduler import util
from django_apscheduler.util import get_django_internal_datetime

logger = logging.getLogger(__name__)


class DjangoJob(models.Model):
    id = models.CharField(
        max_length=255, primary_key=True, help_text=_("Unique id for this job.")
    )
    next_run_time = models.DateTimeField(
        db_index=True,
        blank=True,
        null=True,
        help_text=_(
            "Date and time at which this job is scheduled to be executed next."
        ),
    )
    job_state = models.BinaryField()

    def __str__(self):
        status = (
            f"next run at: {util.get_local_dt_format(self.next_run_time)}"
            if self.next_run_time
            else "paused"
        )
        return f"{self.id} ({status})"

    class Meta:
        ordering = ("next_run_time",)


class DjangoJobExecutionManager(models.Manager):
    def delete_old_job_executions(self, max_age: int):
        """
        Delete old job executions from the database.

        :param max_age: The maximum age (in seconds). Executions that are older
        than this will be deleted.
        """
        self.filter(run_time__lte=timezone.now() - timedelta(seconds=max_age)).delete()


class DjangoJobExecution(models.Model):
    SENT = "Started execution"
    SUCCESS = "Executed"
    MISSED = "Missed!"
    MAX_INSTANCES = "Max instances!"
    ERROR = "Error!"

    STATUS_CHOICES = [(x, x) for x in [SENT, ERROR, SUCCESS,]]

    id = models.BigAutoField(
        primary_key=True, help_text=_("Unique ID for this job execution.")
    )
    job = models.ForeignKey(
        DjangoJob,
        on_delete=models.CASCADE,
        help_text=_("The job that this execution relates to."),
    )
    status = models.CharField(
        max_length=50,
        # TODO: Replace this with enumeration types when we drop support for Django 2.2
        # See: https://docs.djangoproject.com/en/dev/ref/models/fields/#field-choices-enum-types
        choices=STATUS_CHOICES,
        help_text=_("The current status of this job execution."),
    )
    run_time = models.DateTimeField(
        db_index=True, help_text=_("Date and time at which this job was executed."),
    )
    # We store this value in the DB even though it can be calculated as `finished - run_time`. This allows quick
    # calculation of average durations directly in the database later on.
    duration = models.DecimalField(
        max_digits=15,
        decimal_places=2,
        default=None,
        null=True,
        help_text=_("Total run time of this job (in seconds)."),
    )
    finished = models.DecimalField(
        max_digits=15,
        decimal_places=2,
        default=None,
        null=True,
        help_text=_("Timestamp at which this job was finished."),
    )
    exception = models.CharField(
        max_length=1000,
        null=True,
        help_text=_(
            "Details of exception that occurred during job execution (if any)."
        ),
    )
    traceback = models.TextField(
        null=True,
        help_text=_(
            "Traceback of exception that occurred during job execution (if any)."
        ),
    )

    objects = DjangoJobExecutionManager()

    @classmethod
    def atomic_update_or_create(
        cls,
        lock,
        job_id: str,
        run_time: datetime,
        status: str,
        exception: str = None,
        traceback: str = None,
    ) -> "DjangoJobExecution":
        """
        Uses an APScheduler lock to ensures that only one database entry can be created / updated at a time.

        This keeps django_apscheduler in sync with APScheduler and maintains a 1:1 mapping between APScheduler events
        that are triggered and the corresponding DjangoJobExecution model instances that are persisted to the database.

        :param lock: The lock to use when updating the database - probably obtained by calling _scheduler._create_lock()
        :param job_id: The ID to the APScheduler job that this job execution is for.
        :param run_time: The scheduler runtime for this job execution.
        :param status: The new status for ths job execution.
        :param exception: Details of any exceptions that need to be logged.
        :param traceback: Traceback of any exceptions that occurred while executing the job.
        :return: The ID of the newly created or updated DjangoJobExecution.
        """
        # Ensure that only one update / create can be processed at a time, staying in sync with corresponding
        # scheduler.
        with lock:
            # Convert all datetimes to internal Django format before doing calculations and persisting in the database.
            run_time = get_django_internal_datetime(run_time)
            finished = get_django_internal_datetime(timezone.now())
            duration = (finished - run_time).total_seconds()
            finished = finished.timestamp()

            try:
                with transaction.atomic():
                    job_execution = DjangoJobExecution.objects.select_for_update().get(
                        job_id=job_id, run_time=run_time
                    )

                    if status == DjangoJobExecution.SENT:
                        # Ignore 'submission' events for existing job executions. APScheduler does not appear to
                        # guarantee the order in which events are received, and it is not unusual to receive an
                        # `executed` before the corresponding `submitted` event. We just discard `submitted` events
                        # that are received after the job has already been executed.
                        #
                        # If there are any more instances like this then we probably want to implement a full blown
                        # state machine using something like `pytransitions`
                        # See https://github.com/pytransitions/transitions
                        return job_execution

                    job_execution.finished = finished
                    job_execution.duration = duration
                    job_execution.status = status

                    if exception:
                        job_execution.exception = exception

                    if traceback:
                        job_execution.traceback = traceback

                    job_execution.save()

            except DjangoJobExecution.DoesNotExist:
                # Execution was not created by a 'submit' previously - do so now
                if status == DjangoJobExecution.SENT:
                    # Don't log durations until after job has been submitted for execution
                    finished = None
                    duration = None

                job_execution = DjangoJobExecution.objects.create(
                    job_id=job_id,
                    run_time=run_time,
                    status=status,
                    duration=duration,
                    finished=finished,
                    exception=exception,
                    traceback=traceback,
                )

        return job_execution

    def __str__(self):
        return f"{self.id}: job '{self.job_id}' ({self.status})"

    class Meta:
        ordering = ("-run_time",)
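Because DjangoJobExecution rows accumulate over time, a common housekeeping pattern (along the lines of the django-apscheduler README) is to schedule a periodic cleanup job that calls the delete_old_job_executions manager method shown above. A minimal sketch, assuming the decorator-based scheduler from section 3.2:

from django_apscheduler.models import DjangoJobExecution

# Hypothetical cleanup job: once a day, delete execution records older than 7 days
@register_job(scheduler, "interval", days=1, id="delete_old_job_executions")
def delete_old_job_executions(max_age=7 * 24 * 60 * 60):
    DjangoJobExecution.objects.delete_old_job_executions(max_age)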
Summary
Feel free to reach out and discuss. Your likes are what keep me going.