Quartz使用 - Quartz项目实战
Posted mengrennwpu
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Quartz使用 - Quartz项目实战相关的知识,希望对你有一定的参考价值。
本片博文将阐述项目工作中使用Quartz的情况,包含项目背景、项目框架、Quartz集群部署等方面,重点讲述如何在实际项目中使用Quartz。
1. 背景
因项目需求,需要定时调用数据下载接口,并将数据存储至诸如mongo、redis、elasticsearch等数据库或缓存中。具体涉及到的需求如下:
a. 调用接口的任务均从mongo数据库读取;
b. 任务的个数随着业务量的增加而增加;
c. 每个调用任务的定时执行时间可能不同,且定时执行时间在mongo中可配置;
d. 任务的执行需要动态更新,如检测到某一任务的定时时间发生变化,则任务的执行也需要实时修改
e. mongo、redis、elasticsearch等数据库中所存储的字段也由mongo进行配置;
f. 任务执行需要实时性较高、可靠性较强、可扩展性较高等
综上需求,调研了一番,发现任务调度框架Quartz可满足项目需求。
2. 框架
基于项目的需求,结合任务调度框架Quartz,大体的流程框架如下图所示:
1) 首先构建从mongo加载任务
2) 将任务的配置信息初始化至Quartz
3) 通过Quartz的Job任务实现定时调用下载接口任务
4) 将下载的数据依据配置,存储至数据库中
5) 定时检测任务通过定时扫描mongo数据库,查看相关任务信息的配置是否发生变化,如果发生变化,则进行动态更新
6) 为了实现高可用性、可扩展性,可以直接使用Quartz原生的集群特性。
3. 核心代码
核心代码将会涵盖上述流程图中的相关环节,为了项目的保密性,相关信息也会隐藏。
3.1 任务主流程
import com.quartz.conf.Configuration; import com.quartz.conf.OcpConfHelper; import com.quartz.module.TaskInfo; import org.apache.log4j.PropertyConfigurator; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; public class SchedulerRunner { static Logger logger = LoggerFactory.getLogger(SchedulerRunner.class); public static void main(String[] args) { // 加载日志配置文件 PropertyConfigurator.configure("./conf/log4j.properties"); // 加载quartz配置文件 System.setProperty("org.quartz.properties", "./conf/quartz.properties"); // 执行任务解析与调度 run(); } public static void run(){ // 获取配置信息表 List<TaskInfo> taskInfos = GenerateTaskInfo.generateTaskInfoFrommysql(); if(taskInfos.size() == 0){ logger.info("there is no tasks from mongoInfo"); return; } // 过滤下线任务 taskInfos = GenerateTaskInfo.filterTask(taskInfos); if(taskInfos.size() == 0){ logger.info("all tasks if offline, no need to run"); return; } Scheduler scheduler = null; try { scheduler = StdSchedulerFactory.getDefaultScheduler(); } catch (SchedulerException e) { e.printStackTrace(); } if(scheduler == null){ logger.error("create scheduler failed"); return; } if(isSchedulerClear()){ clearSchedulerJob(scheduler); } // 加入任务调度 for(TaskInfo task : taskInfos){ SchedulerFactory.addJob2Scheduler(task, scheduler); } // 加入动态更新任务 SchedulerFactory.addDynamicUpdateJob2Scheduler(scheduler); // 开启任务 try { scheduler.start(); } catch (SchedulerException e) { logger.error("start scheduler error!"); } } public static void clearSchedulerJob(Scheduler scheduler){ try { scheduler.clear(); } catch (SchedulerException e) { logger.error("clear scheduler error!"); } } /** * 基于配置文件中的信息,加载调度器开始运行时的清洗标识 * @return */ private static boolean isSchedulerClear(){ Configuration conf = OcpConfHelper.getInstance().getOcpConf(); return conf.getBooleanValue("cleanSchedulerFlag", "true"); } }
3.2 封装任务对象
import java.util.List; import java.util.Map; public class TaskInfo { protected String categoryId; // 业务Id protected String categoryName; // 业务名称 protected String sourceId; // 信源Id protected String sourceName; // 信源名称 protected int sourceStatus; // 信源状态 protected String pipelineConf; // 信源pipeline配置信息 protected List<String> dbStoreTypes; // 业务的存储类型 protected String esConfInfo; // ES存储配置 protected String dbConfInfo; // DB存储配置 protected String cronInfo; // 定时任务信息 protected int sourceType; // 实时更新还是离线更新 protected List<String> indexBuildEles; // 更新索引的信息 protected List<String> idBuildEles; // id的构建因素 protected String indexType; // 全量或增量 protected String categoryLevel1; // 一级分类 protected String zhName; // 中文信息 protected Map<String,String> outputType; //输出参数名及其类型 protected String providerName; protected String functionName; //category_function名称 public String getProviderName() { return providerName; } public void setProviderName(String providerName) { this.providerName = providerName; } public String getCategoryId() { return categoryId; } public void setCategoryId(String categoryId) { this.categoryId = categoryId; } public String getCategoryName() { return categoryName; } public void setCategoryName(String categoryName) { this.categoryName = categoryName; } public String getSourceId() { return sourceId; } public void setSourceId(String sourceId) { this.sourceId = sourceId; } public String getSourceName() { return sourceName; } public void setSourceName(String sourceName) { this.sourceName = sourceName; } public int getSourceStatus() { return sourceStatus; } public void setSourceStatus(int sourceStatus) { this.sourceStatus = sourceStatus; } public String getPipelineConf() { return pipelineConf; } public void setPipelineConf(String pipelineConf) { this.pipelineConf = pipelineConf; } public String getEsConfInfo() { return esConfInfo; } public void setEsConfInfo(String esConfInfo) { this.esConfInfo = esConfInfo; } public String getDbConfInfo() { return dbConfInfo; } public void setDbConfInfo(String dbConfInfo) { this.dbConfInfo = dbConfInfo; } public String getCronInfo() { return cronInfo; } public void setCronInfo(String cronInfo) { this.cronInfo = cronInfo; } public int getSourceType() { return sourceType; } public void setSourceType(int sourceType) { this.sourceType = sourceType; } public List<String> getIdBuildEles() { return idBuildEles; } public void setIdBuildEles(List<String> idBuildEles) { this.idBuildEles = idBuildEles; } public List<String> getIndexBuildEles() { return indexBuildEles; } public void setIndexBuildEles(List<String> indexBuildEles) { this.indexBuildEles = indexBuildEles; } public String getIndexType() { return indexType; } public void setIndexType(String indexType) { this.indexType = indexType; } public String getCategoryLevel1() { return categoryLevel1; } public void setCategoryLevel1(String categoryLevel1) { this.categoryLevel1 = categoryLevel1; } public String getZhName() { return zhName; } public void setZhName(String zhName) { this.zhName = zhName; } public TaskInfo(){} public List<String> getDbStoreTypes() { return dbStoreTypes; } public void setDbStoreTypes(List<String> dbStoreTypes) { this.dbStoreTypes = dbStoreTypes; } public Map<String, String> getOutputType() { return outputType; } public void setOutputType(Map<String, String> outputType) { this.outputType = outputType; } public String getFunctionName() { return functionName; } public void setFunctionName(String functionName) { this.functionName = functionName; } /** * 是否有相同的定时信息 * @param taskInfo * @return */ public boolean hasSameCronInfo(TaskInfo taskInfo){ if(taskInfo == null) return false; return this.getCronInfo().equalsIgnoreCase(taskInfo.getCronInfo()); } @Override public String toString() { return "TaskInfo{" + "categoryId=\'" + categoryId + \'\\\'\' + ", categoryName=\'" + categoryName + \'\\\'\' + ", sourceId=\'" + sourceId + \'\\\'\' + ", sourceName=\'" + sourceName + \'\\\'\' + ", sourceStatus=" + sourceStatus + ", pipelineConf=\'" + pipelineConf + \'\\\'\' + ", dbStoreTypes=" + dbStoreTypes + ", esConfInfo=\'" + esConfInfo + \'\\\'\' + ", dbConfInfo=\'" + dbConfInfo + \'\\\'\' + ", cronInfo=\'" + cronInfo + \'\\\'\' + ", sourceType=" + sourceType + ", indexBuildEles=" + indexBuildEles + ", idBuildEles=" + idBuildEles + ", indexType=\'" + indexType + \'\\\'\' + ", categoryLevel1=\'" + categoryLevel1 + \'\\\'\' + ", zhName=\'" + zhName + \'\\\'\' + ", outputType=\'" + outputType + \'\\\'\' + ", providerName=\'" + providerName + \'\\\'\' + ", functionName=\'" + functionName + \'\\\'\' + \'}\'; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; TaskInfo taskInfo = (TaskInfo) o; if (sourceStatus != taskInfo.sourceStatus) return false; if (sourceType != taskInfo.sourceType) return false; if (categoryName != null ? !categoryName.equals(taskInfo.categoryName) : taskInfo.categoryName != null) return false; if (sourceName != null ? !sourceName.equals(taskInfo.sourceName) : taskInfo.sourceName != null) return false; if (providerName != null ? !providerName.equals(taskInfo.providerName) : taskInfo.providerName != null) return false; if (pipelineConf != null ? !pipelineConf.equals(taskInfo.pipelineConf) : taskInfo.pipelineConf != null) return false; if (dbStoreTypes != null ? !dbStoreTypes.equals(taskInfo.dbStoreTypes) : taskInfo.dbStoreTypes != null) return false; if (esConfInfo != null ? !esConfInfo.equals(taskInfo.esConfInfo) : taskInfo.esConfInfo != null) return false; if (dbConfInfo != null ? !dbConfInfo.equals(taskInfo.dbConfInfo) : taskInfo.dbConfInfo != null) return false; if (cronInfo != null ? !cronInfo.equals(taskInfo.cronInfo) : taskInfo.cronInfo != null) return false; if (indexBuildEles != null ? !indexBuildEles.equals(taskInfo.indexBuildEles) : taskInfo.indexBuildEles != null) return false; if (idBuildEles != null ? !idBuildEles.equals(taskInfo.idBuildEles) : taskInfo.idBuildEles != null) return false; if (indexType != null ? !indexType.equals(taskInfo.indexType) : taskInfo.indexType != null) return false; if (categoryLevel1 != null ? !categoryLevel1.equals(taskInfo.categoryLevel1) : taskInfo.categoryLevel1 != null) return false; if (outputType != null ? !outputType.equals(taskInfo.outputType) : taskInfo.outputType != null) return false; if (functionName != null ? !functionName.equals(taskInfo.functionName) : taskInfo.functionName != null) return false; return zhName != null ? zhName.equals(taskInfo.zhName) : taskInfo.zhName == null; } @Override public int hashCode() { int result = categoryName != null ? categoryName.hashCode() : 0; result = 31 * result + (sourceName != null ? sourceName.hashCode() : 0); result = 31 * result + (providerName != null ? providerName.hashCode() : 0); result = 31 * result + sourceStatus; result = 31 * result + (pipelineConf != null ? pipelineConf.hashCode() : 0); result = 31 * result + (dbStoreTypes != null ? dbStoreTypes.hashCode() : 0); result = 31 * result + (esConfInfo != null ? esConfInfo.hashCode() : 0); result = 31 * result + (dbConfInfo != null ? dbConfInfo.hashCode() : 0); result = 31 * result + (cronInfo != null ? cronInfo.hashCode() : 0); result = 31 * result + sourceType; result = 31 * result + (indexBuildEles != null ? indexBuildEles.hashCode() : 0); result = 31 * result + (idBuildEles != null ? idBuildEles.hashCode() : 0); result = 31 * result + (indexType != null ? indexType.hashCode() : 0); result = 31 * result + (categoryLevel1 != null ? categoryLevel1.hashCode() : 0); result = 31 * result + (zhName != null ? zhName.hashCode() : 0); result = 31 * result + (outputType != null ? outputType.hashCode() : 0); result = 31 * result + (functionName != null ? functionName.hashCode() : 0); return result; } }
3.3 任务的构造及初始化
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.Gson; import com.quartz.consts.SourceType; import com.quartz.consts.Sql; import com.quartz.consts.StatusType; import com.quartz.module.TaskInfo; import com.quartz.util.MongoUtil; import com.quartz.util.MySqlUtil; import com.quartz.util.TimeUtil; import com.mongodb.BasicDBObject; import com.mongodb.DBCollection; import com.mongodb.DBCursor; import com.mongodb.DBObject; import org.bson.types.ObjectId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.util.*; /** * 获取调度任务的相关信息 * Created by songwang4 on 2017/6/7. */ public class GenerateTaskInfo { static Logger logger = LoggerFactory.getLogger(GenerateTaskInfo.class); static DBCollection sourceColl = MongoUtil.createOcpSourceDB(); static DBCollection categoryColl = MongoUtil.createOcpCategoryDB(); /** * 从数据库中读取任务相关信息 * * @return */ public static List<TaskInfo> generateTaskInfoFromMongo() { // 将任务信息进行封装 List<TaskInfo> tasks = Lists.newArrayList(); TaskInfo task = null; DBCursor sourceCur = sourceColl.find(); DBObject sourceObj = null; DBObject categoryObj = null; while (sourceCur.hasNext()) { sourceObj = sourceCur.next(); task = new TaskInfo(); String sourceName = sourceObj.get("sourceName").toString(); String categoryName = sourceObj.get("category").toString(); // 基于业务名查找对应的业务表信息 categoryObj = categoryColl.findOne(new BasicDBObject("catName", categoryName)); if (categoryObj == null) { logger.error("no category found through source: " + sourceName); continue; } task.setCategoryId(categoryObj.get("_id").toString()); // 业务Id task.setCategoryName(categoryName); // 业务名 List<String> dbStoreTypes = Lists.newArrayList(); if (categoryObj.containsField("storeType")) { try { JSONArray storeTypeArr = JSON.parseArray(categoryObj.get("storeType").toString()); for (int i = 0; i < storeTypeArr.size(); i++) { dbStoreTypes.add(storeTypeArr.getString(i)); } } catch (Exception e) { } } task.setDbStoreTypes(dbStoreTypes); // 存储类型 task.setCategoryLevel1(categoryObj.get("parent").toString()); // 一级业务分类 task.setZhName(sourceObj.get("zhName").toString()); task.setDbConfInfo(categoryObj.containsField("db") ? categoryObj.get("db").toString() : categoryName); // DB配置 task.setEsConfInfo(categoryObj.containsField("es") ? categoryObj.get("es").toString() : categoryName); // ES配置 task.setIndexBuildEles(extractBuilderEles(categoryObj, "isIndex", "itemName")); // 构建ES索引信息 task.setIdBuildEles(extractBuilderEles(categoryObj, "isGK", "itemName")); // 构建id的信息元素 task.setSourceId(sourceObj.get("_id").toString()); // 信源Id task.setSourceName(sourceName); // 信源名称 int status = StatusType.OFFLINE; if (sourceObj.containsField("status")) { String statusType = sourceObj.get("status").toString(); if (statusType.equals(StatusType.STR_ONLINE)) { status = StatusType.ONLINE; } } task.setSourceStatus(status); // 信源的上下线状态 int sourceType = SourceType.REAL_TIME_PROCESS; if (sourceObj.containsField("type")) { String strStatusType = sourceObj.get("type").toString(); if (strStatusType.equals(SourceType.STR_OFF_LINE_PROCESS)) { sourceType = SourceType.OFF_LINE_PROCESS; } } task.setSourceType(sourceType); // 离线或实时处理 task.setIndexType(sourceObj.containsField("indexType") ? sourceObj.get("indexType").toString() : ""); // 增量或全量标识 // 定时时间配置 task.setCronInfo(sourceObj.containsField("timerInfo") ? sourceObj.get("timerInfo").toString() : ""); if (task.getCronInfo().trim().length() == 0) { task.setCronInfo(generateCronInfo(sourceObj)); } task.setPipelineConf(sourceObj.containsField("mappingWorkflow") ? sourceObj.get("mappingWorkflow").toString() : ""); // pipeline配置信息 tasks.add(task); } sourceCur.close(); return tasks; } /** * 构建生成id或es的信息元素 * * @param categoryObj * @param queryField * @param retureField * @return */ public static List<String> extractBuilderEles(DBObject categoryObj, String queryField, String retureField) { List<String> builerEles = Lists.newArrayList(); JSONArray dataItemArr = null; try { dataItemArr = JSON.parseArray(categoryObj.get("dataItems").toString()); } catch (JSONException e) { } if (dataItemArr != null && dataItemArr.size() > 0) { JSONObject dataItemJson = null; for (int i = 0; i < dataItemArr.size(); i++) { dataItemJson = dataItemArr.getJSONObject(i); if (dataItemJson.containsKey(queryField) && dataItemJson.getBoolean(queryField)) { builerEles.add(dataItemJson.getString(retureField).trim()); } } } return builerEles; } /** * 基于业务表中的信息构造定时任务表达式 * * @param sourceObj * @return */ public static String generateCronInfo(DBObject sourceObj) { String updateTimeType = ""; String updateTimeCycle = ""; if (sourceObj.containsField("updateType")) { updateTimeType = sourceObj.get("updateType").toString(); } if (sourceObj.containsField("updateCycle")) { updateTimeCycle = sourceObj.get("updateCycle").toString(); } if (updateTimeType.trim().length() == 0 || updateTimeCycle.trim().length() == 0) { return ""; } StringBuilder sb = new StringBuilder(); Date date = null; if (updateTimeType.equalsIgnoreCase("YEAR")) { date = TimeUtil.parseDate(updateTimeCycle, "MM-dd HH:mm"); if (date == null) { try { sb.append(TimeUtil.extractFixedTimeByDay(Integer.parseInt(updateTimeCycle), 0, 0)); } catch (NumberFormatException e) { } } else { sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ") .append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" ") .append(TimeUtil.extractFixedTime(date, Calendar.DATE)).append(" ") .append(TimeUtil.extractFixedTime(date, Calendar.MONTH) + 1).append(" ? *"); } } if (updateTimeType.equalsIgnoreCase("MONTH")) { date = TimeUtil.parseDate(updateTimeCycle, "dd HH:mm"); if (date == null) return ""; sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ") .append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" ") .append(TimeUtil.extractFixedTime(date, Calendar.DATE)).append(" * ?"); } if (updateTimeType.equalsIgnoreCase("DAY")) { date = TimeUtil.parseDate(updateTimeCycle, "HH:mm"); if (date == null) return ""; sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ") .append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" * * ?"); } if (updateTimeType.equalsIgnoreCase("WEEK")) { String weekDay = "1"; if (sourceObj.containsField("weekDay")) { weekDay = sourceObj.get("weekDay").toString(); } date = TimeUtil.parseDate(updateTimeCycle, "HH:mm"); if (date == null) return ""; sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ") .append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" ? * ") .append(TimeUtil.extractFixedTime(weekDay)); } if (updateTimeType.equalsIgnoreCase("HOUR")) { try { int hour = Integer.parseInt(updateTimeCycle); sb.append(TimeUtil.extractFixedTimeByHour(hour, 0)); } catch (NumberFormatException e) { } } if (updateTimeType.equalsIgnoreCase("MINUTE")) { try { int minute = Integer.parseInt(updateTimeCycle); sb.append(TimeUtil.extractFixedTimeByMinute(minute)); } catch (NumberFormatException e) { } } if (updateTimeType.equalsIgnoreCase("SECOND")) { sb.append("*/").append(updateTimeCycle).append(" * * * * ?"); } return sb.toString(); } /** * 过滤下线的任务 * * @param tasks * @return */ public static List<TaskInfo> filterTask(List<TaskInfo> tasks) { List<TaskInfo> taskInfos = Lists.newArrayList(); for (TaskInfo taskInfo : tasks) { // 过滤下线的信源状态或实时的信源 if (taskInfo.getSourceStatus() == StatusType.OFFLINE || taskInfo.getSourceType() != Sou以上是关于Quartz使用 - Quartz项目实战的主要内容,如果未能解决你的问题,请参考以下文章