Quartz使用 - Quartz项目实战

Posted mengrennwpu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Quartz使用 - Quartz项目实战相关的知识,希望对你有一定的参考价值。

本片博文将阐述项目工作中使用Quartz的情况,包含项目背景、项目框架、Quartz集群部署等方面,重点讲述如何在实际项目中使用Quartz。

1. 背景

因项目需求,需要定时调用数据下载接口,并将数据存储至诸如mongo、redis、elasticsearch等数据库或缓存中。具体涉及到的需求如下:

a. 调用接口的任务均从mongo数据库读取;

b. 任务的个数随着业务量的增加而增加;

c. 每个调用任务的定时执行时间可能不同,且定时执行时间在mongo中可配置;

d. 任务的执行需要动态更新,如检测到某一任务的定时时间发生变化,则任务的执行也需要实时修改 

e. mongo、redis、elasticsearch等数据库中所存储的字段也由mongo进行配置;

f. 任务执行需要实时性较高、可靠性较强、可扩展性较高等

综上需求,调研了一番,发现任务调度框架Quartz可满足项目需求。

2. 框架

基于项目的需求,结合任务调度框架Quartz,大体的流程框架如下图所示:

1) 首先构建从mongo加载任务

2) 将任务的配置信息初始化至Quartz

3) 通过Quartz的Job任务实现定时调用下载接口任务

4) 将下载的数据依据配置,存储至数据库中

5) 定时检测任务通过定时扫描mongo数据库,查看相关任务信息的配置是否发生变化,如果发生变化,则进行动态更新

6) 为了实现高可用性、可扩展性,可以直接使用Quartz原生的集群特性。

3. 核心代码

核心代码将会涵盖上述流程图中的相关环节,为了项目的保密性,相关信息也会隐藏。

3.1 任务主流程  

import com.quartz.conf.Configuration;
import com.quartz.conf.OcpConfHelper;
import com.quartz.module.TaskInfo;
import org.apache.log4j.PropertyConfigurator;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class SchedulerRunner {
    static Logger logger = LoggerFactory.getLogger(SchedulerRunner.class);

    public static void main(String[] args) {

        // 加载日志配置文件
        PropertyConfigurator.configure("./conf/log4j.properties");

        // 加载quartz配置文件
        System.setProperty("org.quartz.properties", "./conf/quartz.properties");

        // 执行任务解析与调度
        run();
    }

    public static void run(){
        // 获取配置信息表
        List<TaskInfo> taskInfos = GenerateTaskInfo.generateTaskInfoFrommysql();
        if(taskInfos.size() == 0){
            logger.info("there is no tasks from mongoInfo");
            return;
        }

        // 过滤下线任务
        taskInfos = GenerateTaskInfo.filterTask(taskInfos);
        if(taskInfos.size() == 0){
            logger.info("all tasks if offline, no need to run");
            return;
        }

        Scheduler scheduler = null;
        try {
            scheduler = StdSchedulerFactory.getDefaultScheduler();
        } catch (SchedulerException e) {
            e.printStackTrace();
        }

        if(scheduler == null){
            logger.error("create scheduler failed");
            return;
        }

        if(isSchedulerClear()){
            clearSchedulerJob(scheduler);
        }

        // 加入任务调度
        for(TaskInfo task : taskInfos){
            SchedulerFactory.addJob2Scheduler(task, scheduler);
        }

        // 加入动态更新任务
        SchedulerFactory.addDynamicUpdateJob2Scheduler(scheduler);

        // 开启任务
        try {
            scheduler.start();
        } catch (SchedulerException e) {
            logger.error("start scheduler error!");
        }
    }

    public static void clearSchedulerJob(Scheduler scheduler){
        try {
            scheduler.clear();
        } catch (SchedulerException e) {
            logger.error("clear  scheduler error!");
        }
    }

    /**
     * 基于配置文件中的信息,加载调度器开始运行时的清洗标识
     * @return
     */
    private static boolean isSchedulerClear(){
        Configuration conf = OcpConfHelper.getInstance().getOcpConf();
        return conf.getBooleanValue("cleanSchedulerFlag", "true");
    }
}
View Code

3.2 封装任务对象

import java.util.List;
import java.util.Map;


public class TaskInfo {

    protected String categoryId; // 业务Id
    protected String categoryName; // 业务名称
    protected String sourceId; // 信源Id
    protected String sourceName; // 信源名称
    protected int sourceStatus; // 信源状态
    protected String pipelineConf; // 信源pipeline配置信息
    protected List<String> dbStoreTypes; // 业务的存储类型
    protected String esConfInfo; // ES存储配置
    protected String dbConfInfo; // DB存储配置
    protected String cronInfo; // 定时任务信息
    protected int sourceType; // 实时更新还是离线更新
    protected List<String> indexBuildEles; // 更新索引的信息
    protected List<String> idBuildEles; // id的构建因素
    protected String indexType; // 全量或增量
    protected String categoryLevel1; // 一级分类
    protected String zhName; // 中文信息
    protected Map<String,String> outputType; //输出参数名及其类型
    protected String providerName;
    protected String functionName; //category_function名称

    public String getProviderName() {
        return providerName;
    }

    public void setProviderName(String providerName) {
        this.providerName = providerName;
    }

    public String getCategoryId() {
        return categoryId;
    }

    public void setCategoryId(String categoryId) {
        this.categoryId = categoryId;
    }

    public String getCategoryName() {
        return categoryName;
    }

    public void setCategoryName(String categoryName) {
        this.categoryName = categoryName;
    }

    public String getSourceId() {
        return sourceId;
    }

    public void setSourceId(String sourceId) {
        this.sourceId = sourceId;
    }

    public String getSourceName() {
        return sourceName;
    }

    public void setSourceName(String sourceName) {
        this.sourceName = sourceName;
    }

    public int getSourceStatus() {
        return sourceStatus;
    }

    public void setSourceStatus(int sourceStatus) {
        this.sourceStatus = sourceStatus;
    }

    public String getPipelineConf() {
        return pipelineConf;
    }

    public void setPipelineConf(String pipelineConf) {
        this.pipelineConf = pipelineConf;
    }

    public String getEsConfInfo() {
        return esConfInfo;
    }

    public void setEsConfInfo(String esConfInfo) {
        this.esConfInfo = esConfInfo;
    }

    public String getDbConfInfo() {
        return dbConfInfo;
    }

    public void setDbConfInfo(String dbConfInfo) {
        this.dbConfInfo = dbConfInfo;
    }

    public String getCronInfo() {
        return cronInfo;
    }

    public void setCronInfo(String cronInfo) {
        this.cronInfo = cronInfo;
    }

    public int getSourceType() {
        return sourceType;
    }

    public void setSourceType(int sourceType) {
        this.sourceType = sourceType;
    }

    public List<String> getIdBuildEles() {
        return idBuildEles;
    }

    public void setIdBuildEles(List<String> idBuildEles) {
        this.idBuildEles = idBuildEles;
    }

    public List<String> getIndexBuildEles() {
        return indexBuildEles;
    }

    public void setIndexBuildEles(List<String> indexBuildEles) {
        this.indexBuildEles = indexBuildEles;
    }

    public String getIndexType() {
        return indexType;
    }

    public void setIndexType(String indexType) {
        this.indexType = indexType;
    }

    public String getCategoryLevel1() {
        return categoryLevel1;
    }

    public void setCategoryLevel1(String categoryLevel1) {
        this.categoryLevel1 = categoryLevel1;
    }

    public String getZhName() {
        return zhName;
    }

    public void setZhName(String zhName) {
        this.zhName = zhName;
    }

    public TaskInfo(){}

    public List<String> getDbStoreTypes() {
        return dbStoreTypes;
    }

    public void setDbStoreTypes(List<String> dbStoreTypes) {
        this.dbStoreTypes = dbStoreTypes;
    }

    public Map<String, String> getOutputType() {
        return outputType;
    }

    public void setOutputType(Map<String, String> outputType) {
        this.outputType = outputType;
    }

    public String getFunctionName() {
        return functionName;
    }

    public void setFunctionName(String functionName) {
        this.functionName = functionName;
    }

    /**
     * 是否有相同的定时信息
     * @param taskInfo
     * @return
     */
    public boolean hasSameCronInfo(TaskInfo taskInfo){
        if(taskInfo == null) return false;
        return this.getCronInfo().equalsIgnoreCase(taskInfo.getCronInfo());
    }

    @Override
    public String toString() {
        return "TaskInfo{" +
                "categoryId=\'" + categoryId + \'\\\'\' +
                ", categoryName=\'" + categoryName + \'\\\'\' +
                ", sourceId=\'" + sourceId + \'\\\'\' +
                ", sourceName=\'" + sourceName + \'\\\'\' +
                ", sourceStatus=" + sourceStatus +
                ", pipelineConf=\'" + pipelineConf + \'\\\'\' +
                ", dbStoreTypes=" + dbStoreTypes +
                ", esConfInfo=\'" + esConfInfo + \'\\\'\' +
                ", dbConfInfo=\'" + dbConfInfo + \'\\\'\' +
                ", cronInfo=\'" + cronInfo + \'\\\'\' +
                ", sourceType=" + sourceType +
                ", indexBuildEles=" + indexBuildEles +
                ", idBuildEles=" + idBuildEles +
                ", indexType=\'" + indexType + \'\\\'\' +
                ", categoryLevel1=\'" + categoryLevel1 + \'\\\'\' +
                ", zhName=\'" + zhName + \'\\\'\' +
                ", outputType=\'" + outputType + \'\\\'\' +
                ", providerName=\'" + providerName + \'\\\'\' +
                ", functionName=\'" + functionName + \'\\\'\' +
                \'}\';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        TaskInfo taskInfo = (TaskInfo) o;

        if (sourceStatus != taskInfo.sourceStatus) return false;
        if (sourceType != taskInfo.sourceType) return false;
        if (categoryName != null ? !categoryName.equals(taskInfo.categoryName) : taskInfo.categoryName != null)
            return false;
        if (sourceName != null ? !sourceName.equals(taskInfo.sourceName) : taskInfo.sourceName != null) return false;
        if (providerName != null ? !providerName.equals(taskInfo.providerName) : taskInfo.providerName != null) return false;
        if (pipelineConf != null ? !pipelineConf.equals(taskInfo.pipelineConf) : taskInfo.pipelineConf != null)
            return false;
        if (dbStoreTypes != null ? !dbStoreTypes.equals(taskInfo.dbStoreTypes) : taskInfo.dbStoreTypes != null)
            return false;
        if (esConfInfo != null ? !esConfInfo.equals(taskInfo.esConfInfo) : taskInfo.esConfInfo != null) return false;
        if (dbConfInfo != null ? !dbConfInfo.equals(taskInfo.dbConfInfo) : taskInfo.dbConfInfo != null) return false;
        if (cronInfo != null ? !cronInfo.equals(taskInfo.cronInfo) : taskInfo.cronInfo != null) return false;
        if (indexBuildEles != null ? !indexBuildEles.equals(taskInfo.indexBuildEles) : taskInfo.indexBuildEles != null)
            return false;
        if (idBuildEles != null ? !idBuildEles.equals(taskInfo.idBuildEles) : taskInfo.idBuildEles != null)
            return false;
        if (indexType != null ? !indexType.equals(taskInfo.indexType) : taskInfo.indexType != null) return false;
        if (categoryLevel1 != null ? !categoryLevel1.equals(taskInfo.categoryLevel1) : taskInfo.categoryLevel1 != null)
            return false;
        if (outputType != null ? !outputType.equals(taskInfo.outputType) : taskInfo.outputType != null)
            return false;
        if (functionName != null ? !functionName.equals(taskInfo.functionName) : taskInfo.functionName != null)
            return false;
        return zhName != null ? zhName.equals(taskInfo.zhName) : taskInfo.zhName == null;

    }

    @Override
    public int hashCode() {
        int result = categoryName != null ? categoryName.hashCode() : 0;
        result = 31 * result + (sourceName != null ? sourceName.hashCode() : 0);
        result = 31 * result + (providerName != null ? providerName.hashCode() : 0);
        result = 31 * result + sourceStatus;
        result = 31 * result + (pipelineConf != null ? pipelineConf.hashCode() : 0);
        result = 31 * result + (dbStoreTypes != null ? dbStoreTypes.hashCode() : 0);
        result = 31 * result + (esConfInfo != null ? esConfInfo.hashCode() : 0);
        result = 31 * result + (dbConfInfo != null ? dbConfInfo.hashCode() : 0);
        result = 31 * result + (cronInfo != null ? cronInfo.hashCode() : 0);
        result = 31 * result + sourceType;
        result = 31 * result + (indexBuildEles != null ? indexBuildEles.hashCode() : 0);
        result = 31 * result + (idBuildEles != null ? idBuildEles.hashCode() : 0);
        result = 31 * result + (indexType != null ? indexType.hashCode() : 0);
        result = 31 * result + (categoryLevel1 != null ? categoryLevel1.hashCode() : 0);
        result = 31 * result + (zhName != null ? zhName.hashCode() : 0);
        result = 31 * result + (outputType != null ? outputType.hashCode() : 0);
        result = 31 * result + (functionName != null ? functionName.hashCode() : 0);
        return result;
    }
}
View Code

3.3 任务的构造及初始化

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.quartz.consts.SourceType;
import com.quartz.consts.Sql;
import com.quartz.consts.StatusType;
import com.quartz.module.TaskInfo;
import com.quartz.util.MongoUtil;
import com.quartz.util.MySqlUtil;
import com.quartz.util.TimeUtil;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import org.bson.types.ObjectId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.util.*;

/**
 * 获取调度任务的相关信息
 * Created by songwang4 on 2017/6/7.
 */
public class GenerateTaskInfo {

    static Logger logger = LoggerFactory.getLogger(GenerateTaskInfo.class);

    static DBCollection sourceColl = MongoUtil.createOcpSourceDB();
    static DBCollection categoryColl = MongoUtil.createOcpCategoryDB();

    /**
     * 从数据库中读取任务相关信息
     *
     * @return
     */
    public static List<TaskInfo> generateTaskInfoFromMongo() {
        // 将任务信息进行封装
        List<TaskInfo> tasks = Lists.newArrayList();
        TaskInfo task = null;

        DBCursor sourceCur = sourceColl.find();
        DBObject sourceObj = null;
        DBObject categoryObj = null;
        while (sourceCur.hasNext()) {
            sourceObj = sourceCur.next();
            task = new TaskInfo();

            String sourceName = sourceObj.get("sourceName").toString();
            String categoryName = sourceObj.get("category").toString();
            // 基于业务名查找对应的业务表信息
            categoryObj = categoryColl.findOne(new BasicDBObject("catName", categoryName));
            if (categoryObj == null) {
                logger.error("no category found through source: " + sourceName);
                continue;
            }

            task.setCategoryId(categoryObj.get("_id").toString()); // 业务Id
            task.setCategoryName(categoryName); // 业务名

            List<String> dbStoreTypes = Lists.newArrayList();
            if (categoryObj.containsField("storeType")) {
                try {
                    JSONArray storeTypeArr = JSON.parseArray(categoryObj.get("storeType").toString());
                    for (int i = 0; i < storeTypeArr.size(); i++) {
                        dbStoreTypes.add(storeTypeArr.getString(i));
                    }
                } catch (Exception e) {
                }
            }
            task.setDbStoreTypes(dbStoreTypes); // 存储类型

            task.setCategoryLevel1(categoryObj.get("parent").toString()); // 一级业务分类
            task.setZhName(sourceObj.get("zhName").toString());

            task.setDbConfInfo(categoryObj.containsField("db") ? categoryObj.get("db").toString() : categoryName); // DB配置
            task.setEsConfInfo(categoryObj.containsField("es") ? categoryObj.get("es").toString() : categoryName); // ES配置

            task.setIndexBuildEles(extractBuilderEles(categoryObj, "isIndex", "itemName")); // 构建ES索引信息
            task.setIdBuildEles(extractBuilderEles(categoryObj, "isGK", "itemName")); // 构建id的信息元素

            task.setSourceId(sourceObj.get("_id").toString()); // 信源Id
            task.setSourceName(sourceName); // 信源名称

            int status = StatusType.OFFLINE;
            if (sourceObj.containsField("status")) {
                String statusType = sourceObj.get("status").toString();
                if (statusType.equals(StatusType.STR_ONLINE)) {
                    status = StatusType.ONLINE;
                }
            }
            task.setSourceStatus(status); // 信源的上下线状态

            int sourceType = SourceType.REAL_TIME_PROCESS;
            if (sourceObj.containsField("type")) {
                String strStatusType = sourceObj.get("type").toString();
                if (strStatusType.equals(SourceType.STR_OFF_LINE_PROCESS)) {
                    sourceType = SourceType.OFF_LINE_PROCESS;
                }
            }
            task.setSourceType(sourceType); // 离线或实时处理

            task.setIndexType(sourceObj.containsField("indexType") ?
                    sourceObj.get("indexType").toString() : ""); // 增量或全量标识

            // 定时时间配置
            task.setCronInfo(sourceObj.containsField("timerInfo") ?
                    sourceObj.get("timerInfo").toString() : "");
            if (task.getCronInfo().trim().length() == 0) {
                task.setCronInfo(generateCronInfo(sourceObj));
            }

            task.setPipelineConf(sourceObj.containsField("mappingWorkflow") ?
                    sourceObj.get("mappingWorkflow").toString() : ""); // pipeline配置信息

            tasks.add(task);
        }
        sourceCur.close();
        return tasks;
    }

    /**
     * 构建生成id或es的信息元素
     *
     * @param categoryObj
     * @param queryField
     * @param retureField
     * @return
     */
    public static List<String> extractBuilderEles(DBObject categoryObj, String queryField, String retureField) {
        List<String> builerEles = Lists.newArrayList();
        JSONArray dataItemArr = null;
        try {
            dataItemArr = JSON.parseArray(categoryObj.get("dataItems").toString());
        } catch (JSONException e) {
        }
        if (dataItemArr != null && dataItemArr.size() > 0) {
            JSONObject dataItemJson = null;
            for (int i = 0; i < dataItemArr.size(); i++) {
                dataItemJson = dataItemArr.getJSONObject(i);
                if (dataItemJson.containsKey(queryField) && dataItemJson.getBoolean(queryField)) {
                    builerEles.add(dataItemJson.getString(retureField).trim());
                }
            }
        }
        return builerEles;
    }

   
    /**
     * 基于业务表中的信息构造定时任务表达式
     *
     * @param sourceObj
     * @return
     */
    public static String generateCronInfo(DBObject sourceObj) {
        String updateTimeType = "";
        String updateTimeCycle = "";
        if (sourceObj.containsField("updateType")) {
            updateTimeType = sourceObj.get("updateType").toString();
        }
        if (sourceObj.containsField("updateCycle")) {
            updateTimeCycle = sourceObj.get("updateCycle").toString();
        }
        if (updateTimeType.trim().length() == 0 || updateTimeCycle.trim().length() == 0) {
            return "";
        }

        StringBuilder sb = new StringBuilder();
        Date date = null;
        if (updateTimeType.equalsIgnoreCase("YEAR")) {
            date = TimeUtil.parseDate(updateTimeCycle, "MM-dd HH:mm");
            if (date == null) {
                try {
                    sb.append(TimeUtil.extractFixedTimeByDay(Integer.parseInt(updateTimeCycle), 0, 0));
                } catch (NumberFormatException e) {
                }
            } else {
                sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ")
                        .append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" ")
                        .append(TimeUtil.extractFixedTime(date, Calendar.DATE)).append(" ")
                        .append(TimeUtil.extractFixedTime(date, Calendar.MONTH) + 1).append(" ? *");
            }
        }

        if (updateTimeType.equalsIgnoreCase("MONTH")) {
            date = TimeUtil.parseDate(updateTimeCycle, "dd HH:mm");
            if (date == null) return "";
            sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ")
                    .append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" ")
                    .append(TimeUtil.extractFixedTime(date, Calendar.DATE)).append(" * ?");
        }

        if (updateTimeType.equalsIgnoreCase("DAY")) {
            date = TimeUtil.parseDate(updateTimeCycle, "HH:mm");
            if (date == null) return "";
            sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ")
                    .append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" * * ?");
        }

        if (updateTimeType.equalsIgnoreCase("WEEK")) {
            String weekDay = "1";
            if (sourceObj.containsField("weekDay")) {
                weekDay = sourceObj.get("weekDay").toString();
            }
            date = TimeUtil.parseDate(updateTimeCycle, "HH:mm");
            if (date == null) return "";
            sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ")
                    .append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" ? * ")
                    .append(TimeUtil.extractFixedTime(weekDay));
        }

        if (updateTimeType.equalsIgnoreCase("HOUR")) {
            try {
                int hour = Integer.parseInt(updateTimeCycle);
                sb.append(TimeUtil.extractFixedTimeByHour(hour, 0));
            } catch (NumberFormatException e) {
            }
        }

        if (updateTimeType.equalsIgnoreCase("MINUTE")) {
            try {
                int minute = Integer.parseInt(updateTimeCycle);
                sb.append(TimeUtil.extractFixedTimeByMinute(minute));
            } catch (NumberFormatException e) {
            }
        }

        if (updateTimeType.equalsIgnoreCase("SECOND")) {
            sb.append("*/").append(updateTimeCycle).append(" * * * * ?");
        }

        return sb.toString();
    }

    /**
     * 过滤下线的任务
     *
     * @param tasks
     * @return
     */
    public static List<TaskInfo> filterTask(List<TaskInfo> tasks) {
        List<TaskInfo> taskInfos = Lists.newArrayList();
        for (TaskInfo taskInfo : tasks) {
            // 过滤下线的信源状态或实时的信源
            if (taskInfo.getSourceStatus() == StatusType.OFFLINE
                    || taskInfo.getSourceType() != Sou

以上是关于Quartz使用 - Quartz项目实战的主要内容,如果未能解决你的问题,请参考以下文章

Quartz实战源码解析Quartz分布式集群实现

Quartz实战源码解析Quartz分布式集群实现

分布式定时任务调度框架Quartz学习与实战记录完整篇

分布式定时任务调度框架Quartz学习与实战记录完整篇

分布式定时任务调度框架Quartz学习与实战记录完整篇

分布式定时任务调度框架Quartz学习与实战记录完整篇