k8s-pod之Job
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了k8s-pod之Job相关的知识,希望对你有一定的参考价值。
参考技术A 一次性执行任务,类似Linux中的job应用场景:如离线数据处理,视频解码等业务
docker pull perl:slim
.spec.backoffLimit用于设置Job的容错次数,默认值为6。当Job运行的Pod失败次数到 达.spec.backoffLimit次时,Job Controller不再新建Pod,直接停止运行这个Job,将其运行结 果标记为Failure。另外,Pod运行失败后再次运行的时间间隔呈递增状态,例如10s,20s, 40s。。。
运行job kubectl apply -f jobdemo.yml 查看pod日志 kubectl logs -f pi-7nrtv 删除job kubectl delete -f jobdemo.yml
在kubernetes系统中,Pod的管理对象RC,Deployment,DaemonSet和Job都面向无状态的服务,但现实中有很多服务时有状态的,比如一些集群服务,例如mysql集群,集群一般都会有这四个特点:
xxl-job之负载均衡算法
代码全集
源码回顾
调度中心触发任务之后,他的调用链如下
RemoteHttpJobBean> executeInternal > XxlJobTrigger > trigger ,
通过之前的分析xxl-job 源码解读 (二) , 我们可以了解到,xxl-job他的路由策略主要发生在trigger这个方法中
1public static void trigger(int jobId) {
2
3 // 通过JobId从数据库中查询该任务的具体信息
4 XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId); // job info
5 if (jobInfo == null) {
6 logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
7 return;
8 }
9 // 获取该类型的执行器信息
10 XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup()); // group info
11
12 // 匹配运行模式
13 ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
14 // 匹配失败后的处理模式
15 ExecutorFailStrategyEnum failStrategy = ExecutorFailStrategyEnum.match(jobInfo.getExecutorFailStrategy(), ExecutorFailStrategyEnum.FAIL_ALARM); // fail strategy
16 // 获取路由策略
17 ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
18 // 获取该执行器的集群机器列表
19 ArrayList<String> addressList = (ArrayList<String>) group.getRegistryList();
20
21 // 判断路由策略 是否为 分片广播模式
22 if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum && CollectionUtils.isNotEmpty(addressList)) {
23 for (int i = 0; i < addressList.size(); i++) {
24 String address = addressList.get(i);
25 //定义日志信息
26 XxlJobLog jobLog = new XxlJobLog();
27 // .....省略
28 ReturnT<String> triggerResult = new ReturnT<String>(null);
29
30 if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
31 // 4.1、trigger-param
32 TriggerParam triggerParam = new TriggerParam();
33 triggerParam.setJobId(jobInfo.getId());
34 triggerParam.setBroadcastIndex(i); // 设置分片标记
35 triggerParam.setBroadcastIndex(addressList.size());// 设置分片总数
36 // ......省略组装参数的过程
37
38 // 根据参数以及 机器地址,向执行器发送执行信息 , 此处将会详细讲解runExecutor 这个方法
39 triggerResult = runExecutor(triggerParam, address);
40 }
41 // 将日志ID,放入队列,便于日志监控线程来监控任务的执行状态
42 JobFailMonitorHelper.monitor(jobLog.getId());
43 logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
44
45 }
46 } else {
47 // 出分片模式外,其他的路由策略均走这里
48 //定义日志信息
49 XxlJobLog jobLog = new XxlJobLog();
50 jobLog.setJobGroup(jobInfo.getJobGroup());
51 // .....省略
52 ReturnT<String> triggerResult = new ReturnT<String>(null);
53 if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
54 // 4.1、trigger-param
55 TriggerParam triggerParam = new TriggerParam();
56 triggerParam.setJobId(jobInfo.getId());
57 triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
58 triggerParam.setBroadcastIndex(0); // 默认分片标记为0
59 triggerParam.setBroadcastTotal(1); // 默认分片总数为1
60 // .... 省略组装参数的过程
61 // 此处使用了策略模式, 根据不同的策略 使用不同的实现类,下面将会详细讲解
62 triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList);
63 }
64 JobFailMonitorHelper.monitor(jobLog.getId());
65 logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
66 }
67}
上面的代码主要讲了分片广播这个策略的实现以及xxl-job的其他路由策略的调用位置在哪里。
ExecutorRouteStrategyEnum枚举类
这个是xxl-job路由策略非常重要的一个类, 该类通过枚举的方式,把路由key, 和策略实现类进行了一个聚合、
1//ExecutorRouteStrategyEnum.java
2public enum ExecutorRouteStrategyEnum {
3
4 FIRST("第一个", new ExecutorRouteFirst()),
5 LAST("最后一个", new ExecutorRouteLast()),
6 ROUND("轮循", new ExecutorRouteRound()),
7 RANDOM("随机", new ExecutorRouteRandom()),
8 CONSISTENT_HASH("一致性哈希", new ExecutorRouteConsistentHash()),
9 LEAST_FREQUENTLY_USED("最不经常使用", new ExecutorRouteLFU()),
10 LEAST_RECENTLY_USED("最近最久未使用", new ExecutorRouteLRU()),
11 FAILOVER("故障转移", new ExecutorRouteFailover()),
12 BUSYOVER("忙碌转移", new ExecutorRouteBusyover()),
13 SHARDING_BROADCAST("分片广播", null);
14
15 ExecutorRouteStrategyEnum(String title, ExecutorRouter router) {
16 this.title = title;
17 this.router = router;
18 }
19
20 private String title;
21 private ExecutorRouter router;
22
23 public String getTitle() {
24 return title;
25 }
26 public ExecutorRouter getRouter() {
27 return router;
28 }
29 // 数据库中存的是枚举的名称,此处通过名称的对比,找到路由策略对应的枚举信息
30 public static ExecutorRouteStrategyEnum match(String name, ExecutorRouteStrategyEnum defaultItem){
31 if (name != null) {
32 for (ExecutorRouteStrategyEnum item: ExecutorRouteStrategyEnum.values()) {
33 if (item.name().equals(name)) {
34 return item;
35 }
36 }
37 }
38 return defaultItem;
39 }
40
41}
分片广播
通过源码回顾,我们可以清晰的看到,当系统判断当前任务的路由策略是分片广播时, 就会遍历执行器的集群机器列表,
给每一台机器都发送执行消息,分片总数为集群机器数量,分片标记从0开始,上面的代码已经非常清楚了,此处不再赘述。
第一个
由上面对ExecutorRouteStrategyEnum的分析,我们可以看到,该策略对应的是 这个ExecutorRouteFirst执行策略类。 主要看routeRun 这个方法
1//ExecutorRouteFirst.java
2public String route(int jobId, ArrayList<String> addressList) {
3 return addressList.get(0);
4}
5@Override
6public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList) {
7
8 // 直接取集群地址列表里面的第一台机器来进行执行
9 String address = route(triggerParam.getJobId(), addressList);
10
11 // run executor
12 ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
13 // 将执行该任务的执行器地址,放入到结果里面返回,最后会记录到日志里面取
14 runResult.setContent(address);
15 return runResult;
16}
最后一个
直接 从执行机集群列表的list里面取最后一个,源码如下
1//ExecutorRouteLast.java
2public String route(int jobId, ArrayList<String> addressList) {
3 return addressList.get(addressList.size()-1);
4}
5
6@Override
7public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList) {
8 // 通过看上面的route方法,可以看到直接取得是list最后一个数据
9 String address = route(triggerParam.getJobId(), addressList);
10
11 // run executor
12 ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
13 runResult.setContent(address);
14 return runResult;
15}
轮循
主要看ExecutorRouteRound这个类里面的代码
1//ExecutorRouteRound.java
2// 存储每个任务ID,对应的数值
3private static ConcurrentHashMap<Integer, Integer> routeCountEachJob = new ConcurrentHashMap<Integer, Integer>();
4// 缓存过期时间戳
5private static long CACHE_VALID_TIME = 0;
6
7@Override
8public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList) {
9
10 // 通过route方法获取执行器地址
11 String address = route(triggerParam.getJobId(), addressList);
12
13 // run executor
14 ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
15 runResult.setContent(address);
16 return runResult;
17}
18public String route(int jobId, ArrayList<String> addressList) {
19 // 在执行器地址列表,获取相应的地址, 通过count(jobid) 这个方法来实现,主要逻辑在这个方法
20 // 通过count(jobId)拿到数字之后, 通过求于的方式,拿到执行器地址
21 // 例: count=2 , addresslist.size = 3
22 // 2%3 = 2 , 则拿list中下表为2的地址
23 return addressList.get(count(jobId)%addressList.size());
24}
25
26
27private static int count(int jobId) {
28 // 如果当前的时间,大于缓存的时间,那么说明需要刷新了
29 if (System.currentTimeMillis() > CACHE_VALID_TIME) {
30 routeCountEachJob.clear();
31 // 设置缓存时间戳,默认缓存一天,一天之后会从新开始
32 CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
33 }
34
35 // count++
36 Integer count = routeCountEachJob.get(jobId);
37 // 当第一次执行轮循这个策略的时候,routeCountEachJob这个Map里面肯定是没有这个地址的, count==null ,
38 // 当 count==null或者count大于100万的时候,系统会默认在100之间随机一个数字 , 放入hashMap, 然后返回该数字
39 // 当系统第二次进来的时候,count!=null 并且小于100万, 那么把count加1 之后返回出去。
40 count = (count==null || count>1000000)?(new Random().nextInt(100)):++count; // 初始化时主动Random一次,缓解首次压力
41 // 为啥首次需要随机一次,而不是指定第一台呢?
42 // 因为如果默认指定第一台的话,那么所有任务的首次加载全部会到第一台执行器上面去,这样会导致第一台机器刚开始的时候压力很大。
43 routeCountEachJob.put(jobId, count);
44 return count;
45}
总结:这里主要是通过一个ConcurrentHashMap来记录每个任务对应的执行次数,维护一个count值, 通过这个count值 对集群机器大小求于,得到list下标。
最终得到轮循的那台机器。
随机
随机这个策略比较简单,通过在集群列表的大小内随机拿出一台机器来执行,比较简单,此处不再赘述
1//ExecutorRouteRandom.java
2private static Random localRandom = new Random();
3
4public String route(int jobId, ArrayList<String> addressList) {
5 // Collections.shuffle(addressList);
6 return addressList.get(localRandom.nextInt(addressList.size()));
7}
8
9@Override
10public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList) {
11 // address
12 String address = route(triggerParam.getJobId(), addressList);
13
14 // run executor
15 ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
16 runResult.setContent(address);
17 return runResult;
18}
最不经常使用 (LFU)
单个JOB对应的每个执行器,使用频率最低的优先被选举
1//ExecutorRouteLFU.java
2// 定义个静态的MAP, 用来存储任务ID对应的执行信息
3private static ConcurrentHashMap<Integer, HashMap<String, Integer>> jobLfuMap = new ConcurrentHashMap<Integer, HashMap<String, Integer>>();
4// 定义过期时间戳
5private static long CACHE_VALID_TIME = 0;
6
7@Override
8public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList) {
9
10 // address
11 String address = route(triggerParam.getJobId(), addressList);
12
13 // run executor
14 ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
15 runResult.setContent(address);
16 return runResult;
17}
18
19public String route(int jobId, ArrayList<String> addressList) {
20
21 // 如果当前系统时间大于过期时间
22 if (System.currentTimeMillis() > CACHE_VALID_TIME) {
23 jobLfuMap.clear(); //清空
24 //重新设置过期时间,默认为一天
25 CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
26 }
27
28 // 从MAP中获取执行信息
29 //lfuItemMap中放的是执行器地址以及执行次数
30 HashMap<String, Integer> lfuItemMap = jobLfuMap.get(jobId); // Key排序可以用TreeMap+构造入参Compare;Value排序暂时只能通过ArrayList;
31 if (lfuItemMap == null) {
32 lfuItemMap = new HashMap<String, Integer>();
33 jobLfuMap.put(jobId, lfuItemMap);
34 }
35 for (String address: addressList) {
36 // map中不包含,并且值大于一百万的时候,需要重新初始化执行器地址对应的执行次数
37 // 初始化的规则是在机器地址列表size里面进行随机
38 // 当运行一段时间后,有新机器加入的时候,此时,新机器初始化的执行次数较小,所以一开始,新机器的压力会比较大,后期慢慢趋于平衡
39 if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) >1000000 ) {
40 // 初始化时主动Random一次,缓解首次压力 , 如果这个地方都设置从1 开始的话,那么下面的排序Comparator ,每个任务始终都会
41 // 获取相同的服务器地址, 这样会导致某一台服务器首次压力特别大。 使用随机,压力分散
42 lfuItemMap.put(address, new Random().nextInt(addressList.size()));
43 }
44 }
45
46 // 将lfuItemMap中的key.value, 取出来,然后使用Comparator进行排序,value小的靠前。
47 List<Map.Entry<String, Integer>> lfuItemList = new ArrayList<Map.Entry<String, Integer>>(lfuItemMap.entrySet());
48 Collections.sort(lfuItemList, new Comparator<Map.Entry<String, Integer>>() {
49 @Override
50 public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
51 return o1.getValue().compareTo(o2.getValue());
52 }
53 });
54 //取第一个,也就是最小的一个,将address返回,同时对该address对应的值加1 。
55 Map.Entry<String, Integer> addressItem = lfuItemList.get(0);
56 String minAddress = addressItem.getKey();
57 addressItem.setValue(addressItem.getValue() + 1);
58
59 return addressItem.getKey();
60}
总结: 通过一个HashMap存储每个任务对应的 机器执行次数,然后通过hashMap value排序, 得到执行次数最小的那个,也就是得到了最不经常使用的那台机器
最近最久未使用(LRU)
单个JOB对应的每个执行器,最久为使用的优先被选举 , 此处使用的是linkedHashMap来实现LRU算法的。通过linkedHashMap的每次get/put的时候会进行排序,最新操作的数据会在最后面。 从而取第一个数据就代表是最久没有被使用的
1//ExecutorRouteLRU.java
2// 定义个静态的MAP, 用来存储任务ID对应的执行信息
3private static ConcurrentHashMap<Integer, LinkedHashMap<String, String>> jobLRUMap = new ConcurrentHashMap<Integer, LinkedHashMap<String, String>>();
4// 定义过期时间戳
5private static long CACHE_VALID_TIME = 0;
6
7public String route(int jobId, ArrayList<String> addressList) {
8
9 // cache clear
10 if (System.currentTimeMillis() > CACHE_VALID_TIME) {
11 jobLRUMap.clear();
12 //重新设置过期时间,默认为一天
13 CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
14 }
15
16 // init lru
17 LinkedHashMap<String, String> lruItem = jobLRUMap.get(jobId);
18 if (lruItem == null) {
19 /**
20 * LinkedHashMap
21 * a、accessOrder:ture=访问顺序排序(get/put时排序);false=插入顺序排期;
22 * b、removeEldestEntry:新增元素时将会调用,返回true时会删除最老元素;可封装LinkedHashMap并重写该方法,比如定义最大容量,超出是返回true即可实现固定长度的LRU算法;
23 */
24 lruItem = new LinkedHashMap<>(16, 0.75f, true);
25 jobLRUMap.put(jobId, lruItem);
26 }
27
28 // 如果地址列表里面有地址不在map中,此处是可以再次放入,防止添加机器的问题
29 for (String address: addressList) {
30 if (!lruItem.containsKey(address)) {
31 lruItem.put(address, address);
32 }
33 }
34
35 // 取头部的一个元素,也就是最久操作过的数据
36 String eldestKey = lruItem.entrySet().iterator().next().getKey();
37 String eldestValue = lruItem.get(eldestKey);
38 return eldestValue;
39}
总结:此处主要是利用了LinkedHashMap的排序特性, get/put时都会对资源重排。 最久没有被操作过的数据,就会排在前面。
一致性Hash
在讲这个策略之前,先说一下一致性Hash算法 ,
先构造一个长度为2^32的整数环(这个环被称为一致性Hash环),根据节点名称的Hash值(其分布为[0, 2^32-1])将服务器节点放置在这个Hash环上,然后根据数据的Key值计算得到其Hash值(其分布也为[0, 2^32-1]),接着在Hash环上顺时针查找距离这个Key值的Hash值最近的服务器节点,完成Key到服务器的映射查找。
例:
为什么需要一致性hash
比如你有 N 个 cache 服务器(后面简称 cache ),那么如何将一个对象 object 映射到 N 个 cache 上呢,你很可能会采用通用的方法计算 object 的 hash 值,然后均匀的映射到到 N 个 cache ;hash(object)%N
一切都运行正常,再考虑如下的两种情况;
1 一个 cache 服务器 m down 掉了(在实际应用中必须要考虑这种情况),这样所有映射到 cache m 的对象都会失效,怎么办,需要把 cache m 从 cache 中移除,这时候 cache 是 N-1 台,映射公式变成了 hash(object)%(N-1) ;
2 由于访问加重,需要添加 cache ,这时候 cache 是 N+1 台,映射公式变成了 hash(object)%(N+1) ;
1 和 2 意味着什么?这意味着突然之间几乎所有的 cache 都失效了。对于服务器而言,这是一场灾难,洪水般的访问都会直接冲向后台服务器;
再来考虑第三个问题,由于硬件能力越来越强,你可能想让后面添加的节点多做点活,显然上面的 hash 算法也做不到。
有什么方法可以改变这个状况呢,这就是 consistent hashing…
详细介绍: http://blog.csdn.net/u010412301/article/details/52441400
源码分析:
这个地方使用的Hash方法是作者自己写的,因为String的hashCode可能重复,需要进一步扩大hashCode的取值范围
1//ExecutorRouteConsistentHash.java
2private static int VIRTUAL_NODE_NUM = 5;
3
4@Override
5public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList) {
6 // address
7 String address = route(triggerParam.getJobId(), addressList);
8
9 // run executor
10 ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
11 runResult.setContent(address);
12 return runResult;
13}
14
15public String route(int jobId, ArrayList<String> addressList) {
16
17 //
18 //
19 TreeMap<Long, String> addressRing = new TreeMap<Long, String>();
20 for (String address: addressList) {
21
22 for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
23 // 通过自定义的Hash方法,得到服务节点的Hash值,同时放入treeMap
24 long addressHash = hash("SHARD-" + address + "-NODE-" + i);
25 addressRing.put(addressHash, address);
26 }
27 }
28 // 得到JobId的Hash值
29 long jobHash = hash(String.valueOf(jobId));
30 // 调用treeMap的tailMap方法,拿到map中键大于jobHash的值列表
31 SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
32 // 如果addressRing中有比jobHash的那么直接取lastRing 的第一个
33 if (!lastRing.isEmpty()) {
34 return lastRing.get(lastRing.firstKey());
35 }
36 // 如果没有,则直接取addresRing的第一个
37 // 反正最终的效果是在Hash环上,顺时针拿离jobHash最近的一个值
38 return addressRing.firstEntry().getValue();
39}
40
41/**
42 * get hash code on 2^32 ring (md5散列的方式计算hash值)
43 * @param key
44 * @return
45 */
46private static long hash(String key) {
47
48 // md5 byte
49 MessageDigest md5;
50 try {
51 md5 = MessageDigest.getInstance("MD5");
52 } catch (NoSuchAlgorithmException e) {
53 throw new RuntimeException("MD5 not supported", e);
54 }
55 md5.reset();
56 byte[] keyBytes = null;
57 try {
58 keyBytes = key.getBytes("UTF-8");
59 } catch (UnsupportedEncodingException e) {
60 throw new RuntimeException("Unknown string :" + key, e);
61 }
62
63 md5.update(keyBytes);
64 byte[] digest = md5.digest();
65
66 // hash code, Truncate to 32-bits
67 long hashCode = ((long) (digest[3] & 0xFF) << 24)
68 | ((long) (digest[2] & 0xFF) << 16)
69 | ((long) (digest[1] & 0xFF) << 8)
70 | (digest[0] & 0xFF);
71
72 long truncateHashCode = hashCode & 0xffffffffL;
73 return truncateHashCode;
74}
题外扩展
问题:
添加节点D
添加节点D,但是节点D在A和B的中间,仅能分担一点点的压力,
所以需要引入虚拟节点的思想,解决一致性hash算法分布不均导致负载不均的问题。一个真实节点对应若干个虚拟节点,当key被映射到虚拟节点上时,则被认为映射到虚拟节点所对应的真实节点上。
例:
节点A: A1 , A2 , A3 , A4
节点B: B1 , B2 , B3 , B4
节点C: C1, C2, C3, C4
虚拟节点分散,假如A服务器宕机了,那么A1,A2,A3,A4,虚拟节点失效,新来的请求会分散到服务器B,和服务器C一起分摊,解决了由于节点失效造成的负载不均的问题。
故障转移
1//ExecutorRouteFailover.java
2public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList) {
3
4 StringBuffer beatResultSB = new StringBuffer();
5 //循环集群地址
6 for (String address : addressList) {
7 // beat
8 ReturnT<String> beatResult = null;
9 try {
10 // 向执行器发送 执行beat信息 , 试探该机器是否可以正常工作
11 ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
12 beatResult = executorBiz.beat();
13 } catch (Exception e) {
14 logger.error(e.getMessage(), e);
15 beatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
16 }
17 // 拼接日志 , 收集日志信息,后期一起返回
18 beatResultSB.append( (beatResultSB.length()>0)?"<br><br>":"")
19 .append(I18nUtil.getString("jobconf_beat") + ":")
20 .append("<br>address:").append(address)
21 .append("<br>code:").append(beatResult.getCode())
22 .append("<br>msg:").append(beatResult.getMsg());
23
24 // 返回状态为成功
25 if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {
26 // 执行任务
27 ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
28 beatResultSB.append("<br><br>").append(runResult.getMsg());
29
30 // result
31 runResult.setMsg(beatResultSB.toString());
32 runResult.setContent(address);
33 return runResult;
34 }
35 }
36 return new ReturnT<String>(ReturnT.FAIL_CODE, beatResultSB.toString());
37
38}
忙碌转移
这个策略更上面那个故障转移的原理一致,只不过不同的是,故障转移是判断机器是否存活, 二忙碌转移是想执行器发送消息判断该任务对应的线程是否处于执行状态。
1//ExecutorRouteBusyover.java
2@Override
3public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList) {
4
5 StringBuffer idleBeatResultSB = new StringBuffer();
6 // 循环集群地址
7 for (String address : addressList) {
8 // beat
9 ReturnT<String> idleBeatResult = null;
10 try {
11 // 向执行服务器发送消息,判断当前jobId对应的线程是否忙碌,接下来可以看一下idleBeat这个方法
12 ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
13 idleBeatResult = executorBiz.idleBeat(triggerParam.getJobId());
14 } catch (Exception e) {
15 logger.error(e.getMessage(), e);
16 idleBeatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
17 }
18 idleBeatResultSB.append( (idleBeatResultSB.length()>0)?"<br><br>":"")
19 .append(I18nUtil.getString("jobconf_idleBeat") + ":")
20 .append("<br>address:").append(address)
21 .append("<br>code:").append(idleBeatResult.getCode())
22 .append("<br>msg:").append(idleBeatResult.getMsg());
23
24 // 返回成功,代表这台执行服务器对应的线程处于空闲状态
25 if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {
26 // 执行人呢无
27 ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
28 idleBeatResultSB.append("<br><br>").append(runResult.getMsg());
29
30 // result
31 runResult.setMsg(idleBeatResultSB.toString());
32 runResult.setContent(address);
33 return runResult;
34 }
35 }
36
37 return new ReturnT<String>(ReturnT.FAIL_CODE, idleBeatResultSB.toString());
38}
看一下执行器那边的idleBeat代码实现
1//ExecutorBizImpl
2@Override
3public ReturnT<String> idleBeat(int jobId) {
4
5 // isRunningOrHasQueue
6 boolean isRunningOrHasQueue = false;
7 // 从线程池里面获取当前任务对应的线程
8 JobThread jobThread = XxlJobExecutor.loadJobThread(jobId);
9 if (jobThread != null && jobThread.isRunningOrHasQueue()) {
10 // 线程处于运行中
11 isRunningOrHasQueue = true;
12 }
13
14 if (isRunningOrHasQueue) {
15 // 线程运行中,则返回fasle
16 return new ReturnT<String>(ReturnT.FAIL_CODE, "job thread is running or has trigger queue.");
17 }
18 // 线程空闲,返回success
19 return ReturnT.SUCCESS;
20}
以上是关于k8s-pod之Job的主要内容,如果未能解决你的问题,请参考以下文章