分布式作业系统 Elastic-Job-Lite 源码分析 —— 自诊断修复
Posted 芋道源码
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式作业系统 Elastic-Job-Lite 源码分析 —— 自诊断修复相关的知识,希望对你有一定的参考价值。
技术文章第一时间送达!
源码精品专栏
69 篇
61 篇
摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/reconcile/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文基于 Elastic-Job V2.1.5 版本分享
1. 概述
2. ReconcileService
1. 概述
本文主要分享 Elastic-Job-Lite 自诊断修复。
在分布式的场景下由于网络、时钟等原因,可能导致 Zookeeper 的数据与真实运行的作业产生不一致,这种不一致通过正向的校验无法完全避免。需要另外启动一个线程定时校验注册中心数据与真实作业状态的一致性,即维持 Elastic-Job 的最终一致性。
涉及到主要类的类图如下( 打开大图 ):
在 Elastic-Job-lite 里,调解分布式作业不一致状态服务( ReconcileService ) 实现了自诊断修复功能。
你行好事会因为得到赞赏而愉悦
同理,开源项目贡献者会因为 Star 而更加有动力
为 Elastic-Job 点赞!传送门
2. ReconcileService
ReconcileService,调解分布式作业不一致状态服务。
ReconcileService 继承 Google Guava AbstractScheduledService 抽象类,实现 #scheduler()
、#runOneIteration()
方法,达到周期性校验注册中心数据与真实作业状态的一致性。
#scheduler()
方法实现如下:
// ReconcileService.java
@Override
protected Scheduler scheduler() {
return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MINUTES);
}
每 1 分钟会调用一次
#runOneIteration()
方法进行校验。Google Guava AbstractScheduledService 相关的知识,有兴趣的同学可以自己 Google 学习哟。
#runOneIteration()
方法实现如下:
// ReconcileService.java
@Override
protected void runOneIteration() throws Exception {
LiteJobConfiguration config = configService.load(true);
int reconcileIntervalMinutes = null == config ? -1 : config.getReconcileIntervalMinutes();
if (reconcileIntervalMinutes > 0 && (System.currentTimeMillis() - lastReconcileTime >= reconcileIntervalMinutes * 60 * 1000)) { // 校验是否达到校验周期
// 设置最后校验时间
lastReconcileTime = System.currentTimeMillis();
if (leaderService.isLeaderUntilBlock() // 主作业节点才可以执行
&& !shardingService.isNeedSharding() // 当前作业不需要重新分片
&& shardingService.hasShardingInfoInOfflineServers()) { // 查询是包含有分片节点的不在线服务器
log.warn("Elastic Job: job status node has inconsistent value,start reconciling...");
// 设置需要重新分片的标记
shardingService.setReshardingFlag();
}
}
}
通过作业配置,设置修复作业服务器不一致状态服务调度间隔时间属性(
LiteJobConfiguration.reconcileIntervalMinutes
)。调用
ShardingService#setReshardingFlag()
方法,设置需要重新分片的标记。这个也是 ReconcileService 最本质的行为,有了这个标记后,作业会重新进行分片,达到作业节点本地分片数据与 Zookeeper 数据一致。作业分片逻辑,在《Elastic-Job-Lite 源码分析 —— 作业分片》有详细解析。调解分布式作业不一致状态服务一共有三个条件:
调用
LeaderService#isLeaderUntilBlock()
方法,判断当前作业节点是否为主节点。在《Elastic-Job-Lite 源码分析 —— 主节点选举》有详细解析。调用
ShardingService#isNeedSharding()
方法,判断当前作业是否需要重分片。如果需要重新分片,就不要重复设置当前作业需要重新分片的标记。调用
ShardingService#hasShardingInfoInOfflineServers()
方法,查询是否包含有分片节点的不在线服务器。永久数据节点/${JOB_NAME}/sharding/${ITEM_INDEX}/instance
存储分配的作业节点主键(${JOB_INSTANCE_ID}
), 不会随着作业节点因为各种原因断开后会话超时移除,而临时数据节点/${JOB_NAME}/instances/${JOB_INSTANCE_ID}
会随着作业节点因为各种原因断开后超时会话超时移除。当查询到包含有分片节点的不在线的作业节点,设置需要重新分片的标记后进行重新分片,将其持有的作业分片分配给其它在线的作业节点。// ShardingService.java
/**
* 查询是包含有分片节点的不在线服务器.
*
* @return 是包含有分片节点的不在线服务器
*/
public boolean hasShardingInfoInOfflineServers() {
List<String> onlineInstances = jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT); // "/¨E 123 E J O B ¨E 95 E N A M E ¨E 125 E /i n s t a n c e s /<annotation encoding="application style="color: rgb(128, 128, 128);overflow-wrap: inherit !important;word-break: inherit !important;" span="" class="hljs-comment" encoding=""application"><span class="katex-html" aria-hidden="true" style="overflow-wrap: inherit !important;word-break: inherit !important;"><span class="strut" style="height:1em;vertical-align:-0.25em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">¨<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E123<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E<span class="mord mathit" style="margin-right:0.09618em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">J<span class="mord mathit" style="margin-right:0.02778em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">O<span class="mord mathit" style="margin-right:0.05017em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">B¨<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E95<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E<span class="mord mathit" style="margin-right:0.10903em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">N<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">A<span class="mord mathit" style="margin-right:0.10903em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">M<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E¨<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E125<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E/<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">i<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">n<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">s<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">t<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">a<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">n<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">c<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">e<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">s/{JOB_INSTANCE_ID}"
int shardingTotalCount = configService.load(true).getTypeConfig().getCoreConfig().getShardingTotalCount();
for (int i = 0; i < shardingTotalCount; i++) {
if (!onlineInstances.contains(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) { // "/¨E 123 E J O B ¨E 95 E N A M E ¨E 125 E /s h a r d i n g /<annotation encoding="application style="color: rgb(128, 128, 128);overflow-wrap: inherit !important;word-break: inherit !important;" span="" class="hljs-comment" encoding=""application"><span class="katex-html" aria-hidden="true" style="overflow-wrap: inherit !important;word-break: inherit !important;"><span class="strut" style="height:1em;vertical-align:-0.25em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">¨<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E123<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E<span class="mord mathit" style="margin-right:0.09618em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">J<span class="mord mathit" style="margin-right:0.02778em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">O<span class="mord mathit" style="margin-right:0.05017em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">B¨<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E95<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E<span class="mord mathit" style="margin-right:0.10903em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">N<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">A<span class="mord mathit" style="margin-right:0.10903em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">M<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E¨<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E125<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E/<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">s<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">h<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">a<span class="mord mathit" style="margin-right:0.02778em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">r<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">d<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">i<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">n<span class="mord mathit" style="margin-right:0.03588em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">g/{ITEM_INDEX}/instance"
return true;
}
}
return false;
}</span class="mord mathit" style="margin-right:0.03588em;"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.02778em;"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.10903em;"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.10903em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05017em;"></span class="mord mathit" style="margin-right:0.02778em;"></span class="mord mathit" style="margin-right:0.09618em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="strut" style="height:1em;vertical-align:-0.25em;"></span class="katex-html" aria-hidden="true"></annotation encoding="application></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.10903em;"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.10903em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05017em;"></span class="mord mathit" style="margin-right:0.02778em;"></span class="mord mathit" style="margin-right:0.09618em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="strut" style="height:1em;vertical-align:-0.25em;"></span class="katex-html" aria-hidden="true"></annotation encoding="application>
已在知识星球更新源码解析如下:
《精尽 Dubbo 源码解析系列》69 篇。
《精尽 Netty 源码解析系列》61 篇。
《精尽 Spring 源码解析系列》35 篇。
《精尽 MyBatis 源码解析系列》34 篇。
《数据库实体设计》17 篇。
《精尽 Spring MVC 源码解析系列》15 篇。
目前在知识星球更新了《Dubbo 源码解析》目录如下:
01. 调试环境搭建
02. 项目结构一览
03. 配置 Configuration
04. 核心流程一览
05. 拓展机制 SPI
06. 线程池
07. 服务暴露 Export
08. 服务引用 Refer
09. 注册中心 Registry
10. 动态编译 Compile
11. 动态代理 Proxy
12. 服务调用 Invoke
13. 调用特性
14. 过滤器 Filter
15. NIO 服务器
16. P2P 服务器
17. HTTP 服务器
18. 序列化 Serialization
19. 集群容错 Cluster
20. 优雅停机
21. 日志适配
22. 状态检查
23. 监控中心 Monitor
24. 管理中心 Admin
25. 运维命令 QOS
26. 链路追踪 Tracing
... 一共 69+ 篇
目前在知识星球更新了《Netty 源码解析》目录如下:
01. 调试环境搭建
02. NIO 基础
03. Netty 简介
04. 启动 Bootstrap
05. 事件轮询 EventLoop
06. 通道管道 ChannelPipeline
07. 通道 Channel
08. 字节缓冲区 ByteBuf
09. 通道处理器 ChannelHandler
10. 编解码 Codec
11. 工具类 Util
... 一共 61+ 篇
目前在知识星球更新了《数据库实体设计》目录如下:
01. 商品模块
02. 交易模块
03. 营销模块
04. 公用模块
... 一共 17+ 篇
目前在知识星球更新了《Spring 源码解析》目录如下:
01. 调试环境搭建
02. IoC Resource 定位
03. IoC BeanDefinition 载入
04. IoC BeanDefinition 注册
05. IoC Bean 获取
06. IoC Bean 生命周期
... 一共 35+ 篇
目前在知识星球更新了《MyBatis 源码解析》目录如下:
01. 调试环境搭建
02. 项目结构一览
03. MyBatis 面试题合集
04. MyBatis 学习资料合集
05. MyBatis 初始化
06. SQL 初始化
07. SQL 执行
08. 插件体系
09. Spring 集成
... 一共 34+ 篇
源码不易↓↓↓↓↓
点赞支持老艿艿↓↓
以上是关于分布式作业系统 Elastic-Job-Lite 源码分析 —— 自诊断修复的主要内容,如果未能解决你的问题,请参考以下文章