ElasticJob Source Code Analysis: How the JobScheduler Class Runs Scheduled Jobs
Posted 低调的洋仔
public static void main(String[] args) {
    // Initialize the data source
    DataSource dataSource = MovieServiceUtils.getDataSource();
    // Configure job event tracing backed by a relational database
    JobEventConfiguration jobEventRdbConfig = new JobEventRdbConfiguration(dataSource);
    new JobScheduler(createRegistryCenter(), createJobConfiguration(), jobEventRdbConfig).init();
}
JobScheduler execution flow
public JobScheduler(CoordinatorRegistryCenter regCenter, LiteJobConfiguration liteJobConfig, JobEventConfiguration jobEventConfig, ElasticJobListener... elasticJobListeners) {
    this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners);
}

private JobScheduler(CoordinatorRegistryCenter regCenter, LiteJobConfiguration liteJobConfig, JobEventBus jobEventBus, ElasticJobListener... elasticJobListeners) {
    JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
    this.liteJobConfig = liteJobConfig;
    this.regCenter = regCenter;
    List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
    this.setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
    this.schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
    this.jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
}
The code above only constructs the JobScheduler object; the init method is then called on it.
public void init() {
    LiteJobConfiguration liteJobConfigFromRegCenter = this.schedulerFacade.updateJobConfiguration(this.liteJobConfig);
    JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
    JobScheduleController jobScheduleController = new JobScheduleController(this.createScheduler(), this.createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
    JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, this.regCenter);
    this.schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
    jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}
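init performs the following steps:
-- Sets the total shard count. These parameters are read from the registry center, i.e. from the LiteJobConfiguration returned by updateJobConfiguration.
-- Creates the scheduler with the createScheduler method.
-- Creates the job detail with the createJobDetail method.
-- Registers the job (registerJob), which ties it to the ZooKeeper registry center.
-- Registers the job start-up information with the registerStartUpInfo method.
-- Schedules the job with the scheduleJob method.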
createScheduler method
What gets created here is the Quartz Scheduler.
The method also adds a job trigger listener to it.
private Scheduler createScheduler() {
    try {
        StdSchedulerFactory factory = new StdSchedulerFactory();
        factory.initialize(this.getBaseQuartzProperties());
        Scheduler result = factory.getScheduler();
        result.getListenerManager().addTriggerListener(this.schedulerFacade.newJobTriggerListener());
        return result;
    } catch (SchedulerException ex) {
        throw new JobSystemException(ex);
    }
}
The getScheduler call on factory is a Quartz method (StdSchedulerFactory.getScheduler).
The initialize call before it receives the result of getBaseQuartzProperties, which is where the Quartz parameters are configured.
private Properties getBaseQuartzProperties() {
    Properties result = new Properties();
    result.put("org.quartz.threadPool.class", SimpleThreadPool.class.getName());
    result.put("org.quartz.threadPool.threadCount", "1");
    result.put("org.quartz.scheduler.instanceName", this.liteJobConfig.getJobName());
    result.put("org.quartz.jobStore.misfireThreshold", "1");
    result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
    result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
    return result;
}
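getScheduler is a method of Quartz's StdSchedulerFactory.
A Scheduler has to be instantiated before it can be used. Its lifecycle starts when a SchedulerFactory creates it and ends when shutdown is called on it.
Once instantiated, it can be started (start), paused (standby), and stopped (shutdown). Only a scheduler that has been started fires triggers.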
public Scheduler getScheduler() throws SchedulerException {
    if (this.cfg == null) {
        this.initialize();
    }
    SchedulerRepository schedRep = SchedulerRepository.getInstance();
    Scheduler sched = schedRep.lookup(this.getSchedulerName());
    if (sched != null) {
        if (!sched.isShutdown()) {
            return sched;
        }
        schedRep.remove(this.getSchedulerName());
    }
    sched = this.instantiate();
    return sched;
}
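The lookup-or-instantiate flow of getScheduler, together with the lifecycle rule that only a started scheduler fires triggers, can be sketched without Quartz on the classpath. This is a minimal illustration, not Quartz code: ToyScheduler and ToySchedulerFactory are hypothetical stand-ins, and the map plays the role of SchedulerRepository.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a Quartz Scheduler: it only tracks lifecycle state.
class ToyScheduler {
    enum State { CREATED, STARTED, STANDBY, SHUTDOWN }

    private final String name;
    private State state = State.CREATED;

    ToyScheduler(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }

    void start() {
        requireAlive();
        state = State.STARTED;
    }

    void standby() {
        requireAlive();
        state = State.STANDBY;
    }

    void shutdown() {
        state = State.SHUTDOWN;
    }

    boolean isShutdown() {
        return state == State.SHUTDOWN;
    }

    // Only a started scheduler fires triggers.
    boolean canFireTriggers() {
        return state == State.STARTED;
    }

    // A scheduler that has been shut down cannot be started or paused again.
    private void requireAlive() {
        if (state == State.SHUTDOWN) {
            throw new IllegalStateException("scheduler " + name + " has been shut down");
        }
    }
}

// Hypothetical factory mirroring the lookup-or-instantiate flow shown above.
class ToySchedulerFactory {
    private static final Map<String, ToyScheduler> REPOSITORY = new ConcurrentHashMap<>();

    static ToyScheduler getScheduler(String name) {
        ToyScheduler existing = REPOSITORY.get(name);
        if (existing != null) {
            if (!existing.isShutdown()) {
                return existing;      // reuse the live scheduler
            }
            REPOSITORY.remove(name);  // drop the one that was shut down
        }
        ToyScheduler created = new ToyScheduler(name);
        REPOSITORY.put(name, created);
        return created;
    }
}
```

Because ElasticJob sets org.quartz.scheduler.instanceName to the job name, the repository key in this sketch corresponds to one scheduler per job name.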
createJobDetail method
private JobDetail createJobDetail(String jobClass) {
    JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(this.liteJobConfig.getJobName()).build();
    result.getJobDataMap().put("jobFacade", this.jobFacade);
    Optional<ElasticJob> elasticJobInstance = this.createElasticJobInstance();
    if (elasticJobInstance.isPresent()) {
        // A ready-made job instance was supplied: use it directly
        result.getJobDataMap().put("elasticJob", elasticJobInstance.get());
    } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
        // Otherwise instantiate the configured job class by reflection
        try {
            result.getJobDataMap().put("elasticJob", Class.forName(jobClass).newInstance());
        } catch (ReflectiveOperationException ex) {
            throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", new Object[]{jobClass});
        }
    }
    return result;
}
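The method first creates the result instance, which holds the details of the job.
The chained calls at the start mostly just set parameters; build then creates the JobDetail instance.
After that, createElasticJobInstance() is called. It creates a job instance and returns it wrapped in a Guava Optional.
isPresent then checks whether an instance was actually created. If so, the instance is put straight into result's JobDataMap.
The key used is "elasticJob".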
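The "use the supplied instance if present, otherwise instantiate the class by reflection" logic can be tried in isolation. The sketch below is an illustration under assumptions: JobInstanceResolver and buildDataMap are hypothetical names, java.util.Optional is swapped in for the Guava Optional used by ElasticJob, a plain HashMap stands in for the JobDataMap, and the ScriptJob special case is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class JobInstanceResolver {

    // Hypothetical helper: prefer a pre-built job instance, else create one from the class name.
    static Map<String, Object> buildDataMap(Optional<Object> prebuilt, String jobClass) {
        Map<String, Object> dataMap = new HashMap<>();
        if (prebuilt.isPresent()) {
            dataMap.put("elasticJob", prebuilt.get());
        } else {
            try {
                // Requires a no-argument constructor on the job class.
                Object job = Class.forName(jobClass).getDeclaredConstructor().newInstance();
                dataMap.put("elasticJob", job);
            } catch (ReflectiveOperationException ex) {
                throw new IllegalStateException(
                        String.format("Job class '%s' can not initialize.", jobClass), ex);
            }
        }
        return dataMap;
    }
}
```

The sketch uses getDeclaredConstructor().newInstance() because Class.newInstance() has been deprecated since Java 9; both need a no-argument constructor.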
public JobDetail build() {
    JobDetailImpl job = new JobDetailImpl();
    job.setJobClass(this.jobClass);
    job.setDescription(this.description);
    if (this.key == null) {
        this.key = new JobKey(Key.createUniqueName((String) null), (String) null);
    }
    job.setKey(this.key);
    job.setDurability(this.durability);
    job.setRequestsRecovery(this.shouldRecover);
    if (!this.jobDataMap.isEmpty()) {
        job.setJobDataMap(this.jobDataMap);
    }
    return job;
}
build creates an instance of JobDetailImpl, which holds a number of parameters. JobDetailImpl is an implementation class that belongs to Quartz.
public class JobDetailImpl implements Cloneable, Serializable, JobDetail {
    private static final long serialVersionUID = -6069784757781506897L;
    private String name;
    private String group;
    private String description;
    private Class<? extends Job> jobClass;
    private JobDataMap jobDataMap;
    private boolean durability;
    private boolean shouldRecover;
    private transient JobKey key;
    // methods omitted
}
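One of its fields is a JobDataMap. This class descends, through several levels of inheritance, from a class that implements the Map interface. It is Quartz's own class (org.quartz.JobDataMap), not an ElasticJob wrapper.
Next, a JobScheduleController is created:
public JobScheduleController(Scheduler scheduler, JobDetail jobDetail, String triggerIdentity) {
    this.scheduler = scheduler;
    this.jobDetail = jobDetail;
    this.triggerIdentity = triggerIdentity;
}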
registerJob
JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, this.regCenter);
JobRegistry
public final class JobRegistry {
    private static volatile JobRegistry instance;
    private Map<String, JobScheduleController> schedulerMap = new ConcurrentHashMap<>();
    private Map<String, CoordinatorRegistryCenter> regCenterMap = new ConcurrentHashMap<>();
    private Map<String, JobInstance> jobInstanceMap = new ConcurrentHashMap<>();
    private Map<String, Boolean> jobRunningMap = new ConcurrentHashMap<>();
    private Map<String, Integer> currentShardingTotalCountMap = new ConcurrentHashMap<>();

    public static JobRegistry getInstance() {
        if (null == instance) {
            synchronized (JobRegistry.class) {
                if (null == instance) {
                    instance = new JobRegistry();
                }
            }
        }
        return instance;
    }
    // other methods omitted
}
This is the singleton pattern; the instance is created with double-checked locking.
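For reference, double-checked locking reduces to the following minimal sketch. ToyRegistry is a hypothetical class used only for illustration. The volatile modifier on the field matters: without it, another thread could observe a reference to a partially constructed object.

```java
class ToyRegistry {
    // volatile guarantees that a fully constructed instance is what other threads see.
    private static volatile ToyRegistry instance;

    private ToyRegistry() {
    }

    static ToyRegistry getInstance() {
        if (instance == null) {                  // first check, no lock taken
            synchronized (ToyRegistry.class) {
                if (instance == null) {          // second check, under the lock
                    instance = new ToyRegistry();
                }
            }
        }
        return instance;
    }
}
```

The first check keeps the common path lock-free once the instance exists; the second check prevents two threads that both passed the first check from each creating an instance.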
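public void registerJob(String jobName, JobScheduleController jobScheduleController, CoordinatorRegistryCenter regCenter) {
    this.schedulerMap.put(jobName, jobScheduleController);
    this.regCenterMap.put(jobName, regCenter);
    regCenter.addCacheData("/" + jobName);
}
This is the registration step.
schedulerMap and regCenterMap simply store the objects in maps. In other words, the data is parked in the JobRegistry and fetched from there when needed, so this part acts as a local cache.
The regCenter.addCacheData call at the end involves the remote registry center: judging by its name, it caches the job's node tree ("/" + jobName) from the registry center rather than writing new data to it.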
registerStartUpInfo
public void registerStartUpInfo(boolean enabled) {
    this.listenerManager.startAllListeners(); // start all listeners
    this.leaderService.electLeader(); // elect the leader
    this.serverService.persistOnline(enabled); // persist to ZooKeeper: server info first, then instance info
    this.instanceService.persistOnline(); // persist job instance info
    this.shardingService.setReshardingFlag(); // set the flag that resharding is needed
    this.monitorService.listen(); // start the monitor service
    if (!this.reconcileService.isRunning()) {
        this.reconcileService.startAsync(); // asynchronously start the service that reconciles inconsistent distributed job state
    }
}
1. Start all listeners
public void startAllListeners() {
    this.electionListenerManager.start(); // leader election
    this.shardingListenerManager.start(); // sharding
    this.failoverListenerManager.start(); // failover
    this.monitorExecutionListenerManager.start(); // execution monitoring
    this.shutdownListenerManager.start(); // shutdown
    this.triggerListenerManager.start(); // trigger
    this.rescheduleListenerManager.start(); // rescheduling
    this.guaranteeListenerManager.start(); // guarantees the all-started and all-completed states of the distributed job
    this.jobNodeStorage.addConnectionStateListener(this.regCenterConnectionStateListener); // connection state between the job node and the registry center
}
2. Elect the leader
public void electLeader() {
    log.debug("Elect a new leader now.");
    this.jobNodeStorage.executeInLeader("leader/election/latch", new LeaderService.LeaderElectionExecutionCallback());
    log.debug("Leader election completed.");
}
The core of this step is a lock.
The LeaderLatch calls below are already Curator code; the purpose is to use Curator to elect the leader node.
public void executeInLeader(String latchNode, LeaderExecutionCallback callback) {
    // try-with-resources closes the latch whether or not the callback throws
    try (LeaderLatch latch = new LeaderLatch(this.getClient(), this.jobNodePath.getFullPath(latchNode))) {
        latch.start();
        latch.await();
        callback.execute();
    } catch (Exception ex) {
        this.handleException(ex);
    }
}
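Why executeInLeader behaves like a lock can be shown with an in-process analogue. This sketch deliberately swaps Curator's LeaderLatch for a java.util.concurrent ReentrantLock, so it only models the control flow (wait for the latch, run the callback, release the latch), not ZooKeeper itself. ToyLeaderElection is a hypothetical class, and the callback body (record yourself as leader only if nobody has yet) is an assumption about what LeaderElectionExecutionCallback does, since the article does not show it.

```java
import java.util.concurrent.locks.ReentrantLock;

class ToyLeaderElection {
    // Stand-in for the "leader/election/latch" node.
    private final ReentrantLock latch = new ReentrantLock();
    // Stand-in for the node that records the elected leader; guarded by latch.
    private String leaderId;

    // Runs the callback while holding the latch, then always releases it.
    void executeInLeader(Runnable callback) {
        latch.lock();              // analogue of latch.start() followed by latch.await()
        try {
            callback.run();
        } finally {
            latch.unlock();        // analogue of latch.close()
        }
    }

    // Assumed callback behaviour: the first caller to get the latch becomes leader.
    void electLeader(String instanceId) {
        executeInLeader(() -> {
            if (leaderId == null) {
                leaderId = instanceId;
            }
        });
    }

    String getLeaderId() {
        latch.lock();
        try {
            return leaderId;
        } finally {
            latch.unlock();
        }
    }
}
```

Every participant eventually gets its turn with the latch, but only the first one finds the leader slot empty, which is what turns a mutual-exclusion primitive into an election.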
3. Persist server information
public void persistOnline(boolean enabled) {
    if (!JobRegistry.getInstance().isShutdown(this.jobName)) {
        this.jobNodeStorage.fillJobNode(this.serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(this.jobName).getIp()), enabled ? "" : ServerStatus.DISABLED.name());
    }
}
This goes through the registry center:
public void fillJobNode(String node, Object value) {
    this.regCenter.persist(this.jobNodePath.getFullPath(node), value.toString());
}
which ends up in ZookeeperRegistryCenter. The node is created with CreateMode.PERSISTENT, so the server information is stored in a persistent node.
public void persist(String key, String value) {
    try {
        if (!this.isExisted(key)) {
            this.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(Charsets.UTF_8));
        } else {
            this.update(key, value);
        }
    } catch (Exception ex) {
        RegExceptionHandler.handleException(ex);
    }
}
4. Persist job instance information
The job instance data is stored in an ephemeral node.
public void persistOnline() {
    this.jobNodeStorage.fillEphemeralJobNode(this.instanceNode.getLocalInstanceNode(), "");
}
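The practical difference between the persistent server node and the ephemeral instance node is what happens when the process that created them goes away. The in-memory model below is only a sketch of that behaviour: ToyRegistryCenter, the session ids, and the node paths used with it are hypothetical, and real ZooKeeper removes ephemeral nodes when the owning client session ends.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class ToyRegistryCenter {
    private final Map<String, String> nodes = new ConcurrentHashMap<>();
    // Remembers which ephemeral paths belong to which client session.
    private final Map<String, Set<String>> ephemeralBySession = new ConcurrentHashMap<>();

    // Persistent node: created or updated, and kept until explicitly removed.
    void persist(String path, String value) {
        nodes.put(path, value);
    }

    // Ephemeral node: tied to the lifetime of the creating session.
    void persistEphemeral(String sessionId, String path, String value) {
        nodes.put(path, value);
        ephemeralBySession
                .computeIfAbsent(sessionId, k -> ConcurrentHashMap.<String>newKeySet())
                .add(path);
    }

    // Models a client session ending: its ephemeral nodes disappear.
    void closeSession(String sessionId) {
        Set<String> owned = ephemeralBySession.remove(sessionId);
        if (owned != null) {
            owned.forEach(nodes::remove);
        }
    }

    boolean isExisted(String path) {
        return nodes.containsKey(path);
    }
}
```

With this model, a server entry written through persist survives closeSession, while an instance entry written through persistEphemeral does not, which is how the rest of the cluster can notice that a job instance has gone offline.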
5. Set the resharding flag
public void setReshardingFlag() {
    this.jobNodeStorage.createJobNodeIfNeeded("leader/sharding/necessary");
}
This calls createJobNodeIfNeeded in JobNodeStorage.
public void createJobNodeIfNeeded(String node) {
    if (this.isJobRootNodeExisted() && !this.isJobNodeExisted(node)) {
        this.regCenter.persist(this.jobNodePath.getFullPath(node), "");
    }
}
Once again the registry center's persist method does the work.
6. Initialize the monitor service
public void listen() {
    int port = this.configService.load(true).getMonitorPort();
    if (port >= 0) {
        try {
            log.info("Elastic job: Monitor service is running, the port is '{}'", port);
            this.openSocketForMonitor(port);
        } catch (IOException ex) {
            log.error("Elastic job: Monitor service listen failure, error is: ", ex);
        }
    }
}
listen then calls the openSocketForMonitor method.
private void openSocketForMonitor(int port) throws IOException {
    this.serverSocket = new ServerSocket(port);
    new Thread() {
        @Override
        public void run() {
            while (!MonitorService.this.closed) {
                try {
                    MonitorService.this.process(MonitorService.this.serverSocket.accept());
                } catch (IOException ex) {
                    MonitorService.log.error("Elastic job: Monitor service open socket for monitor failure, error is: ", ex);
                }
            }
        }
    }.start();
}
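A plain socket is used here, and the process method handles the data that arrives on it.
Judging from process, the port mainly serves dump requests for the job's data. The decompiled method is long, so only part of it is reproduced.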
private void process(Socket socket) throws IOException {
    // The nested try/finally blocks in the decompiled output correspond to one try-with-resources
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
         BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
         Socket autoCloseSocket = socket) {
        String cmdLine = reader.readLine();
        if (null != cmdLine && "dump".equalsIgnoreCase(cmdLine)) {
            List<String> result = new ArrayList<>();
            this.dumpDirectly("/" + this.jobName, result);
            this.outputMessage(writer, Joiner.on("\n").join(SensitiveInfoUtils.filterSensitiveIps(result)) + "\n");
        }
    }
}
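From the client side, the protocol visible in process is: open a socket to the monitor port, send the single line dump, and read until the server closes the connection. The sketch below is a hypothetical client written against that reading of the code; MonitorDumpClient is not part of ElasticJob, and dumpFromLocalStub only exists so the client can be exercised against a throwaway local server that imitates the reply.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class MonitorDumpClient {

    // Sends "dump" to host:port and returns everything the server writes back.
    static String dump(String host, int port) throws IOException {
        try (Socket socket = new Socket(host, port);
             BufferedWriter writer = new BufferedWriter(
                     new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            writer.write("dump");
            writer.newLine();
            writer.flush();
            StringBuilder response = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {  // null means the server closed the socket
                response.append(line).append('\n');
            }
            return response.toString();
        }
    }

    // Exercises dump() against a local stub that answers "dump" with cannedReply.
    static String dumpFromLocalStub(String cannedReply) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            Thread stub = new Thread(() -> {
                try (Socket s = server.accept();
                     BufferedReader in = new BufferedReader(
                             new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
                     BufferedWriter out = new BufferedWriter(
                             new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8))) {
                    if ("dump".equalsIgnoreCase(in.readLine())) {
                        out.write(cannedReply);
                        out.flush();
                    }
                } catch (IOException ignored) {
                    // The stub is best-effort; the caller sees an empty or failed read instead.
                }
            });
            stub.setDaemon(true);
            stub.start();
            return dump(loopback.getHostAddress(), server.getLocalPort());
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}
```

Against a real job, the port to pass to dump would be the monitorPort from the job configuration; a negative value there means the monitor service does not listen at all.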
scheduleJob: scheduling the job
public void scheduleJob(String cron) {
    try {
        if (!this.scheduler.checkExists(this.jobDetail.getKey())) {
            this.scheduler.scheduleJob(this.jobDetail, this.createTrigger(cron));
        }
        this.scheduler.start();
    } catch (SchedulerException ex) {
        throw new JobSystemException(ex);
    }
}
The job is handed to the scheduler if it is not already there, and the scheduler is started. From this point on, the wrapped Quartz scheduler does the actual scheduling.
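Addendum
This part sorts out the relationship between Quartz and ElasticJob.
ElasticJob mostly wraps its own job attributes and methods. It carries both what it needs itself and what Quartz requires. So how does control pass between the two at run time? Is it done through inheritance?
Not really. Some classes are implemented through inheritance, but the lite edition of ElasticJob that I use does not mainly rely on it.
Look at the class below. It wraps two Quartz objects: jobDetail and scheduler.
The relationship between the two is better described as aggregation.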
/**
 * Job schedule controller.
 *
 * @author zhangliang
 */
@RequiredArgsConstructor
public final class JobScheduleController {
    private final Scheduler scheduler;
    private final JobDetail jobDetail;
    private final String triggerIdentity;
    // methods omitted
}
At run time, it is the scheduleJob method of this JobScheduleController that gets called. It in turn calls createTrigger(cron).
/**
 * Schedule the job.
 *
 * @param cron CRON expression
 */
public void scheduleJob(final String cron) {
    try {
        if (!scheduler.checkExists(jobDetail.getKey())) {
            scheduler.scheduleJob(jobDetail, createTrigger(cron));
        }
        scheduler.start();
    } catch (final SchedulerException ex) {
        throw new JobSystemException(ex);
    }
}
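The aggregation relationship, and the guard in scheduleJob that registers the job only once, can be sketched with plain classes. Everything here is hypothetical: RecordingScheduler stands in for the Quartz Scheduler and simply records the calls it receives, and ToyScheduleController mirrors the shape of JobScheduleController without any Quartz types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the Quartz scheduler that the controller wraps.
class RecordingScheduler {
    final Map<String, String> cronByJobKey = new HashMap<>();
    final List<String> calls = new ArrayList<>();

    boolean checkExists(String jobKey) {
        return cronByJobKey.containsKey(jobKey);
    }

    void scheduleJob(String jobKey, String cron) {
        cronByJobKey.put(jobKey, cron);
        calls.add("schedule");
    }

    void start() {
        calls.add("start");
    }
}

// The controller holds (has-a) a scheduler and a job key; it does not extend the scheduler.
final class ToyScheduleController {
    private final RecordingScheduler scheduler;
    private final String jobKey;

    ToyScheduleController(RecordingScheduler scheduler, String jobKey) {
        this.scheduler = scheduler;
        this.jobKey = jobKey;
    }

    void scheduleJob(String cron) {
        if (!scheduler.checkExists(jobKey)) {
            scheduler.scheduleJob(jobKey, cron);  // registered only the first time
        }
        scheduler.start();                        // delegated on every call
    }
}
```

Calling scheduleJob a second time in this sketch leaves the original cron expression in place, which matches the checkExists guard in the real method; changing the cron of an existing job would need a separate rescheduling path.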
Design patterns involved
// Builder pattern: the configuration is immutable and can only be created through its nested Builder.
public final class LiteJobConfiguration implements JobRootConfiguration {
    private final JobTypeConfiguration typeConfig;
    private final boolean monitorExecution;
    private final int maxTimeDiffSeconds;
    private final int monitorPort;
    private final String jobShardingStrategyClass;
    private final int reconcileIntervalMinutes;
    private final boolean disabled;
    private final boolean overwrite;

    public String getJobName() {
        return this.typeConfig.getCoreConfig().getJobName();
    }

    public boolean isFailover() {
        return this.typeConfig.getCoreConfig().isFailover();
    }

    public static LiteJobConfiguration.Builder newBuilder(JobTypeConfiguration jobConfig) {
        return new LiteJobConfiguration.Builder(jobConfig);
    }

    public JobTypeConfiguration getTypeConfig() {
        return this.typeConfig;
    }

    public boolean isMonitorExecution() {
        return this.monitorExecution;
    }

    public int getMaxTimeDiffSeconds() {
        return this.maxTimeDiffSeconds;
    }

    public int getMonitorPort() {
        return this.monitorPort;
    }

    public String getJobShardingStrategyClass() {
        return this.jobShardingStrategyClass;
    }

    public int getReconcileIntervalMinutes() {
        return this.reconcileIntervalMinutes;
    }

    public boolean isDisabled() {
        return this.disabled;
    }

    public boolean isOverwrite() {
        return this.overwrite;
    }

    private LiteJobConfiguration(JobTypeConfiguration typeConfig, boolean monitorExecution, int maxTimeDiffSeconds, int monitorPort, String jobShardingStrategyClass, int reconcileIntervalMinutes, boolean disabled, boolean overwrite) {
        this.typeConfig = typeConfig;
        this.monitorExecution = monitorExecution;
        this.maxTimeDiffSeconds = maxTimeDiffSeconds;
        this.monitorPort = monitorPort;
        this.jobShardingStrategyClass = jobShardingStrategyClass;
        this.reconcileIntervalMinutes = reconcileIntervalMinutes;
        this.disabled = disabled;
        this.overwrite = overwrite;
    }

    public static class Builder {
        private final JobTypeConfiguration jobConfig;
        private boolean monitorExecution;
        private int maxTimeDiffSeconds;
        private int monitorPort;
        private String jobShardingStrategyClass;
        private boolean disabled;
        private boolean overwrite;
        private int reconcileIntervalMinutes;

        public LiteJobConfiguration.Builder monitorExecution(boolean monitorExecution) {
            this.monitorExecution = monitorExecution;
            return this;
        }

        public LiteJobConfiguration.Builder maxTimeDiffSeconds(int maxTimeDiffSeconds) {
            this.maxTimeDiffSeconds = maxTimeDiffSeconds;
            return this;
        }

        public LiteJobConfiguration.Builder monitorPort(int monitorPort) {
            this.monitorPort = monitorPort;
            return this;
        }

        public LiteJobConfiguration.Builder jobShardingStrategyClass(String jobShardingStrategyClass) {
            if (null != jobShardingStrategyClass) {
                this.jobShardingStrategyClass = jobShardingStrategyClass;
            }
            return this;
        }

        public LiteJobConfiguration.Builder reconcileIntervalMinutes(int reconcileIntervalMinutes) {
            this.reconcileIntervalMinutes = reconcileIntervalMinutes;
            return this;
        }

        public LiteJobConfiguration.Builder disabled(boolean disabled) {
            this.disabled = disabled;
            return this;
        }

        public LiteJobConfiguration.Builder overwrite(boolean overwrite) {
            this.overwrite = overwrite;
            return this;
        }

        public final LiteJobConfiguration build() {
            return new LiteJobConfiguration(this.jobConfig, this.monitorExecution, this.maxTimeDiffSeconds, this.monitorPort, this.jobShardingStrategyClass, this.reconcileIntervalMinutes, this.disabled, this.overwrite);
        }

        private Builder(JobTypeConfiguration jobConfig) {
            // Defaults used when the caller does not override a setting
            this.monitorExecution = true;
            this.maxTimeDiffSeconds = -1;
            this.monitorPort = -1;
            this.jobShardingStrategyClass = "";
            this.reconcileIntervalMinutes = 10;
            this.jobConfig = jobConfig;
        }
    }
}
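The pattern in LiteJobConfiguration is the builder: an immutable object with a private constructor, a static newBuilder entry point, chained setters that return this, and defaults set in the builder. A cut-down sketch of the same shape is below. ToyJobConfig is a hypothetical class with only three of the fields; the default values for monitorPort and reconcileIntervalMinutes are copied from the Builder constructor above.

```java
final class ToyJobConfig {
    private final String jobName;
    private final int monitorPort;
    private final int reconcileIntervalMinutes;

    // Private constructor: instances can only come from the Builder.
    private ToyJobConfig(Builder builder) {
        this.jobName = builder.jobName;
        this.monitorPort = builder.monitorPort;
        this.reconcileIntervalMinutes = builder.reconcileIntervalMinutes;
    }

    static Builder newBuilder(String jobName) {
        return new Builder(jobName);
    }

    String getJobName() {
        return jobName;
    }

    int getMonitorPort() {
        return monitorPort;
    }

    int getReconcileIntervalMinutes() {
        return reconcileIntervalMinutes;
    }

    static final class Builder {
        private final String jobName;
        private int monitorPort = -1;               // default: monitor service disabled
        private int reconcileIntervalMinutes = 10;  // default reconcile interval

        private Builder(String jobName) {
            this.jobName = jobName;
        }

        Builder monitorPort(int monitorPort) {
            this.monitorPort = monitorPort;
            return this;                            // returning this enables chaining
        }

        Builder reconcileIntervalMinutes(int reconcileIntervalMinutes) {
            this.reconcileIntervalMinutes = reconcileIntervalMinutes;
            return this;
        }

        ToyJobConfig build() {
            return new ToyJobConfig(this);
        }
    }
}
```

A call such as ToyJobConfig.newBuilder("demoJob").monitorPort(9888).build() overrides one setting and keeps the default for the other, which is the main convenience the builder gives over an eight-argument constructor.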