ElasticJob失效转移逻辑

Posted 低调的洋仔

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ElasticJob失效转移逻辑相关的知识,希望对你有一定的参考价值。

本文主要讲解ElasticJob的失效转移逻辑

从根本上说,失效转移逻辑是基于Zookeeper的节点的变化而实现的,这里大致说一下失效转移的流程,Scheduler启动开始后,会注册相应的监听器,这些监听器会监听数据节点的变化情况,那么失效转移逻辑的触发就是因为监听了instanceNode的节点的删除事件,这里其实就是利用了ZK的watch机制,回调的时候通过类型判断的是不是删除事件,当删除事件被触发后,就会执行内部的失效转移的逻辑,当然前提是通过一些条件的验证,失效转移逻辑主要会先获取到所有失效的节点的名称,也就是item的那个分片值,这一步会返回一个失效节点的集合,然后对这个集合进行遍历,针对每一个失效的节点触发其失效转移的真正的执行逻辑,首先会创建一个失效转移的标记,也就是在leader的items中创建分片值对应的节点,然后调用FailoverService中的fialoverIfNecessary方法,具体是通过竞争一个failover/latch节点来实现的。然后会调用该方法内部的回调类中的方法,该方法会直接从尚活着的节点中进行选取,确定完成失效转移的节点后会在sharding的路径下生成一个对应failover、instance、misfire等节点,这些完成以后会通过删除leader节点下items中对应的分片的节点完成迁移工作。

 

 

 

class JobCrashedJobListener extends AbstractJobListener 
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) 
            if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) 
                String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
                if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) 
                    return;
                
                List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
                if (!failoverItems.isEmpty()) 
                    for (int each : failoverItems) 
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    
                 else 
                    for (int each : shardingService.getShardingItems(jobInstanceId)) 
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    
                
            
        
    

FailoverService类中的方法

    /**
     * 如果需要失效转移, 则执行作业失效转移.
     */
    public void failoverIfNecessary() 
        if (needFailover()) 
            jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
        
    
    

 

 

 

 

 class FailoverLeaderExecutionCallback implements LeaderExecutionCallback 
        
        @Override
        public void execute() 
            if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) 
                return;
            
            int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
            log.debug("Failover job '' begin, crashed item ''", jobName, crashedItem);
            jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
            jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
            // TODO 不应使用triggerJob, 而是使用executor统一调度
            JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
            if (null != jobScheduleController) 
                jobScheduleController.triggerJob();
            
        
    

 

 

 

 

 

以上是关于ElasticJob失效转移逻辑的主要内容,如果未能解决你的问题,请参考以下文章

Elastic Job 入门

elastic-job详解:失效转移

SpringBoot定时任务 - 什么是ElasticJob?如何集成ElasticJob实现分布式任务调度?

ElasticJob和SpringBoot

GitHub上持续冲榜,ElasticJob重启

基于Mesos的当当作业云Elastic Job Cloud