quartz集群调度任务负载不均

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了quartz集群调度任务负载不均相关的知识,希望对你有一定的参考价值。

参考技术A 多台服务集群模式执行调度周期短的任务不能均衡负载。

下面的run()方法中,不考虑halted和siglock的情况下1)只有 if (triggers != null && !triggers.isEmpty()) 条件满足的情况下,while循环才会进入暂停状态 sigLock.wait(timeUntilContinue); 2)并且获取待执行trigger时最终执行的sql以及传入的条件如下,即next fire time范围为:

与查询条件相关的参数:

所以只有run方法中的while暂停,不去获取新的trigger时其他服务才有机会拿到执行任务,以达到负载均衡。

https://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/tutorial-lesson-11.html

SpringBoot整合Quartz实现动态的创建或删除定时任务并将定时调度任务持久化到MySQL以及Quartz集群配置

1.创建quartz数据库并导入quartz的SQL脚本文件

quartz源码下载地址:http://www.quartz-scheduler.org/downloads/
下载完成后解压,在/src/org/quartz/impl/jdbcjobstore可以找到对应数据库的SQL脚本
我这里使用的是MySQL数据库,SQL脚本如下:

CREATE DATABASE `quartz` /*!40100 DEFAULT CHARACTER SET utf8 */;

DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
DROP TABLE IF EXISTS QRTZ_LOCKS;
DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
DROP TABLE IF EXISTS QRTZ_CALENDARS;

CREATE TABLE QRTZ_JOB_DETAILS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    JOB_NAME  VARCHAR(200) NOT NULL,
    JOB_GROUP VARCHAR(200) NOT NULL,
    DESCRIPTION VARCHAR(250) NULL,
    JOB_CLASS_NAME   VARCHAR(250) NOT NULL,
    IS_DURABLE VARCHAR(1) NOT NULL,
    IS_NONCONCURRENT VARCHAR(1) NOT NULL,
    IS_UPDATE_DATA VARCHAR(1) NOT NULL,
    REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
    JOB_DATA BLOB NULL,
    PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
);

CREATE TABLE QRTZ_TRIGGERS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(200) NOT NULL,
    TRIGGER_GROUP VARCHAR(200) NOT NULL,
    JOB_NAME  VARCHAR(200) NOT NULL,
    JOB_GROUP VARCHAR(200) NOT NULL,
    DESCRIPTION VARCHAR(250) NULL,
    NEXT_FIRE_TIME BIGINT(13) NULL,
    PREV_FIRE_TIME BIGINT(13) NULL,
    PRIORITY INTEGER NULL,
    TRIGGER_STATE VARCHAR(16) NOT NULL,
    TRIGGER_TYPE VARCHAR(8) NOT NULL,
    START_TIME BIGINT(13) NOT NULL,
    END_TIME BIGINT(13) NULL,
    CALENDAR_NAME VARCHAR(200) NULL,
    MISFIRE_INSTR SMALLINT(2) NULL,
    JOB_DATA BLOB NULL,
    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
        REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)
);

CREATE TABLE QRTZ_SIMPLE_TRIGGERS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(200) NOT NULL,
    TRIGGER_GROUP VARCHAR(200) NOT NULL,
    REPEAT_COUNT BIGINT(7) NOT NULL,
    REPEAT_INTERVAL BIGINT(12) NOT NULL,
    TIMES_TRIGGERED BIGINT(10) NOT NULL,
    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
        REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);

CREATE TABLE QRTZ_CRON_TRIGGERS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(200) NOT NULL,
    TRIGGER_GROUP VARCHAR(200) NOT NULL,
    CRON_EXPRESSION VARCHAR(200) NOT NULL,
    TIME_ZONE_ID VARCHAR(80),
    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
        REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);

CREATE TABLE QRTZ_SIMPROP_TRIGGERS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(200) NOT NULL,
    TRIGGER_GROUP VARCHAR(200) NOT NULL,
    STR_PROP_1 VARCHAR(512) NULL,
    STR_PROP_2 VARCHAR(512) NULL,
    STR_PROP_3 VARCHAR(512) NULL,
    INT_PROP_1 INT NULL,
    INT_PROP_2 INT NULL,
    LONG_PROP_1 BIGINT NULL,
    LONG_PROP_2 BIGINT NULL,
    DEC_PROP_1 NUMERIC(13,4) NULL,
    DEC_PROP_2 NUMERIC(13,4) NULL,
    BOOL_PROP_1 VARCHAR(1) NULL,
    BOOL_PROP_2 VARCHAR(1) NULL,
    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
    REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);

CREATE TABLE QRTZ_BLOB_TRIGGERS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(200) NOT NULL,
    TRIGGER_GROUP VARCHAR(200) NOT NULL,
    BLOB_DATA BLOB NULL,
    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
        REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);

CREATE TABLE QRTZ_CALENDARS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    CALENDAR_NAME  VARCHAR(200) NOT NULL,
    CALENDAR BLOB NOT NULL,
    PRIMARY KEY (SCHED_NAME,CALENDAR_NAME)
);

CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_GROUP  VARCHAR(200) NOT NULL,
    PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)
);

CREATE TABLE QRTZ_FIRED_TRIGGERS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    ENTRY_ID VARCHAR(95) NOT NULL,
    TRIGGER_NAME VARCHAR(200) NOT NULL,
    TRIGGER_GROUP VARCHAR(200) NOT NULL,
    INSTANCE_NAME VARCHAR(200) NOT NULL,
    FIRED_TIME BIGINT(13) NOT NULL,
    SCHED_TIME BIGINT(13) NOT NULL,
    PRIORITY INTEGER NOT NULL,
    STATE VARCHAR(16) NOT NULL,
    JOB_NAME VARCHAR(200) NULL,
    JOB_GROUP VARCHAR(200) NULL,
    IS_NONCONCURRENT VARCHAR(1) NULL,
    REQUESTS_RECOVERY VARCHAR(1) NULL,
    PRIMARY KEY (SCHED_NAME,ENTRY_ID)
);

CREATE TABLE QRTZ_SCHEDULER_STATE
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    INSTANCE_NAME VARCHAR(200) NOT NULL,
    LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
    CHECKIN_INTERVAL BIGINT(13) NOT NULL,
    PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)
);

CREATE TABLE QRTZ_LOCKS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    LOCK_NAME  VARCHAR(40) NOT NULL,
    PRIMARY KEY (SCHED_NAME,LOCK_NAME)
);

commit;

2.构建SpringBoot工程整合Quartz

1.引入Quartz的POM依赖
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
2.YML配置

配置参考:https://github.com/wangmaoxiong/quartzjdbc/blob/master/src/main/resources/application-cluster.yml

spring:
  # Mysql数据库
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/quartz?autoReconnect=true&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=CONVERT_TO_NULL&useSSL=false&serverTimezone=CTT
    username: root
    password: root
    # 连接池大小
    max-active: 20
    max-pool-prepared-statement-per-connection-size: 20
  # quartz相关配置
  quartz:
    # 将任务等保存化到数据库
    job-store-type: jdbc
    # 程序结束时会等待quartz相关的内容结束
    wait-for-jobs-to-complete-on-shutdown: true
    # QuartzScheduler启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录
    overwrite-existing-jobs: true
    properties:
      org:
        quartz:
          # scheduler相关
          scheduler:
            # scheduler的实例名
            instanceName: scheduler
            instanceId: AUTO
          # 持久化相关
          jobStore:
            class: org.quartz.impl.jdbcjobstore.JobStoreTX
            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
            # 表示数据库中相关表是QRTZ_开头的
            tablePrefix: QRTZ_
            useProperties: false
            # 配置集群
            # 是否加入集群
            isClustered: true
            # 容许的最大作业延长时间
            clusterCheckinInterval: 20000
          # 线程池相关
          threadPool:
            class: org.quartz.simpl.SimpleThreadPool
            # 线程数
            threadCount: 10
            # 线程优先级
            threadPriority: 5
            threadsInheritContextClassLoaderOfInitializingThread: true
3.编写Job类
/**
 * @Author ScholarTang
 * @Date 2021/7/14 下午4:09
 * @Desc 定时任务处理(允许并发执行)
 */
@Slf4j
@Component
public class QuartzJobExecution implements Job 

  @Autowired
  private SendNoticeUtils sendNoticeUtils;

  @SneakyThrows
  @Override
  public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException 
    //执行逻辑体,发送消息
    sendNoticeUtils.sendNotice(jobExecutionContext);
  

4.编写一个工具类,用来创建、删除定时调度任务的工具类
/**
 * @Author ScholarTang
 * @Date 2021/7/15 下午3:50
 * @Desc 定时任务工具类
 */

@Component
public class ScheduleUtils 

  @Autowired
  private Scheduler scheduler;

  /**
   * 构建任务触发对象
   * @param jobName
   * @param jobGroup
   * @return
   */
  public  TriggerKey getTriggerKey(String jobName, String jobGroup) 
    return TriggerKey.triggerKey(jobName, jobGroup);
  

  /**
   * 构建任务键对象
   * @param jobName
   * @param jobGroup
   * @return
   */
  public  JobKey getJobKey(String jobName, String jobGroup) 
    return JobKey.jobKey(jobName, jobGroup);
  

  /**
   * 创建日定时调度任务
   * @param schedulingTaskVo
   * @throws SchedulerException
   * @throws TaskException
   */
  public  void createScheduleJob(SchedulingTaskVo schedulingTaskVo) throws SchedulerException, TaskException 
    // 构建job信息
    String jobName = schedulingTaskVo.getJobId() + "_" + schedulingTaskVo.getJobName();
    String jobGroupName = schedulingTaskVo.getJobGroupName();

    //构建job实例
    JobDetail jobDetail = JobBuilder.newJob(QuartzJobExecution.class).withIdentity(getJobKey(jobName, jobGroupName)).build();

    // 表达式调度构建器
    CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(schedulingTaskVo.getCronExpression());
    cronScheduleBuilder = handleCronScheduleMisfirePolicy(schedulingTaskVo, cronScheduleBuilder);

    // 按新的cronExpression表达式构建一个新的trigger
    CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(getTriggerKey(jobName, jobGroupName))
      .withSchedule(cronScheduleBuilder).build();

    // 放入参数,运行时的方法可以获取
    jobDetail.getJobDataMap().put("TASK_PROPERTIES", schedulingTaskVo);

    // 判断是否存在
    if (scheduler.checkExists(getJobKey(jobName, jobGroupName))) 
      // 防止创建时存在数据问题 先移除,然后在执行创建操作
      scheduler.deleteJob(getJobKey(jobName, jobGroupName));
    

    //创建定时任务调度
    scheduler.scheduleJob(jobDetail, trigger);

    // 暂停任务
    if (schedulingTaskVo.getStatus().equals(ScheduleConstants.Status.PAUSE.getValue())) 
      scheduler.pauseJob(getJobKey(jobName, jobGroupName));
    
  


  /**
   * 删除定时调度任务
   * @param schedulingTaskVo
   * @throws SchedulerException
   */
  public void deleteScheduleJob(SchedulingTaskVo schedulingTaskVo) throws SchedulerException 
    scheduler.deleteJob(getJobKey(schedulingTaskVo.getJobId() + "_" + schedulingTaskVo.getJobName(), schedulingTaskVo.getJobGroupName()));
  


  /**
   * 设置定时任务策略
   * @param SchedulingTaskVo
   * @param cb
   * @return
   */
  public  CronScheduleBuilder handleCronScheduleMisfirePolicy(SchedulingTaskVo SchedulingTaskVo, CronScheduleBuilder cb) throws TaskException 
    switch (SchedulingTaskVo.getMisfirePolicy()) 
      case "0":
        return cb;
      case "1":
        return cb.withMisfireHandlingInstructionIgnoreMisfires();
      case "2":
        return cb.withMisfireHandlingInstructionFireAndProceed();
      case "3":
        return cb.withMisfireHandlingInstructionDoNothing();
      default:
        throw new TaskException("任务失败策略 '" + SchedulingTaskVo.getMisfirePolicy()
          + "' 不能在cron计划任务中使用", TaskException.Code.CONFIG_ERROR);
    
  

5.异常类
/**
 * @Author ScholarTang
 * @Date 2021/7/15 上午11:40
 * @Desc 计划策略异常类
 */

public class TaskException extends Exception 
  private static final long serialVersionUID = 1L;

  private Code code;

  public TaskException(String msg, Code code) 
    this(msg, code, null);
  

  public TaskException(String msg, Code code, Exception nestedEx) 
    super(msg, nestedEx);
    this.code = code;
  

  public Code getCode() 
    return code;
  

  public enum Code 
    TASK_EXISTS, NO_TASK_EXISTS, TASK_ALREADY_STARTED, UNKNOWN, CONFIG_ERROR, TASK_NODE_NOT_AVAILABLE, SEND_NOTICE_ERROR
  

6.最后

到这里定时调度服务的代码就写完了,如果是想动态的去创建/删除定时调度任务的话只需要将这个utils工具类注入到需要使用它的类中,去调用工具类中的方法创建或删除定时任务即可。

在创建任务时会将任务持久化到数据库,删除则是将数据库中存储的定时任务进行删除。

以上是关于quartz集群调度任务负载不均的主要内容,如果未能解决你的问题,请参考以下文章

Quartz框架简介

SpringBoot整合Quartz实现动态的创建或删除定时任务并将定时调度任务持久化到MySQL以及Quartz集群配置

SpringBoot整合Quartz实现动态的创建或删除定时任务并将定时调度任务持久化到MySQL以及Quartz集群配置

SpringBoot整合Quartz实现动态的创建或删除定时任务并将定时调度任务持久化到MySQL以及Quartz集群配置

SpringBoot整合Quartz实现动态的创建或删除定时任务并将定时调度任务持久化到MySQL以及Quartz集群配置

quartz 任务调度问题,每次都执行两次 相隔只有几微秒