基于zookeeper和quartz实现分布式定时调度
Posted JAVA高级架构
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于zookeeper和quartz实现分布式定时调度相关的知识,希望对你有一定的参考价值。
目的
利用zookeeper的特性,来控制quartz实现分布式调度,保证quartz的单点运行,同时解除quartz自身分布式部署对数据库的依赖,保证同一时刻只有一个quartz应用在执行任务。
实现方式
利用zk的分布式独占锁,控制quartz应用执行节点,让拿到独占锁的quartz应用执行调度,没有拿到独占锁的quartz应用处于等待状态。
类图
核心代码
/**
 * Base configuration bean holding the properties common to every trigger
 * definition (identity, time window, priority, misfire handling and the job
 * to run). Concrete trigger kinds extend this class with their own settings.
 */
public class TriggerBean {

    /** Identifier of the trigger. */
    private String key;

    public String getKey() {
        return this.key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    /** Group the trigger belongs to. */
    private String group;

    public String getGroup() {
        return this.group;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    /** Free-text description. */
    private String description;

    public String getDescription() {
        return this.description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    /** Start time, kept as text. */
    private String startTime;

    public String getStartTime() {
        return this.startTime;
    }

    public void setStartTime(String startTime) {
        this.startTime = startTime;
    }

    /** End time, kept as text. */
    private String endTime;

    public String getEndTime() {
        return this.endTime;
    }

    public void setEndTime(String endTime) {
        this.endTime = endTime;
    }

    /** Priority of the trigger. */
    private Integer priority;

    public Integer getPriority() {
        return this.priority;
    }

    public void setPriority(Integer priority) {
        this.priority = priority;
    }

    /** Name of the calendar. */
    private String calendarName;

    public String getCalendarName() {
        return this.calendarName;
    }

    public void setCalendarName(String calendarName) {
        this.calendarName = calendarName;
    }

    /**
     * Misfire instruction code. Known values:
     * MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1,
     * MISFIRE_INSTRUCTION_SMART_POLICY = 0 (default),
     * MISFIRE_INSTRUCTION_FIRE_ONCE_NOW = 1,
     * MISFIRE_INSTRUCTION_DO_NOTHING = 2.
     */
    private Integer misfireInstruction;

    public Integer getMisfireInstruction() {
        return this.misfireInstruction;
    }

    public void setMisfireInstruction(Integer misfireInstruction) {
        this.misfireInstruction = misfireInstruction;
    }

    /** Job proxy describing the task this trigger fires. */
    private JobDetailProxyBean jobDetail;

    public JobDetailProxyBean getJobDetail() {
        return this.jobDetail;
    }

    public void setJobDetail(JobDetailProxyBean jobDetail) {
        this.jobDetail = jobDetail;
    }
}
/**
 * Trigger definition driven by a CRON expression; adds the expression to the
 * common properties inherited from {@code TriggerBean}.
 */
public class CronTriggerBean extends TriggerBean {

    /** CRON expression describing the firing schedule. */
    private String cronExpression;

    /**
     * @return the configured CRON expression
     */
    public String getCronExpression() {
        return this.cronExpression;
    }

    /**
     * @param cronExpression the CRON expression to use
     */
    public void setCronExpression(String cronExpression) {
        this.cronExpression = cronExpression;
    }
}
/**
 * Trigger definition that repeats at a fixed interval; adds the interval and
 * repeat count to the common properties inherited from {@code TriggerBean}.
 */
public class SimpleTriggerBean extends TriggerBean {

    /** Interval between executions, in seconds. */
    private Integer interval;

    /** Number of repetitions (default: -1 means repeat forever). */
    private Integer repeatCount;

    /**
     * @return the interval in seconds
     */
    public Integer getInterval() {
        return this.interval;
    }

    /**
     * @param interval the interval in seconds
     */
    public void setInterval(Integer interval) {
        this.interval = interval;
    }

    /**
     * @return the configured repeat count
     */
    public Integer getRepeatCount() {
        return this.repeatCount;
    }

    /**
     * @param repeatCount the number of repetitions
     */
    public void setRepeatCount(Integer repeatCount) {
        this.repeatCount = repeatCount;
    }
}
public class SchedulerFactoryBean implements InitializingBean {
protected static Logger logger = Logger.getLogger(SchedulerFactoryBean.class);
/**
* 触发器列表
*/
private List<Object> triggers;
/**
* zooKeeper工厂
*/
private ZookeeperFactory zooKeeperFactory;
/**
* Spring初始化方法
* @throws SchedulerException
*/
public void afterPropertiesSet() throws SchedulerException {
this.initSchedulerFactory();
}
/**
* 初始化调度器工厂
* @throws SchedulerException
*/
public void initSchedulerFactory() throws SchedulerException {
//初始化StdSchedulerFactory
StdSchedulerFactory schedulerFactory = SchedulerUtils.initStdSchedulerFactory();
//获取调度器
Scheduler scheduler = schedulerFactory.getScheduler();
//装载调度器
for(Object triggerObject : this.getTriggers()){
if(triggerObject instanceof CronTriggerBean){
CronTriggerBean cronTriggerBean = (CronTriggerBean)triggerObject;
//获取任务代理类对象
JobDetailProxyBean jobDetailProxyBean = cronTriggerBean.getJobDetail();
//装配任务
JobDetail jobDetail = SchedulerUtils.assemblyJobDetail(jobDetailProxyBean);
//设置zooKeeper连接工厂
jobDetail.getJobDataMap().put("zooKeeperFactory",this.getZooKeeperFactory());
//装配触发器
CronTrigger cronTrigger = SchedulerUtils.assemblyCronTrigger(cronTriggerBean);
scheduler.scheduleJob(jobDetail, cronTrigger);
// System.out.println("CronTriggerBean");
}else{
SimpleTriggerBean simpleTriggerBean = (SimpleTriggerBean)triggerObject;
//获取任务代理类对象
JobDetailProxyBean jobDetailProxyBean = simpleTriggerBean.getJobDetail();
//装配任务
JobDetail jobDetail = SchedulerUtils.assemblyJobDetail(jobDetailProxyBean);
//设置zooKeeper连接工厂
jobDetail.getJobDataMap().put("zooKeeperFactory",this.getZooKeeperFactory());
//装配触发器
SimpleTrigger simpleTrigger = SchedulerUtils.assemblySimpleTrigger(simpleTriggerBean);
scheduler.scheduleJob(jobDetail, simpleTrigger);
// System.out.println("SimpleTriggerBean");
}
}
scheduler.start();
logger.info("调度器已启动");
}
public List<Object> getTriggers() {
return triggers;
}
public void setTriggers(List<Object> triggers) {
this.triggers = triggers;
}
public ZookeeperFactory getZooKeeperFactory() {
return zooKeeperFactory;
}
public void setZooKeeperFactory(ZookeeperFactory zooKeeperFactory) {
this.zooKeeperFactory = zooKeeperFactory;
}
}
package com.ab.scheduling.quartz;
import com.ab.scheduling.quartz.constant.Constant;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.springframework.beans.factory.InitializingBean;
import java.util.Collections;
import java.util.List;
/**
* Zookeeper 工厂类
* Date: 14-4-2
* Time: 下午4:03
*/
public class ZookeeperFactory implements InitializingBean{
public static Logger logger = Logger.getLogger(ZookeeperFactory.class);
/**
* zookeeper服务地址
*/
private String hosts;
/**
* 回话的超时时间(毫秒)
*/
private Integer sessionTimeOut;
/**
* 连接的超时时间(毫秒)
*/
private Integer connectionTimeOut;
/**
* 命名空间
**/
private String nameSpace;
/**
* zookeeper管理对象
*/
private CuratorFramework zkTools;
/**
* 独享队列节点
*/
private String monopolyQueueNode;
/**
* 连接状态
*/
private String connectionState;
/**
* 会话ID
*/
private long sessionId;
/**
* Spring初始化方法
*/
public void afterPropertiesSet(){
this.connection();
this.addListener();
}
/**
* 连接
*/
public void connection(){
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, Integer.MAX_VALUE);
zkTools = CuratorFrameworkFactory
.builder()
.connectString(hosts)
.namespace(nameSpace)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(connectionTimeOut == null ? 30000 : connectionTimeOut)
.sessionTimeoutMs(sessionTimeOut == null ? 30000 : sessionTimeOut)
.build();
zkTools.start();
}
/**
* 连接状态监听
*/
public void addListener(){
zkTools.getConnectionStateListenable().addListener(new ConnectionStateListener() {
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (newState.equals(ConnectionState.CONNECTED)) {
logger.info("连接");
connectionState = "CONNECTED";
try {
sessionId = zkTools.getZookeeperClient().getZooKeeper().getSessionId();
registerMonopolyQueue();
} catch (Exception e) {
logger.error("注册独占队列失败");
}
}
if (newState.equals(ConnectionState.RECONNECTED)) {
logger.info("重新连接");
connectionState = "CONNECTED";
try {
if(sessionId != zkTools.getZookeeperClient().getZooKeeper().getSessionId()) {
registerMonopolyQueue();
}
} catch (Exception e) {
logger.error("注册独占队列失败");
}
}
if (newState.equals(ConnectionState.LOST)) {
logger.info("丢失");
connectionState = "LOST";
}
if (newState.equals(ConnectionState.SUSPENDED)) {
logger.info("暂停");
connectionState = "SUSPENDED";
}
if (newState.equals(ConnectionState.READ_ONLY)) {
logger.info("只读");
connectionState = "READ_ONLY";
}
}
});
}
/**
* 注册独占队列
*/
private void registerMonopolyQueue() throws Exception {
if(zkTools.checkExists().watched().forPath(Constant.MONOPOLY) == null){
zkTools.create()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(Constant.MONOPOLY);
logger.info("创建独享锁队列节点成功!");
}
if(monopolyQueueNode == null || (monopolyQueueNode != null && zkTools.checkExists().forPath(monopolyQueueNode)==null)) {
monopolyQueueNode = zkTools.create()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(Constant.MONOPOLY + Constant.SEPARATOR + Constant.QUEUE_NODE);
logger.info("成功加入独享锁队列");
}
}
/**
* 获得独占锁的执行权限
* @return 执行权限标识
* @throws KeeperException
* @throws InterruptedException
*/
public boolean getMonopolyLock() throws Exception {
boolean flag = false;
if(connectionState != null && (connectionState.equals("CONNECTED") || connectionState.equals("RECONNECTED"))){
List<String> nodes = zkTools.getChildren().watched().forPath(Constant.MONOPOLY);
if(nodes.size() > 0){
Collections.sort(nodes);
//判断当前应用是否在队列的第一位
if((Constant.SEPARATOR + Constant.MONOPOLY + Constant.SEPARATOR + nodes.get(0)).equals(monopolyQueueNode)){
flag = true;
}
}
}
return flag;
}
/**
* 关闭连接
*/
public void close(){
if(zkTools != null){
zkTools.close();
zkTools = null;
}
}
public String getHosts() {
return hosts;
}
public void setHosts(String hosts) {
this.hosts = hosts;
}
public Integer getSessionTimeOut() {
return sessionTimeOut;
}
public void setSessionTimeOut(Integer sessionTimeOut) {
this.sessionTimeOut = sessionTimeOut;
}
public Integer getConnectionTimeOut() {
return connectionTimeOut;
}
public void setConnectionTimeOut(Integer connectionTimeOut) {
this.connectionTimeOut = connectionTimeOut;
}
public String getNameSpace() {
return nameSpace;
}
public void setNameSpace(String nameSpace) {
this.nameSpace = nameSpace;
}
}
package com.ab.scheduling.quartz.common;
import com.ab.scheduling.quartz.JobDetailProxyBean;
import com.ab.scheduling.quartz.CronTriggerBean;
import com.ab.scheduling.quartz.SimpleTriggerBean;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.simpl.SimpleThreadPool;
import java.util.Properties;
/**
* Quartz调度工具类
* Date: 14-5-15
* Time: 下午6:10
*/
public class SchedulerUtils {
protected static Logger logger = Logger.getLogger(SchedulerUtils.class);
/**
* 初始化StdSchedulerFactory
* @return StdSchedulerFactory
*/
public static StdSchedulerFactory initStdSchedulerFactory() {
StdSchedulerFactory schedulerFactory = null;
try{
schedulerFactory = (StdSchedulerFactory) Class.forName(StdSchedulerFactory.class.getName()).newInstance();
Properties mergedProps = new Properties();
// 设置Quartz线程池设置
mergedProps.setProperty(StdSchedulerFactory.PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
mergedProps.setProperty("org.quartz.threadPool.threadCount", Integer.toString(10));
schedulerFactory.initialize(mergedProps);
} catch (Exception e){
logger.error("初始化StdSchedulerFactory失败");
logger.error(e);
}
return schedulerFactory;
}
/**
* 装配任务
* @param jobDetail 任务代理类
* @return JobDetail
*/
public static JobDetail assemblyJobDetail(JobDetailProxyBean jobDetail){
JobBuilder jobBuilder = JobBuilder.newJob(jobDetail.getClass());
//设置JobDetail身份标识与所属组
String key = jobDetail.getKey();
if(StringUtils.isNotBlank(key)){
jobBuilder = jobBuilder.withIdentity(key, jobDetail.getGroup());
}else{
jobBuilder = jobBuilder.withIdentity(IdentityUtils.generatorUUID("JOB"), jobDetail.getGroup());
}
//设置任务描述
if(StringUtils.isNotBlank(jobDetail.getDescription())){
jobBuilder = jobBuilder.withDescription(jobDetail.getDescription());
}
//设置JobDetail数据参数
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("targetObject",jobDetail.getTargetObject()); //目标对象
jobDataMap.put("targetMethod",jobDetail.getTargetMethod()); //目标方法
jobDataMap.put("mode", jobDetail.getMode()); //运行模式
jobBuilder = jobBuilder.usingJobData(jobDataMap);
return jobBuilder.build();
}
/**
* 装配表达式触发器
* @param cronTriggerBean 表达式触发器
* @return 表达式触发器
*/
public static CronTrigger assemblyCronTrigger(CronTriggerBean cronTriggerBean){
TriggerBuilder triggerBuilder = TriggerBuilder.newTrigger();
//设置触发器身份标识与所属组
String key = cronTriggerBean.getKey();
if(StringUtils.isNotBlank(key)){
triggerBuilder = triggerBuilder.withIdentity(key, cronTriggerBean.getGroup());
}else{
triggerBuilder = triggerBuilder.withIdentity(IdentityUtils.generatorUUID("CronTrigger"), cronTriggerBean.getGroup());
}
//设置描述
if(StringUtils.isNotBlank(cronTriggerBean.getDescription())){
triggerBuilder = triggerBuilder.withDescription(cronTriggerBean.getDescription());
}
//设置启动时间
if(StringUtils.isNotBlank(cronTriggerBean.getStartTime())){
triggerBuilder = triggerBuilder.startAt(DateUtils.StringToDate(cronTriggerBean.getStartTime(), "yyyy-MM-dd HH:mm:ss"));
}else{
triggerBuilder = triggerBuilder.startNow(); //当启动时间为空默认立即启动调度器
}
//设置结束时间
if(StringUtils.isNotBlank(cronTriggerBean.getEndTime())){
triggerBuilder = triggerBuilder.endAt(DateUtils.StringToDate(cronTriggerBean.getEndTime(), "yyyy-MM-dd HH:mm:ss"));
}
//设置优先级
if(cronTriggerBean.getPriority() != null){
triggerBuilder = triggerBuilder.withPriority(cronTriggerBean.getPriority());
}
//设置Cron表达式(不允许为空)与集火指令
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronTriggerBean.getCronExpression());
if(cronTriggerBean.getMisfireInstruction() != null){
if(cronTriggerBean.getMisfireInstruction() == Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY) {
cronScheduleBuilder = cronScheduleBuilder.withMisfireHandlingInstructionIgnoreMisfires();
}
if(cronTriggerBean.getMisfireInstruction() == CronTrigger.MISFIRE_INSTRUCTION_FIRE_ONCE_NOW) {
cronScheduleBuilder = cronScheduleBuilder.withMisfireHandlingInstructionFireAndProceed();
}
if(cronTriggerBean.getMisfireInstruction() == CronTrigger.MISFIRE_INSTRUCTION_DO_NOTHING) {
cronScheduleBuilder = cronScheduleBuilder.withMisfireHandlingInstructionDoNothing();
}
}
triggerBuilder = triggerBuilder.withSchedule(cronScheduleBuilder);
return (CronTrigger)triggerBuilder.build();
}
/**
* 装配简单触发器
* @param simpleTriggerBean 简单触发器
* @return 简单触发器
*/
public static SimpleTrigger assemblySimpleTrigger(SimpleTriggerBean simpleTriggerBean){
TriggerBuilder triggerBuilder = TriggerBuilder.newTrigger();
//设置触发器身份标识与所属组
String key = simpleTriggerBean.getKey();
if(StringUtils.isNotBlank(key)){
triggerBuilder = triggerBuilder.withIdentity(key, simpleTriggerBean.getGroup());
}else{
triggerBuilder = triggerBuilder.withIdentity(IdentityUtils.generatorUUID("SimpleTrigger"), simpleTriggerBean.getGroup());
}
//设置描述
if(StringUtils.isNotBlank(simpleTriggerBean.getDescription())){
triggerBuilder = triggerBuilder.withDescription(simpleTriggerBean.getDescription());
}
//设置启动时间
if(StringUtils.isNotBlank(simpleTriggerBean.getStartTime())){
triggerBuilder = triggerBuilder.startAt(DateUtils.StringToDate(simpleTriggerBean.getStartTime(), "yyyy-MM-dd HH:mm:ss"));
}else{
triggerBuilder = triggerBuilder.startNow(); //当启动时间为空默认立即启动调度器
}
//设置结束时间
if(StringUtils.isNotBlank(simpleTriggerBean.getEndTime())){
triggerBuilder = triggerBuilder.endAt(DateUtils.StringToDate(simpleTriggerBean.getEndTime(), "yyyy-MM-dd HH:mm:ss"));
}
//设置优先级
if(simpleTriggerBean.getPriority() != null){
triggerBuilder = triggerBuilder.withPriority(simpleTriggerBean.getPriority());
}
//设置简单触发器 时间间隔(不允许为空)、执行次数(默认为-1)与集火指令
SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(20).withRepeatCount(-1);
simpleScheduleBuilder = simpleScheduleBuilder.withIntervalInSeconds(simpleTriggerBean.getInterval());
if(simpleTriggerBean.getRepeatCount() != null){
simpleScheduleBuilder = simpleScheduleBuilder.withRepeatCount(simpleTriggerBean.getRepeatCount());
}else{
simpleScheduleBuilder = simpleScheduleBuilder.withRepeatCount(-1);
}
if(simpleTriggerBean.getMisfireInstruction() != null){
if(simpleTriggerBean.getMisfireInstruction() == Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY) {
simpleScheduleBuilder = simpleScheduleBuilder.withMisfireHandlingInstructionIgnoreMisfires();
}
if(simpleTriggerBean.getMisfireInstruction() == SimpleTrigger.MISFIRE_INSTRUCTION_FIRE_NOW) {
simpleScheduleBuilder = simpleScheduleBuilder.withMisfireHandlingInstructionFireNow();
}
if(simpleTriggerBean.getMisfireInstruction() == SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_EXISTING_COUNT) {
simpleScheduleBuilder = simpleScheduleBuilder.withMisfireHandlingInstructionNextWithExistingCount();
}
if(simpleTriggerBean.getMisfireInstruction() == SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT) {
simpleScheduleBuilder = simpleScheduleBuilder.withMisfireHandlingInstructionNextWithRemainingCount();
}
if(simpleTriggerBean.getMisfireInstruction() == SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT) {
simpleScheduleBuilder = simpleScheduleBuilder.withMisfireHandlingInstructionNowWithExistingCount();
}
if(simpleTriggerBean.getMisfireInstruction() == SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT) {
simpleScheduleBuilder = simpleScheduleBuilder.withMisfireHandlingInstructionNowWithRemainingCount();
}
}
triggerBuilder = triggerBuilder.withSchedule(simpleScheduleBuilder);
return (SimpleTrigger)triggerBuilder.build();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"
default-autowire="byName">
<!-- Scheduled task implementation class.
     NOTE(review): this class lives under com.jd while every other bean uses com.ab; confirm the package. -->
<bean id="test1" class="com.jd.scheduling.quartz.test.Test1"/>
<!-- Job proxy: invokes method "test" on the test1 bean -->
<bean id="jobDetail1" class="com.ab.scheduling.quartz.JobDetailProxyBean">
<property name="targetObject" ref="test1"/>
<property name="targetMethod" value="test"/>
</bean>
<!-- Trigger: fires jobDetail1 according to the CRON expression below -->
<bean id="cronTrigger" class="com.ab.scheduling.quartz.CronTriggerBean">
<property name="jobDetail" ref="jobDetail1"/>
<property name="cronExpression" value="0/10 * * * * ?"/>
</bean>
<!-- ZooKeeper configuration: connect string, session timeout (milliseconds) and namespace;
     connectionTimeOut is not set here -->
<bean id="zooKeeperFactory" class="com.ab.scheduling.quartz.ZookeeperFactory">
<property name="hosts" value="127.0.0.1:2181"/>
<property name="sessionTimeOut" value="15000"/>
<property name="nameSpace" value="zk-scheduling"/>
</bean>
<!-- Scheduler factory: schedules the listed triggers and hands the ZooKeeper factory to their jobs.
     NOTE(review): the bean id is spelled "schdulerFactory"; check for references before renaming. -->
<bean id="schdulerFactory" autowire="no" class="com.ab.scheduling.quartz.SchedulerFactoryBean">
<property name="triggers">
<list>
<ref bean="cronTrigger" />
</list>
</property>
<property name="zooKeeperFactory" ref="zooKeeperFactory"/>
</bean>
</beans>
温馨提示
以上是关于基于zookeeper和quartz实现分布式定时调度的主要内容,如果未能解决你的问题,请参考以下文章