ElasticJob失效转移逻辑
Posted 低调的洋仔
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ElasticJob失效转移逻辑相关的知识,希望对你有一定的参考价值。
本文主要讲解ElasticJob的失效转移逻辑
从根本上说,失效转移逻辑是基于Zookeeper的节点的变化而实现的,这里大致说一下失效转移的流程,Scheduler启动开始后,会注册相应的监听器,这些监听器会监听数据节点的变化情况,那么失效转移逻辑的触发就是因为监听了instanceNode的节点的删除事件,这里其实就是利用了ZK的watch机制,回调的时候通过类型判断的是不是删除事件,当删除事件被触发后,就会执行内部的失效转移的逻辑,当然前提是通过一些条件的验证,失效转移逻辑主要会先获取到所有失效的节点的名称,也就是item的那个分片值,这一步会返回一个失效节点的集合,然后对这个集合进行遍历,针对每一个失效的节点触发其失效转移的真正的执行逻辑,首先会创建一个失效转移的标记,也就是在leader的items中创建分片值对应的节点,然后调用FailoverService中的fialoverIfNecessary方法,具体是通过竞争一个failover/latch节点来实现的。然后会调用该方法内部的回调类中的方法,该方法会直接从尚活着的节点中进行选取,确定完成失效转移的节点后会在sharding的路径下生成一个对应failover、instance、misfire等节点,这些完成以后会通过删除leader节点下items中对应的分片的节点完成迁移工作。
class JobCrashedJobListener extends AbstractJobListener
@Override
protected void dataChanged(final String path, final Type eventType, final String data)
if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path))
String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId()))
return;
List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
if (!failoverItems.isEmpty())
for (int each : failoverItems)
failoverService.setCrashedFailoverFlag(each);
failoverService.failoverIfNecessary();
else
for (int each : shardingService.getShardingItems(jobInstanceId))
failoverService.setCrashedFailoverFlag(each);
failoverService.failoverIfNecessary();
FailoverService类中的方法
/**
* 如果需要失效转移, 则执行作业失效转移.
*/
public void failoverIfNecessary()
if (needFailover())
jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
class FailoverLeaderExecutionCallback implements LeaderExecutionCallback
@Override
public void execute()
if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover())
return;
int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
log.debug("Failover job '' begin, crashed item ''", jobName, crashedItem);
jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
// TODO 不应使用triggerJob, 而是使用executor统一调度
JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
if (null != jobScheduleController)
jobScheduleController.triggerJob();
以上是关于ElasticJob失效转移逻辑的主要内容,如果未能解决你的问题,请参考以下文章