Spring Boot scheduled tasks with Quartz (multiple data sources + Quartz persisted to a database)

Posted by 明湖起风了

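Core idea:

Multiple data sources: several data sources are configured on the backend, together with a custom annotation and an AOP aspect that intercepts it. The frontend passes a data-source parameter with each call, and the backend uses that parameter to decide which service or mapper method to call.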

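Related post: Springboot整合多数据源(自定义注解+aop切面实现) (Spring Boot multi-data-source integration with a custom annotation and an AOP aspect), from 明湖起风了's blog on CSDN.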
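The routing idea described above can be sketched with the standard library alone. This is a minimal illustration, not the code from the related post: `DataSourceContextHolder`, `Router` and `withDataSource` are hypothetical names, and plain strings stand in for real `DataSource` objects. Only the `DB1`/`DB2` keys mirror the `DataSourceType` enum used later in this article.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Supplier;

public class RoutingSketch {

    /** Keys for the two business databases (mirrors DataSourceType.DB1/DB2). */
    enum DataSourceType { DB1, DB2 }

    /** Hypothetical holder: remembers the chosen key for the current thread. */
    static final class DataSourceContextHolder {
        private static final ThreadLocal<DataSourceType> CURRENT = new ThreadLocal<>();

        static void set(DataSourceType type) { CURRENT.set(type); }
        static DataSourceType get() { return CURRENT.get(); }
        static void clear() { CURRENT.remove(); }
    }

    /** Stand-in for a routing data source: maps a key to a target, with a default. */
    static final class Router<T> {
        private final Map<DataSourceType, T> targets = new EnumMap<>(DataSourceType.class);
        private final T defaultTarget;

        Router(T defaultTarget) { this.defaultTarget = defaultTarget; }

        void register(DataSourceType type, T target) { targets.put(type, target); }

        /** Resolves the target for the key bound to the current thread. */
        T current() {
            DataSourceType key = DataSourceContextHolder.get();
            return key == null ? defaultTarget : targets.getOrDefault(key, defaultTarget);
        }
    }

    /** What an aspect would do around an annotated method: set the key, proceed, always clear. */
    static <R> R withDataSource(DataSourceType type, Supplier<R> call) {
        DataSourceContextHolder.set(type);
        try {
            return call.get();
        } finally {
            DataSourceContextHolder.clear();
        }
    }

    public static void main(String[] args) {
        Router<String> router = new Router<>("sg_security");
        router.register(DataSourceType.DB1, "sg_security");
        router.register(DataSourceType.DB2, "mytest");

        System.out.println(router.current());                                   // no key bound: default
        System.out.println(withDataSource(DataSourceType.DB2, router::current)); // key bound to DB2
        System.out.println(router.current());                                   // key cleared again
    }
}
```

Clearing the key in a `finally` block matters because request threads are usually pooled; a key left behind would leak into the next request handled by the same thread.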

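Quartz: the serviceImpl implementation class holds the core business logic (adding, pausing and rescheduling jobs, and so on). The controller passes the job parameters on each call, which lets callers control jobs at runtime.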

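Quartz ships the scripts needed to initialize the database. Download link:

        http://www.quartz-scheduler.org/downloads/files/quartz-2.2.3-distribution.tar.gz

After unpacking, the MySQL script is at /quartz-2.2.3/docs/dbTables/tables_mysql.sql. Each database has its own script, so pick the one that matches your setup. Create a new database and run the SQL.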

pom:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- MyBatis core -->
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>3.5.7</version>
        </dependency>
        <!-- JUnit tests -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <!-- MySQL driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
        </dependency>
        <!-- Quartz scheduling framework -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.2.8</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>

    </dependencies>

yml:

server:
  port: 8088
spring:
  datasource:
    #data source 1
    db1:
      url: jdbc:mysql://localhost:3306/sg_security?characterEncoding=utf-8&serverTimezone=UTC
      username: root
      password: 123456
      driver-class-name: com.mysql.jdbc.Driver
      type: com.alibaba.druid.pool.DruidDataSource
    #data source 2
    db2:
      driver-class-name: com.mysql.jdbc.Driver
      url: jdbc:mysql://localhost:3306/mytest?characterEncoding=utf-8&serverTimezone=UTC
      username: root
      password: 123456
      type: com.alibaba.druid.pool.DruidDataSource
    #Quartz data source
    task:
      driver-class-name: com.mysql.jdbc.Driver
      url: jdbc:mysql://localhost:3306/task?characterEncoding=utf-8&serverTimezone=UTC&useSSL=false
      username: root
      password: 123456
      type: com.alibaba.druid.pool.DruidDataSource
  quartz:
    properties:
      org:
        quartz:
          scheduler:
            instanceName: DemoScheduler
            instanceId: AUTO
          jobStore:
            class: org.quartz.impl.jdbcjobstore.JobStoreTX
            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate #must match the database whose initialization script was used
            tablePrefix: QRTZ_
            isClustered: true
            clusterCheckinInterval: 10000
            useProperties: false
          threadPool:
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 20
            threadPriority: 5
            threadsInheritContextClassLoaderOfInitializingThread: true
    job-store-type: jdbc
    overwrite-existing-jobs: true
    auto-startup: true

mybatis:
  mapper-locations: classpath:/mapper/*.xml
  #enable underscore-to-camel-case mapping
  configuration:
    map-underscore-to-camel-case: true
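As far as I understand it, Spring Boot hands the entries nested under `spring.quartz.properties` to Quartz as flat property names, so the YAML above corresponds to keys such as `org.quartz.jobStore.class`. The sketch below builds the same settings as a `java.util.Properties` object, which can be handy for checking key names against the Quartz configuration reference. The class and method names are made up for illustration.

```java
import java.util.Properties;

public class QuartzPropertiesSketch {

    /** Builds the flat equivalent of the nested spring.quartz.properties block. */
    static Properties quartzProperties() {
        Properties p = new Properties();
        p.setProperty("org.quartz.scheduler.instanceName", "DemoScheduler");
        p.setProperty("org.quartz.scheduler.instanceId", "AUTO");
        p.setProperty("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
        p.setProperty("org.quartz.jobStore.driverDelegateClass",
                "org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
        p.setProperty("org.quartz.jobStore.tablePrefix", "QRTZ_");
        p.setProperty("org.quartz.jobStore.isClustered", "true");
        p.setProperty("org.quartz.jobStore.clusterCheckinInterval", "10000");
        p.setProperty("org.quartz.jobStore.useProperties", "false");
        p.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        p.setProperty("org.quartz.threadPool.threadCount", "20");
        p.setProperty("org.quartz.threadPool.threadPriority", "5");
        p.setProperty("org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread",
                "true");
        return p;
    }

    public static void main(String[] args) {
        Properties p = quartzProperties();
        // Print the keys in sorted order so they are easy to compare with the YAML.
        p.stringPropertyNames().stream().sorted()
                .forEach(key -> System.out.println(key + "=" + p.getProperty(key)));
    }
}
```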

Task bean

package com.example.mybatisinterceptor.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class QuartzBean {
    /** Job id */
    private String id;

    /** Job name */
    private String jobName;

    /** Fully qualified name of the job class */
    private String jobClass;

    /** Job status: running or paused */
    private Integer status;

    /** Cron expression that controls when the job runs */
    private String cronExpression;
}
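The `jobClass` field carries a fully qualified class name that has to be turned into a `Class` object before a job can be built from it. A minimal sketch of doing that with a type check is shown below. `JobClassResolver` and `resolve` are hypothetical names, and `Runnable` stands in for the Quartz job type so the example runs without Quartz on the classpath.

```java
public class JobClassResolver {

    /**
     * Loads className and verifies that it is a subtype of expectedType.
     * Throws ClassNotFoundException if the class is missing and
     * ClassCastException if it exists but has the wrong type.
     */
    static <T> Class<? extends T> resolve(String className, Class<T> expectedType)
            throws ClassNotFoundException {
        return Class.forName(className).asSubclass(expectedType);
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Runnable is only a stand-in here; a real service would pass the Quartz job type.
        Class<? extends Runnable> ok = resolve("java.lang.Thread", Runnable.class);
        System.out.println("resolved " + ok.getName());

        try {
            resolve("java.lang.String", Runnable.class);
        } catch (ClassCastException e) {
            System.out.println("rejected: java.lang.String is not a Runnable");
        }
    }
}
```

Compared with an unchecked cast, `asSubclass` fails at the point where the name is resolved, so a wrong class name in a request is reported immediately instead of surfacing when the job first fires.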

Unified response type

package com.example.mybatisinterceptor.result;

/**
 * Custom unified response type.
 * "Unified" here refers to the response returned for custom exceptions,
 * not to the return type of controller endpoints.
 */
public class Response implements java.io.Serializable {

    private static final long serialVersionUID = -8713837118340960775L;

    /** Response code */
    private int code;

    /** Response message */
    private String message;

    /** Whether the call succeeded */
    private Boolean flg;

    /** Response payload */
    private Object result;

    public Response() {
    }

    public Response(int code, String message, Boolean flg) {
        this.code = code;
        this.message = message;
        this.flg = flg;
    }

    public int getCode() {
        return code;
    }

    public void setCode(int code) {
        this.code = code;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public Boolean getFlg() {
        return flg;
    }

    public void setFlg(Boolean flg) {
        this.flg = flg;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }

    /** Success */
    public static Response success() {
        return new Response(200, "success", true);
    }

    /** Failure with an explicit code */
    public static Response error(int code, String message) {
        return new Response(code, message, false);
    }

    /** Failure with the default code -1 */
    public static Response error(String message) {
        return error(-1, message);
    }
}



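Quartz data source configuration

The task database holds the tables that persist Quartz data. @QuartzDataSource tells the Spring container which data source Quartz should use, and Quartz then uses that database automatically.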

package com.example.mybatisinterceptor.config;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import com.example.mybatisinterceptor.ContextHolder.MyRoutingDataSource;
import com.example.mybatisinterceptor.enums.DataSourceType;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.boot.autoconfigure.quartz.QuartzDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;


import java.util.HashMap;
import java.util.Map;

@Configuration
@MapperScan(basePackages = "com.example.mybatisinterceptor.mapper")
public class DataSourceConfig {
    // Data source for database db1
    @Bean(name = "dataSource1")
    @Primary
    @ConfigurationProperties("spring.datasource.db1")
    public DruidDataSource dataSource1() {
        return DruidDataSourceBuilder.create().build();
    }

    // Data source for database db2
    @Bean(name = "dataSource2")
    @ConfigurationProperties("spring.datasource.db2")
    public DruidDataSource dataSource2() {
        return DruidDataSourceBuilder.create().build();
    }

    // Data source for the Quartz database. With @QuartzDataSource it does not
    // need to be registered in the dynamic routing configuration.
    @Bean(name = "dataSourceTask")
    @ConfigurationProperties("spring.datasource.task")
    @QuartzDataSource
    public DruidDataSource dataSourceTask() {
        return DruidDataSourceBuilder.create().build();
    }

    // Register the two business data sources with the dynamic routing data source
    @Bean(name = "myRoutingDataSource")
    public MyRoutingDataSource myRoutingDataSource(@Qualifier("dataSource1") DruidDataSource dataSource1,
                                                   @Qualifier("dataSource2") DruidDataSource dataSource2) {
        Map<Object, Object> map = new HashMap<>();
        map.put(DataSourceType.DB1, dataSource1);
        map.put(DataSourceType.DB2, dataSource2);
        MyRoutingDataSource myRoutingDataSource = new MyRoutingDataSource();
        myRoutingDataSource.setTargetDataSources(map);
        myRoutingDataSource.setDefaultTargetDataSource(dataSource1);
        return myRoutingDataSource;
    }

    // SqlSessionFactory backed by the routing data source
    @Bean(name = "sqlSessionFactory")
    public SqlSessionFactory sqlSessionFactory(@Qualifier("dataSource1") DruidDataSource dataSource1,
                                               @Qualifier("dataSource2") DruidDataSource dataSource2) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(myRoutingDataSource(dataSource1, dataSource2));
        // Location of the mapper XML files
        Resource[] resources = new PathMatchingResourcePatternResolver().getResources("classpath:mapper/*.xml");
        factoryBean.setMapperLocations(resources);
        return factoryBean.getObject();
    }

    // Transaction manager for the routing data source
    @Bean
    public PlatformTransactionManager transactionManager(@Qualifier("myRoutingDataSource") MyRoutingDataSource myRoutingDataSource) {
        return new DataSourceTransactionManager(myRoutingDataSource);
    }
}

Create the job class by extending QuartzJobBean and overriding executeInternal:

package com.example.mybatisinterceptor.ScheduledJob;

import lombok.extern.slf4j.Slf4j;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;

@Slf4j
public class DemoJob extends QuartzJobBean {
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        // Put whatever work the job should do here
        JobDetail jobDetail = jobExecutionContext.getJobDetail();
        log.info("------job name: {}, group: {}------this is the work done by the scheduled job",
                jobDetail.getKey().getName(), jobDetail.getKey().getGroup());
    }
}

Service interface for job operations

package com.example.mybatisinterceptor.service;

import com.example.mybatisinterceptor.bean.QuartzBean;
import org.quartz.JobKey;

import java.util.List;

public interface IJobService {
    // List all jobs
    public List<QuartzBean> getAllJobs();
    // Resume a job
    public boolean resumeJob(String jobName, String jobGroup);
    // Pause a job
    public boolean pauseJob(String jobName, String jobGroup);
    // Change the cron expression of a job
    public boolean reScheduleJob(String jobName, String jobGroup, String cronExpression);
    // Delete a job
    public boolean deleteJob(String jobName, String jobGroup);
    // Add a job
    public int addJob(QuartzBean jobInfo);
    // Check whether a job exists
    public int isJobExist(JobKey jobKey);
}


Implementation class

package com.example.mybatisinterceptor.service.impl;

import com.example.mybatisinterceptor.bean.QuartzBean;
import com.example.mybatisinterceptor.service.IJobService;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;

@Service
@Slf4j
public class JobServiceImp implements IJobService {
    // Spring Boot auto-configures the Scheduler, so it can be injected
    // without extra configuration; Spring Boot manages its lifecycle.
    @Autowired
    private Scheduler scheduler;

    @Override
    public List<QuartzBean> getAllJobs() {
        List<QuartzBean> jobInfos = new ArrayList<>();
        try {
            int i = 0;
            for (String group : scheduler.getJobGroupNames()) {
                GroupMatcher<JobKey> groupMatcher = GroupMatcher.groupEquals(group);
                Set<JobKey> jobKeys = scheduler.getJobKeys(groupMatcher);
                for (JobKey jobKey : jobKeys) {
                    QuartzBean jobInfo = new QuartzBean();
                    JobDetail jobDetail = scheduler.getJobDetail(jobKey);
                    jobInfo.setId(String.valueOf(i++));
                    jobInfo.setJobName(jobKey.getName());
                    jobInfo.setJobClass(jobDetail.getJobClass().getName());
                    // Triggers are registered under the same name and group as their job (see addJob)
                    TriggerKey triggerKey = TriggerKey.triggerKey(jobKey.getName(), jobKey.getGroup());
                    Trigger jobTrigger = scheduler.getTrigger(triggerKey);
                    if (jobTrigger != null) {
                        if (jobTrigger instanceof CronTrigger) {
                            jobInfo.setCronExpression(((CronTrigger) jobTrigger).getCronExpression());
                        } else {
                            log.info("Trigger is not a CronTrigger");
                        }
                        // Assumption: status 1 means running, 0 means paused or not scheduled
                        Trigger.TriggerState tState = scheduler.getTriggerState(triggerKey);
                        jobInfo.setStatus(tState == Trigger.TriggerState.PAUSED ? 0 : 1);
                    } else {
                        jobInfo.setStatus(0);
                    }
                    jobInfos.add(jobInfo);
                }
            }
        } catch (SchedulerException e) {
            log.error(e.getMessage(), e);
        }
        return jobInfos;
    }

    @Override
    public boolean resumeJob(String jobName, String jobGroup) {
        boolean result = true;
        try {
            scheduler.resumeJob(JobKey.jobKey(jobName, jobGroup));
        } catch (SchedulerException e) {
            result = false;
            log.error(e.getMessage(), e);
        }
        return result;
    }

    @Override
    public boolean pauseJob(String jobName, String jobGroup) {
        boolean result = true;
        try {
            scheduler.pauseJob(JobKey.jobKey(jobName, jobGroup));
        } catch (SchedulerException e) {
            result = false;
            log.error(e.getMessage(), e);
        }
        return result;
    }

    @Override
    public boolean reScheduleJob(String jobName, String jobGroup, String cronExpression) {
        boolean result = true;
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
            // Remember the current state so that a paused job stays paused after rescheduling
            Trigger.TriggerState triggerState = scheduler.getTriggerState(triggerKey);
            Trigger oldTrigger = scheduler.getTrigger(triggerKey);
            if (!(oldTrigger instanceof CronTrigger)) {
                // No trigger, or not a cron trigger: nothing to reschedule
                return false;
            }
            if (!((CronTrigger) oldTrigger).getCronExpression().equals(cronExpression)) {
                CronTrigger cronTriggerNew = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroup)
                        .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
                        .build();
                scheduler.rescheduleJob(triggerKey, cronTriggerNew);
                if (triggerState == Trigger.TriggerState.PAUSED) {
                    this.pauseJob(jobName, jobGroup);
                }
            }
        } catch (SchedulerException e) {
            result = false;
            log.error(e.getMessage(), e);
        }
        return result;
    }

    @Override
    public boolean deleteJob(String jobName, String jobGroup) {
        boolean result = true;
        try {
            JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
            TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
            List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
            if (!triggers.isEmpty()) {
                // Pause the trigger first if it is still active, then remove it
                if (scheduler.getTriggerState(triggerKey) != Trigger.TriggerState.PAUSED) {
                    scheduler.pauseTrigger(triggerKey);
                }
                scheduler.unscheduleJob(triggerKey);
            }
            scheduler.deleteJob(jobKey);
        } catch (SchedulerException e) {
            result = false;
            log.error(e.getMessage(), e);
        }
        return result;
    }

    @Override
    public int addJob(QuartzBean jobInfo) {
        JobKey jobKey = JobKey.jobKey(jobInfo.getJobName());
        int isJobExist = this.isJobExist(jobKey);
        if (isJobExist == 1) {
            log.info("Job already exists!");
            return -1;
        }
        String cron = jobInfo.getCronExpression();
        if (StringUtils.hasText(cron) && !CronExpression.isValidExpression(cron)) {
            log.error("Invalid cron expression: {}", cron);
            return 3;
        }
        try {
            TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger()
                    .withIdentity(jobInfo.getJobName())
                    .forJob(jobKey);
            // Without a cron expression the job runs once, immediately; otherwise it runs periodically
            Trigger trigger;
            if (StringUtils.hasText(cron)) {
                trigger = triggerBuilder
                        .withSchedule(CronScheduleBuilder.cronSchedule(cron))
                        .build();
            } else {
                trigger = triggerBuilder.startNow()
                        .withSchedule(SimpleScheduleBuilder.simpleSchedule().withRepeatCount(0))
                        .build();
            }
            if (isJobExist == 0) {
                // The job is already stored but has no trigger: only attach the new trigger
                scheduler.scheduleJob(trigger);
            } else {
                JobDetail jobDetail = JobBuilder.newJob(
                                Class.forName(jobInfo.getJobClass()).asSubclass(Job.class))
                        .withIdentity(jobKey)
                        .storeDurably().build();
                scheduler.scheduleJob(jobDetail, trigger);
            }
            return 0;
        } catch (ClassNotFoundException | ClassCastException e) {
            log.error("The class for this job does not exist or is not a Quartz Job");
            return 1;
        } catch (SchedulerException e) {
            log.error("Failed to schedule the job", e);
            return 2;
        }
    }

    @Override
    public int isJobExist(JobKey jobKey) {
        // 1: job and trigger exist, 0: job exists without a trigger, -1: job does not exist
        int result;
        try {
            JobDetail jobDetail = scheduler.getJobDetail(jobKey);
            List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
            if (jobDetail != null && !triggers.isEmpty()) {
                result = 1;
            } else if (jobDetail != null) {
                result = 0;
            } else {
                result = -1;
            }
        } catch (SchedulerException e) {
            result = -1;
            log.info("Job does not exist!");
        }
        return result;
    }
}

Controller

package com.example.mybatisinterceptor.controller;

import com.example.mybatisinterceptor.bean.QuartzBean;
import com.example.mybatisinterceptor.result.Response;
import com.example.mybatisinterceptor.service.IJobService;
import lombok.extern.slf4j.Slf4j;
import org.quartz.CronExpression;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;

import java.util.List;

@Controller
@RequestMapping(value = "/task")
@Slf4j
public class JobController {

    @Autowired
    private IJobService jobService;

    @GetMapping(value = "/ok")
    @ResponseBody
    public String sendok() {
        return "ok";
    }

    @PostMapping(value = "/add")
    @ResponseBody
    public Response addJob(@RequestBody QuartzBean jobInfo) {
        switch (jobService.addJob(jobInfo)) {
            case -1: return new Response(500, "Job already exists!", false);
            case 0: return new Response(200, "success", true);
            case 1: return new Response(500, "No Java class found for this job!", false);
            case 2: return new Response(500, "Failed to add the job!", false);
            case 3: return new Response(500, "Invalid time format!", false);
            default: return new Response(500, "Unknown error!", false);
        }
    }

    @GetMapping(value = "/jobs")
    @ResponseBody
    public Response getAllJobs() {
        List<QuartzBean> jobInfos = jobService.getAllJobs();
        if (jobInfos.isEmpty()) {
            return new Response(500, "No job", false);
        }
        Response response = new Response(200, "success", true);
        response.setResult(jobInfos);
        return response;
    }

    @PostMapping(value = "/pause")
    @ResponseBody
    public Response pauseJob(String name, String group) {
        return jobService.pauseJob(name, group) ? new Response(200, "success", true)
                : new Response(500, "error", false);
    }

    @PostMapping(value = "/resume")
    @ResponseBody
    public Response resumeJob(String name, String group) {
        return jobService.resumeJob(name, group) ? new Response(200, "success", true)
                : new Response(500, "error", false);
    }

    @PostMapping(value = "/reschedule")
    @ResponseBody
    public Response reScheduleJob(String name, String group, String cron) {
        return jobService.reScheduleJob(name, group, cron) ? new Response(200, "success", true)
                : new Response(500, "error", false);
    }

    @PostMapping(value = "/delete")
    @ResponseBody
    public Response deleteJob(String name, String group) {
        return jobService.deleteJob(name, group) ? new Response(200, "success", true)
                : new Response(500, "error", false);
    }

    // Check whether a cron expression is valid
    @PostMapping(value = "/cron-check")
    @ResponseBody
    public Response checkCron(String cron) {
        boolean valid = false;
        try {
            valid = CronExpression.isValidExpression(cron);
        } catch (Exception e) {
            log.error(e.getMessage());
        }
        return valid ? new Response(200, "success", true)
                : new Response(500, "Invalid cron expression!", false);
    }
}
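The integer codes that addJob returns (-1, 0, 1, 2, 3) have to be kept in sync by hand between the service and the controller. One possible alternative, sketched here under the assumption that the codes keep their current meaning, is to give them names in an enum. `AddJobResult` and `fromCode` are hypothetical and not part of the original project.

```java
import java.util.Arrays;

public class AddJobResultSketch {

    /** Hypothetical enum mirroring the integer codes returned by addJob. */
    enum AddJobResult {
        ALREADY_EXISTS(-1, "Job already exists"),
        SUCCESS(0, "success"),
        CLASS_NOT_FOUND(1, "No Java class found for this job"),
        SCHEDULING_FAILED(2, "Failed to add the job"),
        INVALID_TIME_FORMAT(3, "Invalid time format");

        final int code;
        final String message;

        AddJobResult(int code, String message) {
            this.code = code;
            this.message = message;
        }

        boolean isSuccess() {
            return this == SUCCESS;
        }

        /** Looks up the enum constant for a raw code; rejects unknown codes. */
        static AddJobResult fromCode(int code) {
            return Arrays.stream(values())
                    .filter(r -> r.code == code)
                    .findFirst()
                    .orElseThrow(() -> new IllegalArgumentException("Unknown result code: " + code));
        }
    }

    public static void main(String[] args) {
        for (int code : new int[] {-1, 0, 1, 2, 3}) {
            AddJobResult r = AddJobResult.fromCode(code);
            // HTTP-style status as used by the controller: 200 on success, 500 otherwise
            System.out.println((r.isSuccess() ? 200 : 500) + " " + r.message);
        }
    }
}
```

With such an enum the controller would no longer need a switch over magic numbers, and an unknown code fails loudly instead of producing a null response.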
    

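Test: create a job

Pause:

That's all!

One shortcoming: after the service is shut down and restarted, jobs do not start again automatically. Readers can fill in this part themselves.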

Reference:

Springboot+quartz整合(多数据源+quartz持久化到数据库), from 橙子呼叫石榴's blog on CSDN
