ElasticJob Source Code Analysis: The JobScheduler Class and Scheduled Job Execution

Posted by 低调的洋仔


    public static void main(String[] args) {
        // Initialize the data source
        DataSource dataSource = MovieServiceUtils.getDataSource();
        // Define the database-backed job event tracing configuration
        JobEventConfiguration jobEventRdbConfig = new JobEventRdbConfiguration(dataSource);
        new JobScheduler(createRegistryCenter(), createJobConfiguration(), jobEventRdbConfig).init();
    }


How JobScheduler executes

 

 

    public JobScheduler(CoordinatorRegistryCenter regCenter, LiteJobConfiguration liteJobConfig, JobEventConfiguration jobEventConfig, ElasticJobListener... elasticJobListeners) {
        this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners);
    }

    private JobScheduler(CoordinatorRegistryCenter regCenter, LiteJobConfiguration liteJobConfig, JobEventBus jobEventBus, ElasticJobListener... elasticJobListeners) {
        JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
        this.liteJobConfig = liteJobConfig;
        this.regCenter = regCenter;
        List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
        this.setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
        this.schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
        this.jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
    }


The code above constructs the JobScheduler object; its init method is then called.

 

 

    public void init() {
        LiteJobConfiguration liteJobConfigFromRegCenter = this.schedulerFacade.updateJobConfiguration(this.liteJobConfig);
        JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
        JobScheduleController jobScheduleController = new JobScheduleController(this.createScheduler(), this.createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
        JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, this.regCenter);
        this.schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
        jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
    }

 

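init performs the following steps:

-- Set the total sharding count. The value comes from the job configuration loaded from the registry center, that is, the LiteJobConfiguration object.

-- Create the scheduler with createScheduler.

-- Create the job detail with createJobDetail.

-- Register the job in JobRegistry with registerJob, which also sets up a cache for the job's path in the registry center (ZooKeeper).

-- Register the job's start-up information with registerStartUpInfo.

-- Schedule the job with scheduleJob.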

 

createScheduler method

This creates the Quartz scheduler.

The method also adds a trigger listener to it.

 

    private Scheduler createScheduler() {
        try {
            StdSchedulerFactory factory = new StdSchedulerFactory();
            factory.initialize(this.getBaseQuartzProperties());
            Scheduler result = factory.getScheduler();
            result.getListenerManager().addTriggerListener(this.schedulerFacade.newJobTriggerListener());
            return result;
        } catch (SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }


The getScheduler call on factory is Quartz's own StdSchedulerFactory method.

 

The initialize call before it takes the result of getBaseQuartzProperties, which sets the Quartz parameters.

 

    private Properties getBaseQuartzProperties() {
        Properties result = new Properties();
        result.put("org.quartz.threadPool.class", SimpleThreadPool.class.getName());
        result.put("org.quartz.threadPool.threadCount", "1");
        result.put("org.quartz.scheduler.instanceName", this.liteJobConfig.getJobName());
        result.put("org.quartz.jobStore.misfireThreshold", "1");
        result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
        result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
        return result;
    }


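getScheduler is a method of Quartz's StdSchedulerFactory, and it returns a Quartz Scheduler.

 

A Scheduler must be instantiated before use. Its life begins when a SchedulerFactory creates it and ends when shutdown is called on it.

Once instantiated, it can be started (start), paused (standby), and stopped (shutdown). Only a started scheduler fires triggers.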
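The lifecycle described above can be pictured as a small state machine. The sketch below is an illustrative model only, not Quartz code: the class and method names are made up, and it assumes, in line with the Quartz documentation, that a new scheduler starts in standby and cannot be restarted once it has been shut down.

```java
/** Illustrative state model only; this is not Quartz code. */
class SchedulerLifecycleSketch {
    enum State { STANDBY, STARTED, SHUTDOWN }

    // A freshly created scheduler fires nothing until start() is called.
    private State state = State.STANDBY;

    void start() {
        if (state == State.SHUTDOWN) {
            // Assumption of this model: shutdown is final.
            throw new IllegalStateException("cannot restart after shutdown");
        }
        state = State.STARTED;
    }

    void standby() {
        // In this model a shut-down scheduler stays shut down.
        if (state != State.SHUTDOWN) {
            state = State.STANDBY;
        }
    }

    void shutdown() {
        state = State.SHUTDOWN;
    }

    /** Only a started scheduler fires triggers. */
    boolean canFireTriggers() {
        return state == State.STARTED;
    }
}
```

This is also why the getScheduler listing checks isShutdown before reusing a scheduler found in the repository: a shut-down instance is removed and a new one is instantiated.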

 

    public Scheduler getScheduler() throws SchedulerException {
        if (this.cfg == null) {
            this.initialize();
        }

        SchedulerRepository schedRep = SchedulerRepository.getInstance();
        Scheduler sched = schedRep.lookup(this.getSchedulerName());
        if (sched != null) {
            if (!sched.isShutdown()) {
                return sched;
            }

            schedRep.remove(this.getSchedulerName());
        }

        sched = this.instantiate();
        return sched;
    }

 

 

createJobDetail method

 

    private JobDetail createJobDetail(String jobClass) {
        JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(this.liteJobConfig.getJobName()).build();
        result.getJobDataMap().put("jobFacade", this.jobFacade);
        Optional<ElasticJob> elasticJobInstance = this.createElasticJobInstance();
        if (elasticJobInstance.isPresent()) {
            result.getJobDataMap().put("elasticJob", elasticJobInstance.get());
        } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
            try {
                result.getJobDataMap().put("elasticJob", Class.forName(jobClass).newInstance());
            } catch (ReflectiveOperationException ex) {
                throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", new Object[]{jobClass});
            }
        }

        return result;
    }

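This first creates the result instance, which holds the job's details.

 

The chained calls before build mostly just set parameters; build then creates the JobDetail instance.

Next comes createElasticJobInstance(), which creates a job instance and returns it wrapped in Guava's Optional.

isPresent then checks whether an instance was created. If so, it is put into result's JobDataMap. Otherwise, unless the job is a ScriptJob, the configured job class is instantiated by reflection.

In both cases the instance is stored under the key "elasticJob".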
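The branch order in createJobDetail (use a ready-made instance if one exists, otherwise fall back to reflection) can be sketched with the JDK alone. The following is a simplified illustration, not ElasticJob code: DemoJob, HelloJob, and JobDataSketch are hypothetical names, java.util.Optional stands in for Guava's Optional, a plain HashMap stands in for Quartz's JobDataMap, and the ScriptJob special case is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for a job type; not the real ElasticJob interface. */
interface DemoJob {
    String name();
}

/** Has an implicit no-arg constructor, so it can be created reflectively. */
class HelloJob implements DemoJob {
    public String name() {
        return "hello";
    }
}

class JobDataSketch {
    /**
     * Uses the pre-built instance when present; otherwise instantiates the
     * class named by jobClass through reflection.
     */
    static Map<String, Object> buildDataMap(Optional<DemoJob> prebuilt, String jobClass) {
        Map<String, Object> dataMap = new HashMap<>();
        if (prebuilt.isPresent()) {
            dataMap.put("elasticJob", prebuilt.get());
        } else {
            try {
                dataMap.put("elasticJob", Class.forName(jobClass).getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException ex) {
                throw new IllegalStateException("Job class '" + jobClass + "' can not initialize.", ex);
            }
        }
        return dataMap;
    }
}
```

When an instance is supplied, the class name is never loaded, so a wrong or missing class name only matters on the reflection path.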

 

 

    public JobDetail build() {
        JobDetailImpl job = new JobDetailImpl();
        job.setJobClass(this.jobClass);
        job.setDescription(this.description);
        if (this.key == null) {
            this.key = new JobKey(Key.createUniqueName((String)null), (String)null);
        }

        job.setKey(this.key);
        job.setDurability(this.durability);
        job.setRequestsRecovery(this.shouldRecover);
        if (!this.jobDataMap.isEmpty()) {
            job.setJobDataMap(this.jobDataMap);
        }

        return job;
    }

build creates a JobDetailImpl instance that carries these parameters. JobDetailImpl is Quartz's implementation class of JobDetail.

 

 

    public class JobDetailImpl implements Cloneable, Serializable, JobDetail {
        private static final long serialVersionUID = -6069784757781506897L;
        private String name;
        private String group;
        private String description;
        private Class<? extends Job> jobClass;
        private JobDataMap jobDataMap;
        private boolean durability;
        private boolean shouldRecover;
        private transient JobKey key;
        // ... (remaining members omitted)
    }


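One of its fields is a JobDataMap, a class that implements the Map interface through its inheritance chain. It is a Quartz class (org.quartz.JobDataMap), not an ElasticJob wrapper; ElasticJob only uses it to hand jobFacade and elasticJob to the job.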

 

 

init then creates a JobScheduleController:

 

    public JobScheduleController(Scheduler scheduler, JobDetail jobDetail, String triggerIdentity) {
        this.scheduler = scheduler;
        this.jobDetail = jobDetail;
        this.triggerIdentity = triggerIdentity;
    }

 

 

RegisterJob

 

 

 

 

JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, this.regCenter);


JobRegistry

 

 

    public final class JobRegistry {
        private static volatile JobRegistry instance;
        private Map<String, JobScheduleController> schedulerMap = new ConcurrentHashMap<>();
        private Map<String, CoordinatorRegistryCenter> regCenterMap = new ConcurrentHashMap<>();
        private Map<String, JobInstance> jobInstanceMap = new ConcurrentHashMap<>();
        private Map<String, Boolean> jobRunningMap = new ConcurrentHashMap<>();
        private Map<String, Integer> currentShardingTotalCountMap = new ConcurrentHashMap<>();

        public static JobRegistry getInstance() {
            if (null == instance) {
                synchronized (JobRegistry.class) {
                    if (null == instance) {
                        instance = new JobRegistry();
                    }
                }
            }

            return instance;
        }
        // ... (remaining methods omitted)
    }

This is the singleton pattern; the instance is created with double-checked locking.
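For readers unfamiliar with the idiom, here is a minimal stand-alone sketch of double-checked locking. DemoRegistry is a hypothetical class, not the real JobRegistry; it only isolates the pattern. The volatile modifier on the field is essential: without it, another thread could observe a reference to a partially constructed object.

```java
class DemoRegistry {
    // volatile makes the fully constructed instance visible to all threads
    private static volatile DemoRegistry instance;

    private DemoRegistry() {
    }

    static DemoRegistry getInstance() {
        if (instance == null) {                    // first check, without the lock
            synchronized (DemoRegistry.class) {
                if (instance == null) {            // second check, under the lock
                    instance = new DemoRegistry();
                }
            }
        }
        return instance;
    }
}
```

The first check keeps the common path lock-free once the instance exists; the second check prevents two threads that both passed the first check from each creating an instance.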

 

 

    public void registerJob(String jobName, JobScheduleController jobScheduleController, CoordinatorRegistryCenter regCenter) {
        this.schedulerMap.put(jobName, jobScheduleController);
        this.regCenterMap.put(jobName, regCenter);
        regCenter.addCacheData("/" + jobName);
    }

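This is the registration step.

 

schedulerMap and regCenterMap simply store the objects in maps keyed by job name, so that they can be looked up later. This is a local, in-process registry.

The final call, regCenter.addCacheData, does not write the job to the remote registry center. In ZookeeperRegistryCenter it starts a Curator TreeCache on the job's root path, so that later reads under that path can be served from a local cache.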

 

RegisterStartUpInfo

 

    public void registerStartUpInfo(boolean enabled) {
        this.listenerManager.startAllListeners();   // start all listeners
        this.leaderService.electLeader();           // elect the leader
        this.serverService.persistOnline(enabled);  // persist the server info to ZK first
        this.instanceService.persistOnline();       // then persist the job instance info
        this.shardingService.setReshardingFlag();   // mark that resharding is needed
        this.monitorService.listen();               // initialize the monitor service
        if (!this.reconcileService.isRunning()) {
            this.reconcileService.startAsync();     // asynchronously start the service that reconciles inconsistent distributed job state
        }
    }


1. Start all listeners

 

 

    public void startAllListeners() {
        this.electionListenerManager.start();           // leader election
        this.shardingListenerManager.start();           // sharding
        this.failoverListenerManager.start();           // failover
        this.monitorExecutionListenerManager.start();   // execution monitoring
        this.shutdownListenerManager.start();           // shutdown
        this.triggerListenerManager.start();            // trigger
        this.rescheduleListenerManager.start();         // rescheduling
        this.guaranteeListenerManager.start();          // guarantees that all shards of a distributed job have started and completed
        this.jobNodeStorage.addConnectionStateListener(this.regCenterConnectionStateListener); // connection state between the registry center and the job node
    }


2. Elect the leader

 

 

    public void electLeader() {
        log.debug("Elect a new leader now.");
        this.jobNodeStorage.executeInLeader("leader/election/latch", new LeaderService.LeaderElectionExecutionCallback());
        log.debug("Leader election completed.");
    }

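This mainly relies on a latch, which acts as a distributed lock.

 

The LeaderLatch calls are already inside Curator's code; the purpose is to use Curator to elect the leader node.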
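The executeInLeader listing that follows looks more complicated than it is. The nested try/catch/finally with a Throwable variable and addSuppressed is the shape a decompiler typically produces for a try-with-resources statement, so the original source was most likely a single try-with-resources over the latch. The sketch below shows the equivalent source form using only the JDK; DemoLatch and LeaderSketch are hypothetical stand-ins, not Curator's LeaderLatch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical latch-like resource that records what happens to it. */
class DemoLatch implements AutoCloseable {
    private final List<String> events;

    DemoLatch(List<String> events) {
        this.events = events;
    }

    void start() {
        events.add("start");
    }

    void await() {
        events.add("await");
    }

    @Override
    public void close() {
        events.add("close");
    }
}

class LeaderSketch {
    /** Runs the callback while holding the latch and returns the recorded events. */
    static List<String> executeInLeader(Runnable callback) {
        List<String> events = new ArrayList<>();
        // The latch is closed automatically, even if the callback throws.
        try (DemoLatch latch = new DemoLatch(events)) {
            latch.start();
            latch.await();
            callback.run();
        }
        return events;
    }
}
```

The important property is the ordering: the latch is started, awaited until leadership is held, the callback runs, and the latch is always closed afterwards.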

    public void executeInLeader(String latchNode, LeaderExecutionCallback callback) {
        try {
            LeaderLatch latch = new LeaderLatch(this.getClient(), this.jobNodePath.getFullPath(latchNode));
            Throwable var4 = null;

            try {
                latch.start();
                latch.await();
                callback.execute();
            } catch (Throwable var14) {
                var4 = var14;
                throw var14;
            } finally {
                if (latch != null) {
                    if (var4 != null) {
                        try {
                            latch.close();
                        } catch (Throwable var13) {
                            var4.addSuppressed(var13);
                        }
                    } else {
                        latch.close();
                    }
                }

            }
        } catch (Exception var16) {
            this.handleException(var16);
        }

    }


3. Persist the server information

 

 

    public void persistOnline(boolean enabled) {
        if (!JobRegistry.getInstance().isShutdown(this.jobName)) {
            this.jobNodeStorage.fillJobNode(this.serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(this.jobName).getIp()), enabled ? "" : ServerStatus.DISABLED.name());
        }

    }

This goes through the registry center:

 

 

    public void fillJobNode(String node, Object value) {
        this.regCenter.persist(this.jobNodePath.getFullPath(node), value.toString());
    }

The call ends up in ZookeeperRegistryCenter. The node is created with CreateMode.PERSISTENT, so the server information is stored in a persistent node.

 

 

    public void persist(String key, String value) {
        try {
            if (!this.isExisted(key)) {
                ((ACLBackgroundPathAndBytesable)this.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)).forPath(key, value.getBytes(Charsets.UTF_8));
            } else {
                this.update(key, value);
            }
        } catch (Exception ex) {
            RegExceptionHandler.handleException(ex);
        }

    }


4. Persist the job instance information

 

The job instance data is stored in an ephemeral node.

 

    public void persistOnline() {
        this.jobNodeStorage.fillEphemeralJobNode(this.instanceNode.getLocalInstanceNode(), "");
    }

 


5. Set the resharding flag

 

 

 

    public void setReshardingFlag() {
        this.jobNodeStorage.createJobNodeIfNeeded("leader/sharding/necessary");
    }

setReshardingFlag calls the createJobNodeIfNeeded method of JobNodeStorage.

 

 

    public void createJobNodeIfNeeded(String node) {
        if (this.isJobRootNodeExisted() && !this.isJobNodeExisted(node)) {
            this.regCenter.persist(this.jobNodePath.getFullPath(node), "");
        }

    }

Once again, the registry center's persist method does the work.
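Steps 3 to 5 write two kinds of nodes: persistent ones (the server information and the resharding flag) and an ephemeral one (the job instance). The difference matters when a process dies, because ZooKeeper removes ephemeral nodes when the session that created them ends. The toy model below illustrates only that lifetime difference. It is an in-memory sketch, not ZooKeeper or ElasticJob code; ToyRegistry, its methods, and the example paths are hypothetical, and createIfNeeded is simplified to "create if absent" without the root-node check.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Toy in-memory registry; it models node lifetimes only. */
class ToyRegistry {
    private final Map<String, String> nodes = new ConcurrentHashMap<>();
    private final Set<String> ephemeralPaths = ConcurrentHashMap.newKeySet();

    /** Creates or updates a node that survives the end of the session. */
    void persist(String path, String value) {
        nodes.put(path, value);
    }

    /** Creates or updates a node that is removed when the session ends. */
    void persistEphemeral(String path, String value) {
        nodes.put(path, value);
        ephemeralPaths.add(path);
    }

    /** Creates an empty persistent node only if the path does not exist yet. */
    void createIfNeeded(String path) {
        nodes.putIfAbsent(path, "");
    }

    /** Simulates the client session ending, for example because the process crashed. */
    void closeSession() {
        for (String path : ephemeralPaths) {
            nodes.remove(path);
        }
        ephemeralPaths.clear();
    }

    boolean exists(String path) {
        return nodes.containsKey(path);
    }
}
```

In this model, a crashed job instance disappears from the registry while its server entry and the resharding flag remain, which is what lets the surviving instances notice the change and reshard.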

 


6. Initialize the monitor service

 

    public void listen() {
        int port = this.configService.load(true).getMonitorPort();
        if (port >= 0) {
            try {
                log.info("Elastic job: Monitor service is running, the port is '{}'", port);
                this.openSocketForMonitor(port);
            } catch (IOException ex) {
                log.error("Elastic job: Monitor service listen failure, error is: ", ex);
            }

        }
    }

listen then calls openSocketForMonitor.

 

 

    private void openSocketForMonitor(int port) throws IOException {
        this.serverSocket = new ServerSocket(port);
        (new Thread() {
            public void run() {
                while (!MonitorService.this.closed) {
                    try {
                        MonitorService.this.process(MonitorService.this.serverSocket.accept());
                    } catch (IOException ex) {
                        MonitorService.log.error("Elastic job: Monitor service open socket for monitor failure, error is: ", ex);
                    }
                }

            }
        }).start();
    }

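This uses a plain ServerSocket, and each accepted connection is handled by the process method.

 

Judging from process, the monitor port mainly serves requests to dump the job's registry data. The method is long, so only part of it is shown.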
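Because only a fragment of process is reproduced here, a simplified sketch of the command handling may help. It follows the visible part of the listing: read one line, and if it is "dump" (case-insensitive), write the collected entries joined by newlines. This is a JDK-only illustration under stated assumptions, not the real MonitorService: DumpCommandSketch is a hypothetical class, it reads from a Reader and writes to a Writer instead of a Socket so that it can run without a network, and the snapshot list stands in for what dumpDirectly would collect.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.List;

class DumpCommandSketch {
    /**
     * Reads a single command line from in. On "dump" (any letter case) writes
     * every snapshot entry on its own line to out; other commands are ignored.
     */
    static void process(Reader in, Writer out, List<String> snapshot) {
        try {
            BufferedReader reader = new BufferedReader(in);
            BufferedWriter writer = new BufferedWriter(out);
            String cmdLine = reader.readLine();
            if ("dump".equalsIgnoreCase(cmdLine)) {
                writer.write(String.join("\n", snapshot) + "\n");
                writer.flush();
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}
```

With a real socket, the same method could be fed an InputStreamReader over socket.getInputStream() and an OutputStreamWriter over socket.getOutputStream().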

 

    private void process(Socket socket) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        Throwable var3 = null;

        try {
            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            Throwable var5 = null;

            try {
                Socket autoCloseSocket = socket;
                Throwable var7 = null;

                try {
                    String cmdLine = reader.readLine();
                    if (null != cmdLine && "dump".equalsIgnoreCase(cmdLine)) {
                        List<String> result = new ArrayList<>();
                        this.dumpDirectly("/" + this.jobName, result);
                        this.outputMessage(writer, Joiner.on("\n").join(SensitiveInfoUtils.filterSensitiveIps(result)) + "\n");
                    }
                    // ... (the rest of the method is omitted)

 

 

scheduleJob: scheduling the job

 

 

 

 

    public void scheduleJob(String cron) {
        try {
            if (!this.scheduler.checkExists(this.jobDetail.getKey())) {
                this.scheduler.scheduleJob(this.jobDetail, this.createTrigger(cron));
            }

            this.scheduler.start();
        } catch (SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }

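This schedules the job if it is not already scheduled and then starts the scheduler. The actual scheduling is done by the underlying Quartz scheduler that ElasticJob wraps.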


 

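Supplement

This section clarifies the relationship between Quartz and ElasticJob.

ElasticJob wraps job attributes and methods of its own, including parts that both it and Quartz need. So how does control pass between the two in practice? Is it done through inheritance?

It is not, although some classes do use inheritance. The lite version of ElasticJob that I use is not primarily built on inheritance.

Look at the class below. It holds Quartz objects as fields: jobDetail and scheduler.

The relationship between the two is better described as aggregation.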
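To make the aggregation point concrete, here is a minimal sketch in which a controller holds a scheduler as a field and delegates to it instead of extending it. The types are hypothetical stand-ins, not the real org.quartz interfaces: DemoScheduler reduces the scheduler to three calls, and RecordingScheduler is an in-memory fake used only to exercise the controller.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, heavily reduced stand-in for a scheduler API. */
interface DemoScheduler {
    boolean checkExists(String jobKey);

    void scheduleJob(String jobKey, String cron);

    void start();
}

/** Holds a scheduler as a field (aggregation) instead of extending it. */
final class DemoScheduleController {
    private final DemoScheduler scheduler;
    private final String jobKey;

    DemoScheduleController(DemoScheduler scheduler, String jobKey) {
        this.scheduler = scheduler;
        this.jobKey = jobKey;
    }

    /** Schedules the job only if it is not known yet, then starts the scheduler. */
    void scheduleJob(String cron) {
        if (!scheduler.checkExists(jobKey)) {
            scheduler.scheduleJob(jobKey, cron);
        }
        scheduler.start();
    }
}

/** In-memory fake that records what the controller asked for. */
class RecordingScheduler implements DemoScheduler {
    final Map<String, String> jobs = new HashMap<>();
    int startCalls;

    public boolean checkExists(String jobKey) {
        return jobs.containsKey(jobKey);
    }

    public void scheduleJob(String jobKey, String cron) {
        jobs.put(jobKey, cron);
    }

    public void start() {
        startCalls++;
    }
}
```

Because the controller depends only on the interface it holds, the scheduler can be swapped for a fake in tests, which would be harder if the controller inherited from a concrete scheduler class.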

 

    /**
     * Job schedule controller.
     *
     * @author zhangliang
     */
    @RequiredArgsConstructor
    public final class JobScheduleController {

        private final Scheduler scheduler;

        private final JobDetail jobDetail;

        private final String triggerIdentity;

        // ... (methods omitted)
    }


At run time, the scheduleJob method of JobScheduleController is called. It uses createTrigger(cron):

 

 

    /**
     * Schedule the job.
     *
     * @param cron CRON expression
     */
    public void scheduleJob(final String cron) {
        try {
            if (!scheduler.checkExists(jobDetail.getKey())) {
                scheduler.scheduleJob(jobDetail, createTrigger(cron));
            }
            scheduler.start();
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }

 

 

 

 

 

 

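Design patterns involved

LiteJobConfiguration is built with the builder pattern: its constructor is private, and instances are obtained through newBuilder(...) followed by build().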

 

    public final class LiteJobConfiguration implements JobRootConfiguration {
        private final JobTypeConfiguration typeConfig;
        private final boolean monitorExecution;
        private final int maxTimeDiffSeconds;
        private final int monitorPort;
        private final String jobShardingStrategyClass;
        private final int reconcileIntervalMinutes;
        private final boolean disabled;
        private final boolean overwrite;

        public String getJobName() {
            return this.typeConfig.getCoreConfig().getJobName();
        }

        public boolean isFailover() {
            return this.typeConfig.getCoreConfig().isFailover();
        }

        public static LiteJobConfiguration.Builder newBuilder(JobTypeConfiguration jobConfig) {
            return new LiteJobConfiguration.Builder(jobConfig);
        }

        public JobTypeConfiguration getTypeConfig() {
            return this.typeConfig;
        }

        public boolean isMonitorExecution() {
            return this.monitorExecution;
        }

        public int getMaxTimeDiffSeconds() {
            return this.maxTimeDiffSeconds;
        }

        public int getMonitorPort() {
            return this.monitorPort;
        }

        public String getJobShardingStrategyClass() {
            return this.jobShardingStrategyClass;
        }

        public int getReconcileIntervalMinutes() {
            return this.reconcileIntervalMinutes;
        }

        public boolean isDisabled() {
            return this.disabled;
        }

        public boolean isOverwrite() {
            return this.overwrite;
        }

        private LiteJobConfiguration(JobTypeConfiguration typeConfig, boolean monitorExecution, int maxTimeDiffSeconds, int monitorPort, String jobShardingStrategyClass, int reconcileIntervalMinutes, boolean disabled, boolean overwrite) {
            this.typeConfig = typeConfig;
            this.monitorExecution = monitorExecution;
            this.maxTimeDiffSeconds = maxTimeDiffSeconds;
            this.monitorPort = monitorPort;
            this.jobShardingStrategyClass = jobShardingStrategyClass;
            this.reconcileIntervalMinutes = reconcileIntervalMinutes;
            this.disabled = disabled;
            this.overwrite = overwrite;
        }

        public static class Builder {
            private final JobTypeConfiguration jobConfig;
            private boolean monitorExecution;
            private int maxTimeDiffSeconds;
            private int monitorPort;
            private String jobShardingStrategyClass;
            private boolean disabled;
            private boolean overwrite;
            private int reconcileIntervalMinutes;

            public LiteJobConfiguration.Builder monitorExecution(boolean monitorExecution) {
                this.monitorExecution = monitorExecution;
                return this;
            }

            public LiteJobConfiguration.Builder maxTimeDiffSeconds(int maxTimeDiffSeconds) {
                this.maxTimeDiffSeconds = maxTimeDiffSeconds;
                return this;
            }

            public LiteJobConfiguration.Builder monitorPort(int monitorPort) {
                this.monitorPort = monitorPort;
                return this;
            }

            public LiteJobConfiguration.Builder jobShardingStrategyClass(String jobShardingStrategyClass) {
                if (null != jobShardingStrategyClass) {
                    this.jobShardingStrategyClass = jobShardingStrategyClass;
                }

                return this;
            }

            public LiteJobConfiguration.Builder reconcileIntervalMinutes(int reconcileIntervalMinutes) {
                this.reconcileIntervalMinutes = reconcileIntervalMinutes;
                return this;
            }

            public LiteJobConfiguration.Builder disabled(boolean disabled) {
                this.disabled = disabled;
                return this;
            }

            public LiteJobConfiguration.Builder overwrite(boolean overwrite) {
                this.overwrite = overwrite;
                return this;
            }

            public final LiteJobConfiguration build() {
                return new LiteJobConfiguration(this.jobConfig, this.monitorExecution, this.maxTimeDiffSeconds, this.monitorPort, this.jobShardingStrategyClass, this.reconcileIntervalMinutes, this.disabled, this.overwrite);
            }

            private Builder(JobTypeConfiguration jobConfig) {
                this.monitorExecution = true;
                this.maxTimeDiffSeconds = -1;
                this.monitorPort = -1;
                this.jobShardingStrategyClass = "";
                this.reconcileIntervalMinutes = 10;
                this.jobConfig = jobConfig;
            }
        }
    }

 

 

 

 

 

 

 
